summaryrefslogtreecommitdiffstats
path: root/app/web_conn.go
diff options
context:
space:
mode:
authorCorey Hulen <corey@hulen.com>2017-04-21 09:38:26 -0700
committerJoram Wilander <jwawilander@gmail.com>2017-04-21 12:38:26 -0400
commite62afeace04e2abd23fa78a0a54e0a5d2e17e0b7 (patch)
treeccc4feb2ac3c9dcb0e8766366854de0646443b63 /app/web_conn.go
parent81706b402dafd6ce0727ed6d65105092f76b118a (diff)
downloadchat-e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7.tar.gz
chat-e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7.tar.bz2
chat-e62afeace04e2abd23fa78a0a54e0a5d2e17e0b7.zip
Adding slow pump detection to web_conn and better metrics (#6114)
* Adding slow pump detection to web_conn and better metrics * Fixing bad merge * Fixing typo
Diffstat (limited to 'app/web_conn.go')
-rw-r--r--app/web_conn.go64
1 files changed, 43 insertions, 21 deletions
diff --git a/app/web_conn.go b/app/web_conn.go
index 000704791..2c1913e2b 100644
--- a/app/web_conn.go
+++ b/app/web_conn.go
@@ -17,6 +17,9 @@ import (
)
const (
+ SEND_QUEUE_SIZE = 256
+ SEND_SLOW_WARN = (SEND_QUEUE_SIZE * 50) / 100
+ SEND_DEADLOCK_WARN = (SEND_QUEUE_SIZE * 95) / 100
WRITE_WAIT = 30 * time.Second
PONG_WAIT = 100 * time.Second
PING_PERIOD = (PONG_WAIT * 6) / 10
@@ -44,7 +47,7 @@ func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFun
}
return &WebConn{
- Send: make(chan model.WebSocketMessage, 256),
+ Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE),
WebSocket: ws,
UserId: session.UserId,
SessionToken: session.Token,
@@ -105,35 +108,54 @@ func (c *WebConn) WritePump() {
return
}
- var msgBytes []byte
- if evt, ok := msg.(*model.WebSocketEvent); ok {
- cpyEvt := &model.WebSocketEvent{}
- *cpyEvt = *evt
- cpyEvt.Sequence = c.Sequence
- msgBytes = []byte(cpyEvt.ToJson())
- c.Sequence++
- } else {
- msgBytes = []byte(msg.ToJson())
+ evt, evtOk := msg.(*model.WebSocketEvent)
+
+ skipSend := false
+ if len(c.Send) >= SEND_SLOW_WARN {
+ // When the pump starts to get slow we'll drop non-critical messages
+ if msg.EventType() == model.WEBSOCKET_EVENT_TYPING || msg.EventType() == model.WEBSOCKET_EVENT_STATUS_CHANGE {
+ l4g.Info(fmt.Sprintf("websocket.slow: dropping message userId=%v type=%v channelId=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId))
+ skipSend = true
+ }
}
- c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
- if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
- // browsers will appear as CloseNoStatusReceived
- if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
- l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId))
+ if !skipSend {
+ var msgBytes []byte
+ if evtOk {
+ cpyEvt := &model.WebSocketEvent{}
+ *cpyEvt = *evt
+ cpyEvt.Sequence = c.Sequence
+ msgBytes = []byte(cpyEvt.ToJson())
+ c.Sequence++
} else {
- l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error()))
+ msgBytes = []byte(msg.ToJson())
}
- return
- }
+ if len(c.Send) >= SEND_DEADLOCK_WARN {
+ if evtOk {
+ l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v channelId=%v size=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId, len(msg.ToJson())))
+ } else {
+ l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v size=%v", c.UserId, msg.EventType(), len(msg.ToJson())))
+ }
+ }
+
+ c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
+ if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
+ // browsers will appear as CloseNoStatusReceived
+ if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
+ l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId))
+ } else {
+ l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error()))
+ }
+
+ return
+ }
- if msg.EventType() == model.WEBSOCKET_EVENT_POSTED {
if einterfaces.GetMetricsInterface() != nil {
- einterfaces.GetMetricsInterface().IncrementPostBroadcast()
+ go einterfaces.GetMetricsInterface().IncrementWebSocketBroadcast(msg.EventType())
}
- }
+ }
case <-ticker.C:
c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
if err := c.WebSocket.WriteMessage(websocket.PingMessage, []byte{}); err != nil {