From 86c7e7cacd0608648ac65ff32d21b948c7527715 Mon Sep 17 00:00:00 2001
From: Corey Hulen
Date: Tue, 18 Apr 2017 14:28:02 -0700
Subject: Adding hub deadlock detection into master (#6100)

---
 app/web_hub.go | 92 +++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 79 insertions(+), 13 deletions(-)

(limited to 'app/web_hub.go')

diff --git a/app/web_hub.go b/app/web_hub.go
index da2a41ec4..6b61430dc 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -8,7 +8,10 @@ import (
 	"hash/fnv"
 	"runtime"
 	"runtime/debug"
+	"strconv"
+	"strings"
 	"sync/atomic"
+	"time"
 
 	l4g "github.com/alecthomas/log4go"
 
@@ -17,25 +20,34 @@ import (
 	"github.com/mattermost/platform/utils"
 )
 
+const (
+	BROADCAST_QUEUE_SIZE = 4096
+	DEADLOCK_TICKER      = 15 * time.Second                  // check every 15 seconds
+	DEADLOCK_WARN        = (BROADCAST_QUEUE_SIZE * 99) / 100 // number of buffered messages before printing stack trace
+)
+
 type Hub struct {
-	connections    []*WebConn
-	count          int64
-	register       chan *WebConn
-	unregister     chan *WebConn
-	broadcast      chan *model.WebSocketEvent
-	stop           chan string
-	invalidateUser chan string
-	ExplicitStop   bool
+	connections     []*WebConn
+	connectionCount int64
+	connectionIndex int
+	register        chan *WebConn
+	unregister      chan *WebConn
+	broadcast       chan *model.WebSocketEvent
+	stop            chan string
+	invalidateUser  chan string
+	ExplicitStop    bool
+	goroutineId     int
 }
 
 var hubs []*Hub = make([]*Hub, 0)
+var stopCheckingForDeadlock chan bool
 
 func NewWebHub() *Hub {
 	return &Hub{
 		register:       make(chan *WebConn),
 		unregister:     make(chan *WebConn),
 		connections:    make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
-		broadcast:      make(chan *model.WebSocketEvent, 4096),
+		broadcast:      make(chan *model.WebSocketEvent, BROADCAST_QUEUE_SIZE),
 		stop:           make(chan string),
 		invalidateUser: make(chan string),
 		ExplicitStop:   false,
@@ -43,11 +55,9 @@ func NewWebHub() *Hub {
 }
 
 func TotalWebsocketConnections() int {
-	// This is racy, but it's only used for reporting information
-	// so it's probably OK
 	count := int64(0)
 	for _, hub := range hubs {
-		count = count + atomic.LoadInt64(&hub.count)
+		count = count + atomic.LoadInt64(&hub.connectionCount)
 	}
 
 	return int(count)
@@ -61,13 +71,54 @@ func HubStart() {
 
 	for i := 0; i < len(hubs); i++ {
 		hubs[i] = NewWebHub()
+		hubs[i].connectionIndex = i
 		hubs[i].Start()
 	}
+
+	go func() {
+		ticker := time.NewTicker(DEADLOCK_TICKER)
+
+		defer func() {
+			ticker.Stop()
+		}()
+
+		stopCheckingForDeadlock = make(chan bool, 1)
+
+		for {
+			select {
+			case <-ticker.C:
+				for _, hub := range hubs {
+					if len(hub.broadcast) >= DEADLOCK_WARN {
+						l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))
+						buf := make([]byte, 1<<16)
+						runtime.Stack(buf, true)
+						output := fmt.Sprintf("%s", buf)
+						splits := strings.Split(output, "goroutine ")
+
+						for _, part := range splits {
+							if strings.Index(part, fmt.Sprintf("%v", hub.goroutineId)) > -1 {
+								l4g.Error("Trace for possible deadlock goroutine %v", part)
+							}
+						}
+					}
+				}
+
+			case <-stopCheckingForDeadlock:
+				return
+			}
+		}
+	}()
 }
 
 func HubStop() {
 	l4g.Info(utils.T("api.web_hub.start.stopping.debug"))
 
+	select {
+	case stopCheckingForDeadlock <- true:
+	default:
+		l4g.Warn("We appear to have already sent the stop checking for deadlocks command")
+	}
+
 	for _, hub := range hubs {
 		hub.Stop()
 	}
@@ -236,6 +287,17 @@ func (h *Hub) InvalidateUser(userId string) {
 	h.invalidateUser <- userId
 }
 
+func getGoroutineId() int {
+	var buf [64]byte
+	n := runtime.Stack(buf[:], false)
+	idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
+	id, err := strconv.Atoi(idField)
+	if err != nil {
+		id = -1
+	}
+	return id
+}
+
 func (h *Hub) Stop() {
 	h.stop <- "all"
 }
@@ -246,11 +308,15 @@ func (h *Hub) Start() {
 	var doRecover func()
 
 	doStart = func() {
+
+		h.goroutineId = getGoroutineId()
+		l4g.Debug("Hub for index %v is starting with goroutine %v", h.connectionIndex, h.goroutineId)
+
 		for {
 			select {
 			case webCon := <-h.register:
 				h.connections = append(h.connections, webCon)
-				atomic.StoreInt64(&h.count, int64(len(h.connections)))
+				atomic.StoreInt64(&h.connectionCount, int64(len(h.connections)))
 
 			case webCon := <-h.unregister:
 				userId := webCon.UserId
-- 
cgit v1.2.3-1-g7c22