summaryrefslogtreecommitdiffstats
path: root/app/web_hub.go
diff options
context:
space:
mode:
authorChris <ccbrown112@gmail.com>2018-02-20 12:50:10 -0600
committerChristopher Speller <crspeller@gmail.com>2018-02-20 10:50:10 -0800
commit19a5d0047dbe5e4d7f4675893bfee125d3a12312 (patch)
tree3007bd99e82309f2f03eabe79aad70e1ec25204e /app/web_hub.go
parentbabd795d792e95f6e708af6ee8207ef6877e2b32 (diff)
downloadchat-19a5d0047dbe5e4d7f4675893bfee125d3a12312.tar.gz
chat-19a5d0047dbe5e4d7f4675893bfee125d3a12312.tar.bz2
chat-19a5d0047dbe5e4d7f4675893bfee125d3a12312.zip
MM-8710: Web Hub optimizations (#8293)
* webhub optimizations * test fix * minor fix * big perf improvement to ToJson after precomputing * fix hub connection count
Diffstat (limited to 'app/web_hub.go')
-rw-r--r--app/web_hub.go133
1 files changed, 83 insertions, 50 deletions
diff --git a/app/web_hub.go b/app/web_hub.go
index eeae13e09..c1c8cb7bb 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -30,7 +30,6 @@ type Hub struct {
// See https://github.com/mattermost/mattermost-server/pull/7281
connectionCount int64
app *App
- connections []*WebConn
connectionIndex int
register chan *WebConn
unregister chan *WebConn
@@ -47,7 +46,6 @@ func (a *App) NewWebHub() *Hub {
app: a,
register: make(chan *WebConn, 1),
unregister: make(chan *WebConn, 1),
- connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
broadcast: make(chan *model.WebSocketEvent, BROADCAST_QUEUE_SIZE),
stop: make(chan struct{}),
didStop: make(chan struct{}),
@@ -170,8 +168,14 @@ func (a *App) Publish(message *model.WebSocketEvent) {
}
func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) {
- for _, hub := range a.Hubs {
- hub.Broadcast(message)
+ if message.Broadcast.UserId != "" {
+ if len(a.Hubs) != 0 {
+ a.GetHubForUserId(message.Broadcast.UserId).Broadcast(message)
+ }
+ } else {
+ for _, hub := range a.Hubs {
+ hub.Broadcast(message)
+ }
}
}
@@ -362,80 +366,53 @@ func (h *Hub) Start() {
var doRecover func()
doStart = func() {
-
h.goroutineId = getGoroutineId()
l4g.Debug("Hub for index %v is starting with goroutine %v", h.connectionIndex, h.goroutineId)
+ connections := newHubConnectionIndex()
+
for {
select {
case webCon := <-h.register:
- h.connections = append(h.connections, webCon)
- atomic.StoreInt64(&h.connectionCount, int64(len(h.connections)))
-
+ connections.Add(webCon)
+ atomic.StoreInt64(&h.connectionCount, int64(len(connections.All())))
case webCon := <-h.unregister:
- userId := webCon.UserId
-
- found := false
- indexToDel := -1
- for i, webConCandidate := range h.connections {
- if webConCandidate == webCon {
- indexToDel = i
- continue
- }
- if userId == webConCandidate.UserId {
- found = true
- if indexToDel != -1 {
- break
- }
- }
- }
+ connections.Remove(webCon)
- if indexToDel != -1 {
- // Delete the webcon we are unregistering
- h.connections[indexToDel] = h.connections[len(h.connections)-1]
- h.connections = h.connections[:len(h.connections)-1]
- }
-
- if len(userId) == 0 {
+ if len(webCon.UserId) == 0 {
continue
}
- if !found {
+ if len(connections.ForUser(webCon.UserId)) == 0 {
h.app.Go(func() {
- h.app.SetStatusOffline(userId, false)
+ h.app.SetStatusOffline(webCon.UserId, false)
})
}
-
case userId := <-h.invalidateUser:
- for _, webCon := range h.connections {
- if webCon.UserId == userId {
- webCon.InvalidateCache()
- }
+ for _, webCon := range connections.ForUser(userId) {
+ webCon.InvalidateCache()
}
-
case msg := <-h.broadcast:
- for _, webCon := range h.connections {
+ candidates := connections.All()
+ if msg.Broadcast.UserId != "" {
+ candidates = connections.ForUser(msg.Broadcast.UserId)
+ }
+ msg.PrecomputeJSON()
+ for _, webCon := range candidates {
if webCon.ShouldSendEvent(msg) {
select {
case webCon.Send <- msg:
default:
l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId))
close(webCon.Send)
- for i, webConCandidate := range h.connections {
- if webConCandidate == webCon {
- h.connections[i] = h.connections[len(h.connections)-1]
- h.connections = h.connections[:len(h.connections)-1]
- break
- }
- }
+ connections.Remove(webCon)
}
}
}
-
case <-h.stop:
userIds := make(map[string]bool)
- for _, webCon := range h.connections {
+ for _, webCon := range connections.All() {
userIds[webCon.UserId] = true
webCon.Close()
}
@@ -444,7 +421,6 @@ func (h *Hub) Start() {
h.app.SetStatusOffline(userId, false)
}
- h.connections = make([]*WebConn, 0, model.SESSION_CACHE_SIZE)
h.ExplicitStop = true
close(h.didStop)
@@ -474,3 +450,60 @@ func (h *Hub) Start() {
go doRecoverableStart()
}
+
+type hubConnectionIndexIndexes struct {
+ connections int
+ connectionsByUserId int
+}
+
+// hubConnectionIndex provides fast addition, removal, and iteration of web connections.
+type hubConnectionIndex struct {
+ connections []*WebConn
+ connectionsByUserId map[string][]*WebConn
+ connectionIndexes map[*WebConn]*hubConnectionIndexIndexes
+}
+
+func newHubConnectionIndex() *hubConnectionIndex {
+ return &hubConnectionIndex{
+ connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
+ connectionsByUserId: make(map[string][]*WebConn),
+ connectionIndexes: make(map[*WebConn]*hubConnectionIndexIndexes),
+ }
+}
+
+func (i *hubConnectionIndex) Add(wc *WebConn) {
+ i.connections = append(i.connections, wc)
+ i.connectionsByUserId[wc.UserId] = append(i.connectionsByUserId[wc.UserId], wc)
+ i.connectionIndexes[wc] = &hubConnectionIndexIndexes{
+ connections: len(i.connections) - 1,
+ connectionsByUserId: len(i.connectionsByUserId[wc.UserId]) - 1,
+ }
+}
+
+func (i *hubConnectionIndex) Remove(wc *WebConn) {
+ indexes, ok := i.connectionIndexes[wc]
+ if !ok {
+ return
+ }
+
+ last := i.connections[len(i.connections)-1]
+ i.connections[indexes.connections] = last
+ i.connections = i.connections[:len(i.connections)-1]
+ i.connectionIndexes[last].connections = indexes.connections
+
+ userConnections := i.connectionsByUserId[wc.UserId]
+ last = userConnections[len(userConnections)-1]
+ userConnections[indexes.connectionsByUserId] = last
+ i.connectionsByUserId[wc.UserId] = userConnections[:len(userConnections)-1]
+ i.connectionIndexes[last].connectionsByUserId = indexes.connectionsByUserId
+
+ delete(i.connectionIndexes, wc)
+}
+
+func (i *hubConnectionIndex) ForUser(id string) []*WebConn {
+ return i.connectionsByUserId[id]
+}
+
+func (i *hubConnectionIndex) All() []*WebConn {
+ return i.connections
+}