From e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7 Mon Sep 17 00:00:00 2001 From: Corey Hulen Date: Fri, 21 Apr 2017 09:38:26 -0700 Subject: Adding slow pump detection to web_conn and better metrics (#6114) * Adding slow pump detection to web_conn and better metrics * Fixing bad merge * Fixing typo --- app/web_conn.go | 64 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 21 deletions(-) (limited to 'app/web_conn.go') diff --git a/app/web_conn.go b/app/web_conn.go index 000704791..2c1913e2b 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -17,6 +17,9 @@ import ( ) const ( + SEND_QUEUE_SIZE = 256 + SEND_SLOW_WARN = (SEND_QUEUE_SIZE * 50) / 100 + SEND_DEADLOCK_WARN = (SEND_QUEUE_SIZE * 95) / 100 WRITE_WAIT = 30 * time.Second PONG_WAIT = 100 * time.Second PING_PERIOD = (PONG_WAIT * 6) / 10 @@ -44,7 +47,7 @@ func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFun } return &WebConn{ - Send: make(chan model.WebSocketMessage, 256), + Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE), WebSocket: ws, UserId: session.UserId, SessionToken: session.Token, @@ -105,35 +108,54 @@ func (c *WebConn) WritePump() { return } - var msgBytes []byte - if evt, ok := msg.(*model.WebSocketEvent); ok { - cpyEvt := &model.WebSocketEvent{} - *cpyEvt = *evt - cpyEvt.Sequence = c.Sequence - msgBytes = []byte(cpyEvt.ToJson()) - c.Sequence++ - } else { - msgBytes = []byte(msg.ToJson()) + evt, evtOk := msg.(*model.WebSocketEvent) + + skipSend := false + if len(c.Send) >= SEND_SLOW_WARN { + // When the pump starts to get slow we'll drop non-critical messages + if msg.EventType() == model.WEBSOCKET_EVENT_TYPING || msg.EventType() == model.WEBSOCKET_EVENT_STATUS_CHANGE { + l4g.Info(fmt.Sprintf("websocket.slow: dropping message userId=%v type=%v channelId=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId)) + skipSend = true + } } - c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) - if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil { - // browsers will appear as CloseNoStatusReceived - if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { - l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId)) + if !skipSend { + var msgBytes []byte + if evtOk { + cpyEvt := &model.WebSocketEvent{} + *cpyEvt = *evt + cpyEvt.Sequence = c.Sequence + msgBytes = []byte(cpyEvt.ToJson()) + c.Sequence++ } else { - l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error())) + msgBytes = []byte(msg.ToJson()) } - return - } + if len(c.Send) >= SEND_DEADLOCK_WARN { + if evtOk { + l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v channelId=%v size=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId, len(msg.ToJson()))) + } else { + l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v size=%v", c.UserId, msg.EventType(), len(msg.ToJson()))) + } + } + + c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) + if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil { + // browsers will appear as CloseNoStatusReceived + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { + l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId)) + } else { + l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error())) + } + + return + } - if msg.EventType() == model.WEBSOCKET_EVENT_POSTED { if einterfaces.GetMetricsInterface() != nil { - einterfaces.GetMetricsInterface().IncrementPostBroadcast() + go einterfaces.GetMetricsInterface().IncrementWebSocketBroadcast(msg.EventType()) } - } + } case <-ticker.C: c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) if err := c.WebSocket.WriteMessage(websocket.PingMessage, []byte{}); err != nil { -- cgit v1.2.3-1-g7c22