summaryrefslogtreecommitdiffstats
path: root/app/web_hub.go
diff options
context:
space:
mode:
authorCorey Hulen <corey@hulen.com>2017-04-18 14:28:02 -0700
committerGitHub <noreply@github.com>2017-04-18 14:28:02 -0700
commit86c7e7cacd0608648ac65ff32d21b948c7527715 (patch)
tree48856bbfa1915637c553676f0f12e97437ad378e /app/web_hub.go
parentd2b86f1b8de4784baf578b611cf80779ccfa722a (diff)
downloadchat-86c7e7cacd0608648ac65ff32d21b948c7527715.tar.gz
chat-86c7e7cacd0608648ac65ff32d21b948c7527715.tar.bz2
chat-86c7e7cacd0608648ac65ff32d21b948c7527715.zip
Adding hub deadlock detection into master (#6100)
Diffstat (limited to 'app/web_hub.go')
-rw-r--r--app/web_hub.go92
1 files changed, 79 insertions, 13 deletions
diff --git a/app/web_hub.go b/app/web_hub.go
index da2a41ec4..6b61430dc 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -8,7 +8,10 @@ import (
"hash/fnv"
"runtime"
"runtime/debug"
+ "strconv"
+ "strings"
"sync/atomic"
+ "time"
l4g "github.com/alecthomas/log4go"
@@ -17,25 +20,34 @@ import (
"github.com/mattermost/platform/utils"
)
+const (
+ BROADCAST_QUEUE_SIZE = 4096
+ DEADLOCK_TICKER = 15 * time.Second // check every 15 seconds
+ DEADLOCK_WARN = (BROADCAST_QUEUE_SIZE * 99) / 100 // number of buffered messages before printing stack trace
+)
+
type Hub struct {
- connections []*WebConn
- count int64
- register chan *WebConn
- unregister chan *WebConn
- broadcast chan *model.WebSocketEvent
- stop chan string
- invalidateUser chan string
- ExplicitStop bool
+ connections []*WebConn
+ connectionCount int64
+ connectionIndex int
+ register chan *WebConn
+ unregister chan *WebConn
+ broadcast chan *model.WebSocketEvent
+ stop chan string
+ invalidateUser chan string
+ ExplicitStop bool
+ goroutineId int
}
var hubs []*Hub = make([]*Hub, 0)
+var stopCheckingForDeadlock chan bool
func NewWebHub() *Hub {
return &Hub{
register: make(chan *WebConn),
unregister: make(chan *WebConn),
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
- broadcast: make(chan *model.WebSocketEvent, 4096),
+ broadcast: make(chan *model.WebSocketEvent, BROADCAST_QUEUE_SIZE),
stop: make(chan string),
invalidateUser: make(chan string),
ExplicitStop: false,
@@ -43,11 +55,9 @@ func NewWebHub() *Hub {
}
func TotalWebsocketConnections() int {
- // This is racy, but it's only used for reporting information
- // so it's probably OK
count := int64(0)
for _, hub := range hubs {
- count = count + atomic.LoadInt64(&hub.count)
+ count = count + atomic.LoadInt64(&hub.connectionCount)
}
return int(count)
@@ -61,13 +71,54 @@ func HubStart() {
for i := 0; i < len(hubs); i++ {
hubs[i] = NewWebHub()
+ hubs[i].connectionIndex = i
hubs[i].Start()
}
+
+ go func() {
+ ticker := time.NewTicker(DEADLOCK_TICKER)
+
+ defer func() {
+ ticker.Stop()
+ }()
+
+ stopCheckingForDeadlock = make(chan bool, 1)
+
+ for {
+ select {
+ case <-ticker.C:
+ for _, hub := range hubs {
+ if len(hub.broadcast) >= DEADLOCK_WARN {
+ l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))
+ buf := make([]byte, 1<<16)
+ runtime.Stack(buf, true)
+ output := fmt.Sprintf("%s", buf)
+ splits := strings.Split(output, "goroutine ")
+
+ for _, part := range splits {
+ if strings.Index(part, fmt.Sprintf("%v", hub.goroutineId)) > -1 {
+ l4g.Error("Trace for possible deadlock goroutine %v", part)
+ }
+ }
+ }
+ }
+
+ case <-stopCheckingForDeadlock:
+ return
+ }
+ }
+ }()
}
func HubStop() {
l4g.Info(utils.T("api.web_hub.start.stopping.debug"))
+ select {
+ case stopCheckingForDeadlock <- true:
+ default:
+ l4g.Warn("We appear to have already sent the stop checking for deadlocks command")
+ }
+
for _, hub := range hubs {
hub.Stop()
}
@@ -236,6 +287,17 @@ func (h *Hub) InvalidateUser(userId string) {
h.invalidateUser <- userId
}
+func getGoroutineId() int {
+ var buf [64]byte
+ n := runtime.Stack(buf[:], false)
+ idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
+ id, err := strconv.Atoi(idField)
+ if err != nil {
+ id = -1
+ }
+ return id
+}
+
func (h *Hub) Stop() {
h.stop <- "all"
}
@@ -246,11 +308,15 @@ func (h *Hub) Start() {
var doRecover func()
doStart = func() {
+
+ h.goroutineId = getGoroutineId()
+ l4g.Debug("Hub for index %v is starting with goroutine %v", h.connectionIndex, h.goroutineId)
+
for {
select {
case webCon := <-h.register:
h.connections = append(h.connections, webCon)
- atomic.StoreInt64(&h.count, int64(len(h.connections)))
+ atomic.StoreInt64(&h.connectionCount, int64(len(h.connections)))
case webCon := <-h.unregister:
userId := webCon.UserId