From 19a5d0047dbe5e4d7f4675893bfee125d3a12312 Mon Sep 17 00:00:00 2001 From: Chris Date: Tue, 20 Feb 2018 12:50:10 -0600 Subject: MM-8710: Web Hub optimizations (#8293) * webhub optimizations * test fix * minor fix * big perf improvement to ToJson after precomputing * fix hub connection count --- app/web_hub.go | 133 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 83 insertions(+), 50 deletions(-) (limited to 'app/web_hub.go') diff --git a/app/web_hub.go b/app/web_hub.go index eeae13e09..c1c8cb7bb 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -30,7 +30,6 @@ type Hub struct { // See https://github.com/mattermost/mattermost-server/pull/7281 connectionCount int64 app *App - connections []*WebConn connectionIndex int register chan *WebConn unregister chan *WebConn @@ -47,7 +46,6 @@ func (a *App) NewWebHub() *Hub { app: a, register: make(chan *WebConn, 1), unregister: make(chan *WebConn, 1), - connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE), broadcast: make(chan *model.WebSocketEvent, BROADCAST_QUEUE_SIZE), stop: make(chan struct{}), didStop: make(chan struct{}), @@ -170,8 +168,14 @@ func (a *App) Publish(message *model.WebSocketEvent) { } func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) { - for _, hub := range a.Hubs { - hub.Broadcast(message) + if message.Broadcast.UserId != "" { + if len(a.Hubs) != 0 { + a.GetHubForUserId(message.Broadcast.UserId).Broadcast(message) + } + } else { + for _, hub := range a.Hubs { + hub.Broadcast(message) + } } } @@ -362,80 +366,53 @@ func (h *Hub) Start() { var doRecover func() doStart = func() { - h.goroutineId = getGoroutineId() l4g.Debug("Hub for index %v is starting with goroutine %v", h.connectionIndex, h.goroutineId) + connections := newHubConnectionIndex() + for { select { case webCon := <-h.register: - h.connections = append(h.connections, webCon) - atomic.StoreInt64(&h.connectionCount, int64(len(h.connections))) - + connections.Add(webCon) + atomic.StoreInt64(&h.connectionCount, int64(len(connections.All()))) case webCon := <-h.unregister: - userId := webCon.UserId - - found := false - indexToDel := -1 - for i, webConCandidate := range h.connections { - if webConCandidate == webCon { - indexToDel = i - continue - } - if userId == webConCandidate.UserId { - found = true - if indexToDel != -1 { - break - } - } - } + connections.Remove(webCon) - if indexToDel != -1 { - // Delete the webcon we are unregistering - h.connections[indexToDel] = h.connections[len(h.connections)-1] - h.connections = h.connections[:len(h.connections)-1] - } - - if len(userId) == 0 { + if len(webCon.UserId) == 0 { continue } - if !found { + if len(connections.ForUser(webCon.UserId)) == 0 { h.app.Go(func() { - h.app.SetStatusOffline(userId, false) + h.app.SetStatusOffline(webCon.UserId, false) }) } - case userId := <-h.invalidateUser: - for _, webCon := range h.connections { - if webCon.UserId == userId { - webCon.InvalidateCache() - } + for _, webCon := range connections.ForUser(userId) { + webCon.InvalidateCache() } - case msg := <-h.broadcast: - for _, webCon := range h.connections { + candidates := connections.All() + if msg.Broadcast.UserId != "" { + candidates = connections.ForUser(msg.Broadcast.UserId) + } + msg.PrecomputeJSON() + for _, webCon := range candidates { if webCon.ShouldSendEvent(msg) { select { case webCon.Send <- msg: default: l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId)) close(webCon.Send) - for i, webConCandidate := range h.connections { - if webConCandidate == webCon { - h.connections[i] = h.connections[len(h.connections)-1] - h.connections = h.connections[:len(h.connections)-1] - break - } - } + connections.Remove(webCon) } } } - case <-h.stop: userIds := make(map[string]bool) - for _, webCon := range h.connections { + for _, webCon := range connections.All() { userIds[webCon.UserId] = true webCon.Close() } @@ -444,7 +421,6 @@ func (h *Hub) Start() { h.app.SetStatusOffline(userId, false) } - h.connections = make([]*WebConn, 0, model.SESSION_CACHE_SIZE) h.ExplicitStop = true close(h.didStop) @@ -474,3 +450,60 @@ func (h *Hub) Start() { go doRecoverableStart() } + +type hubConnectionIndexIndexes struct { + connections int + connectionsByUserId int +} + +// hubConnectionIndex provides fast addition, removal, and iteration of web connections. +type hubConnectionIndex struct { + connections []*WebConn + connectionsByUserId map[string][]*WebConn + connectionIndexes map[*WebConn]*hubConnectionIndexIndexes +} + +func newHubConnectionIndex() *hubConnectionIndex { + return &hubConnectionIndex{ + connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE), + connectionsByUserId: make(map[string][]*WebConn), + connectionIndexes: make(map[*WebConn]*hubConnectionIndexIndexes), + } +} + +func (i *hubConnectionIndex) Add(wc *WebConn) { + i.connections = append(i.connections, wc) + i.connectionsByUserId[wc.UserId] = append(i.connectionsByUserId[wc.UserId], wc) + i.connectionIndexes[wc] = &hubConnectionIndexIndexes{ + connections: len(i.connections) - 1, + connectionsByUserId: len(i.connectionsByUserId[wc.UserId]) - 1, + } +} + +func (i *hubConnectionIndex) Remove(wc *WebConn) { + indexes, ok := i.connectionIndexes[wc] + if !ok { + return + } + + last := i.connections[len(i.connections)-1] + i.connections[indexes.connections] = last + i.connections = i.connections[:len(i.connections)-1] + i.connectionIndexes[last].connections = indexes.connections + + userConnections := i.connectionsByUserId[wc.UserId] + last = userConnections[len(userConnections)-1] + userConnections[indexes.connectionsByUserId] = last + i.connectionsByUserId[wc.UserId] = userConnections[:len(userConnections)-1] + i.connectionIndexes[last].connectionsByUserId = indexes.connectionsByUserId + + delete(i.connectionIndexes, wc) +} + +func (i *hubConnectionIndex) ForUser(id string) []*WebConn { + return i.connectionsByUserId[id] +} + +func (i *hubConnectionIndex) All() []*WebConn { + return i.connections +} -- cgit v1.2.3-1-g7c22