summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoram Wilander <jwawilander@gmail.com>2017-04-01 11:39:13 -0400
committerGitHub <noreply@github.com>2017-04-01 11:39:13 -0400
commit95da05a8c97332d8eff90c7587ed17a41966c5f0 (patch)
treed50084481487988ad83deb5ab6af2be7d2a9f110
parentd39947f53933ee4beb4ed8ab614324edc36fba2d (diff)
downloadchat-95da05a8c97332d8eff90c7587ed17a41966c5f0.tar.gz
chat-95da05a8c97332d8eff90c7587ed17a41966c5f0.tar.bz2
chat-95da05a8c97332d8eff90c7587ed17a41966c5f0.zip
PLT-5750 Add sequence number to websocket connections and events (#5907)
* Add sequence number to websocket connections and events * Copy pointer instead of pass by value and use int64 over uint64 * Add more logging to missed events
-rw-r--r--app/web_conn.go15
-rw-r--r--app/web_hub.go2
-rw-r--r--app/websocket_router.go2
-rw-r--r--model/websocket_message.go45
-rw-r--r--webapp/actions/websocket_actions.jsx7
-rw-r--r--webapp/client/websocket_client.jsx13
-rw-r--r--wsapi/websocket_handler.go3
7 files changed, 41 insertions, 46 deletions
diff --git a/app/web_conn.go b/app/web_conn.go
index da6330f5c..11290b67d 100644
--- a/app/web_conn.go
+++ b/app/web_conn.go
@@ -35,6 +35,7 @@ type WebConn struct {
Locale string
AllChannelMembers map[string]string
LastAllChannelMembersTime int64
+ Sequence int64
}
func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn {
@@ -104,8 +105,19 @@ func (c *WebConn) WritePump() {
return
}
+ var msgBytes []byte
+ if evt, ok := msg.(*model.WebSocketEvent); ok {
+ cpyEvt := &model.WebSocketEvent{}
+ *cpyEvt = *evt
+ cpyEvt.Sequence = c.Sequence
+ msgBytes = []byte(cpyEvt.ToJson())
+ c.Sequence++
+ } else {
+ msgBytes = []byte(msg.ToJson())
+ }
+
c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
- if err := c.WebSocket.WriteMessage(websocket.TextMessage, msg.GetPreComputeJson()); err != nil {
+ if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
// browsers will appear as CloseNoStatusReceived
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId))
@@ -179,7 +191,6 @@ func (webCon *WebConn) IsAuthenticated() bool {
func (webCon *WebConn) SendHello() {
msg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_HELLO, "", "", webCon.UserId, nil)
msg.Add("server_version", fmt.Sprintf("%v.%v.%v.%v", model.CurrentVersion, model.BuildNumber, utils.CfgHash, utils.IsLicensed))
- msg.DoPreComputeJson()
webCon.Send <- msg
}
diff --git a/app/web_hub.go b/app/web_hub.go
index 65d18481f..a0663459a 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -94,7 +94,6 @@ func Publish(message *model.WebSocketEvent) {
metrics.IncrementWebsocketEvent(message.Event)
}
- message.DoPreComputeJson()
for _, hub := range hubs {
hub.Broadcast(message)
}
@@ -105,7 +104,6 @@ func Publish(message *model.WebSocketEvent) {
}
func PublishSkipClusterSend(message *model.WebSocketEvent) {
- message.DoPreComputeJson()
for _, hub := range hubs {
hub.Broadcast(message)
}
diff --git a/app/websocket_router.go b/app/websocket_router.go
index 30714a447..4569134b0 100644
--- a/app/websocket_router.go
+++ b/app/websocket_router.go
@@ -61,7 +61,6 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque
HubRegister(conn)
resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil)
- resp.DoPreComputeJson()
conn.Send <- resp
}
@@ -91,7 +90,6 @@ func ReturnWebSocketError(conn *WebConn, r *model.WebSocketRequest, err *model.A
err.DetailedError = ""
errorResp := model.NewWebSocketError(r.Seq, err)
- errorResp.DoPreComputeJson()
conn.Send <- errorResp
}
diff --git a/model/websocket_message.go b/model/websocket_message.go
index 23820470b..6c47eb948 100644
--- a/model/websocket_message.go
+++ b/model/websocket_message.go
@@ -36,8 +36,6 @@ const (
type WebSocketMessage interface {
ToJson() string
IsValid() bool
- DoPreComputeJson()
- GetPreComputeJson() []byte
EventType() string
}
@@ -49,10 +47,10 @@ type WebsocketBroadcast struct {
}
type WebSocketEvent struct {
- Event string `json:"event"`
- Data map[string]interface{} `json:"data"`
- Broadcast *WebsocketBroadcast `json:"broadcast"`
- PreComputeJson []byte `json:"-"`
+ Event string `json:"event"`
+ Data map[string]interface{} `json:"data"`
+ Broadcast *WebsocketBroadcast `json:"broadcast"`
+ Sequence int64 `json:"seq"`
}
func (m *WebSocketEvent) Add(key string, value interface{}) {
@@ -72,19 +70,6 @@ func (o *WebSocketEvent) EventType() string {
return o.Event
}
-func (o *WebSocketEvent) DoPreComputeJson() {
- b, err := json.Marshal(o)
- if err != nil {
- o.PreComputeJson = []byte("")
- } else {
- o.PreComputeJson = b
- }
-}
-
-func (o *WebSocketEvent) GetPreComputeJson() []byte {
- return o.PreComputeJson
-}
-
func (o *WebSocketEvent) ToJson() string {
b, err := json.Marshal(o)
if err != nil {
@@ -106,11 +91,10 @@ func WebSocketEventFromJson(data io.Reader) *WebSocketEvent {
}
type WebSocketResponse struct {
- Status string `json:"status"`
- SeqReply int64 `json:"seq_reply,omitempty"`
- Data map[string]interface{} `json:"data,omitempty"`
- Error *AppError `json:"error,omitempty"`
- PreComputeJson []byte `json:"-"`
+ Status string `json:"status"`
+ SeqReply int64 `json:"seq_reply,omitempty"`
+ Data map[string]interface{} `json:"data,omitempty"`
+ Error *AppError `json:"error,omitempty"`
}
func (m *WebSocketResponse) Add(key string, value interface{}) {
@@ -142,19 +126,6 @@ func (o *WebSocketResponse) ToJson() string {
}
}
-func (o *WebSocketResponse) DoPreComputeJson() {
- b, err := json.Marshal(o)
- if err != nil {
- o.PreComputeJson = []byte("")
- } else {
- o.PreComputeJson = b
- }
-}
-
-func (o *WebSocketResponse) GetPreComputeJson() []byte {
- return o.PreComputeJson
-}
-
func WebSocketResponseFromJson(data io.Reader) *WebSocketResponse {
decoder := json.NewDecoder(data)
var o WebSocketResponse
diff --git a/webapp/actions/websocket_actions.jsx b/webapp/actions/websocket_actions.jsx
index e36d11fde..e07e3e217 100644
--- a/webapp/actions/websocket_actions.jsx
+++ b/webapp/actions/websocket_actions.jsx
@@ -61,6 +61,13 @@ export function initialize() {
WebSocketClient.setEventCallback(handleEvent);
WebSocketClient.setFirstConnectCallback(handleFirstConnect);
+ WebSocketClient.setReconnectCallback(() => reconnect(false));
+ WebSocketClient.setMissedEventCallback(() => {
+ if (global.window.mm_config.EnableDeveloper === 'true') {
+ Client.logClientError('missed websocket event seq=' + WebSocketClient.eventSequence);
+ }
+ reconnect(false);
+ });
WebSocketClient.setCloseCallback(handleClose);
WebSocketClient.initialize(connUrl);
}
diff --git a/webapp/client/websocket_client.jsx b/webapp/client/websocket_client.jsx
index 35be5c3df..1cf97b788 100644
--- a/webapp/client/websocket_client.jsx
+++ b/webapp/client/websocket_client.jsx
@@ -10,11 +10,13 @@ export default class WebSocketClient {
this.conn = null;
this.connectionUrl = null;
this.sequence = 1;
+ this.eventSequence = 0;
this.connectFailCount = 0;
this.eventCallback = null;
this.responseCallbacks = {};
this.firstConnectCallback = null;
this.reconnectCallback = null;
+ this.missedEventCallback = null;
this.errorCallback = null;
this.closeCallback = null;
}
@@ -37,6 +39,8 @@ export default class WebSocketClient {
this.connectionUrl = connectionUrl;
this.conn.onopen = () => {
+ this.eventSequence = 0;
+
if (token) {
this.sendMessage('authentication_challenge', {token});
}
@@ -108,6 +112,11 @@ export default class WebSocketClient {
Reflect.deleteProperty(this.responseCallbacks, msg.seq_reply);
}
} else if (this.eventCallback) {
+ if (msg.seq !== this.eventSequence && this.missedEventCallback) {
+ console.log('missed websocket event, act_seq=' + msg.seq + ' exp_seq=' + this.eventSequence); //eslint-disable-line no-console
+ this.missedEventCallback();
+ }
+ this.eventSequence = msg.seq + 1;
this.eventCallback(msg);
}
};
@@ -125,6 +134,10 @@ export default class WebSocketClient {
this.reconnectCallback = callback;
}
+ setMissedEventCallback(callback) {
+ this.missedEventCallback = callback;
+ }
+
setErrorCallback(callback) {
this.errorCallback = callback;
}
diff --git a/wsapi/websocket_handler.go b/wsapi/websocket_handler.go
index 193539242..8d78ece04 100644
--- a/wsapi/websocket_handler.go
+++ b/wsapi/websocket_handler.go
@@ -27,7 +27,6 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR
l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, conn.UserId, sessionErr.SystemMessage(utils.T), sessionErr.Error())
sessionErr.DetailedError = ""
errResp := model.NewWebSocketError(r.Seq, sessionErr)
- errResp.DoPreComputeJson()
conn.Send <- errResp
return
@@ -44,14 +43,12 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR
l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, r.Session.UserId, err.SystemMessage(utils.T), err.DetailedError)
err.DetailedError = ""
errResp := model.NewWebSocketError(r.Seq, err)
- errResp.DoPreComputeJson()
conn.Send <- errResp
return
}
resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, data)
- resp.DoPreComputeJson()
conn.Send <- resp
}