From 8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 27 Sep 2017 11:52:34 -0500 Subject: remove remaining Global() calls (outside of tests) (#7521) --- app/web_hub.go | 74 +++++++++++++++++++++++++++------------------------------- 1 file changed, 34 insertions(+), 40 deletions(-) (limited to 'app/web_hub.go') diff --git a/app/web_hub.go b/app/web_hub.go index b351de39e..50ccb100e 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -29,6 +29,7 @@ type Hub struct { // connectionCount should be kept first. // See https://github.com/mattermost/mattermost-server/pull/7281 connectionCount int64 + app *App connections []*WebConn connectionIndex int register chan *WebConn @@ -40,11 +41,9 @@ type Hub struct { goroutineId int } -var hubs []*Hub = make([]*Hub, 0) -var stopCheckingForDeadlock chan bool - -func NewWebHub() *Hub { +func (a *App) NewWebHub() *Hub { return &Hub{ + app: a, register: make(chan *WebConn), unregister: make(chan *WebConn), connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE), @@ -55,26 +54,27 @@ func NewWebHub() *Hub { } } -func TotalWebsocketConnections() int { +func (a *App) TotalWebsocketConnections() int { count := int64(0) - for _, hub := range hubs { + for _, hub := range a.Hubs { count = count + atomic.LoadInt64(&hub.connectionCount) } return int(count) } -func HubStart() { +func (a *App) HubStart() { // Total number of hubs is twice the number of CPUs. numberOfHubs := runtime.NumCPU() * 2 l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs) - hubs = make([]*Hub, numberOfHubs) + a.Hubs = make([]*Hub, numberOfHubs) + a.HubsStopCheckingForDeadlock = make(chan bool, 1) - for i := 0; i < len(hubs); i++ { - hubs[i] = NewWebHub() - hubs[i].connectionIndex = i - hubs[i].Start() + for i := 0; i < len(a.Hubs); i++ { + a.Hubs[i] = a.NewWebHub() + a.Hubs[i].connectionIndex = i + a.Hubs[i].Start() } go func() { @@ -84,12 +84,10 @@ func HubStart() { ticker.Stop() }() - stopCheckingForDeadlock = make(chan bool, 1) - for { select { case <-ticker.C: - for _, hub := range hubs { + for _, hub := range a.Hubs { if len(hub.broadcast) >= DEADLOCK_WARN { l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast)) buf := make([]byte, 1<<16) @@ -105,46 +103,42 @@ func HubStart() { } } - case <-stopCheckingForDeadlock: + case <-a.HubsStopCheckingForDeadlock: return } } }() } -func HubStop() { +func (a *App) HubStop() { l4g.Info(utils.T("api.web_hub.start.stopping.debug")) select { - case stopCheckingForDeadlock <- true: + case a.HubsStopCheckingForDeadlock <- true: default: l4g.Warn("We appear to have already sent the stop checking for deadlocks command") } - for _, hub := range hubs { + for _, hub := range a.Hubs { hub.Stop() } - hubs = make([]*Hub, 0) + a.Hubs = []*Hub{} } -func GetHubForUserId(userId string) *Hub { +func (a *App) GetHubForUserId(userId string) *Hub { hash := fnv.New32a() hash.Write([]byte(userId)) - index := hash.Sum32() % uint32(len(hubs)) - return hubs[index] -} - -func HubRegister(webConn *WebConn) { - GetHubForUserId(webConn.UserId).Register(webConn) + index := hash.Sum32() % uint32(len(a.Hubs)) + return a.Hubs[index] } -func HubUnregister(webConn *WebConn) { - GetHubForUserId(webConn.UserId).Unregister(webConn) +func (a *App) HubRegister(webConn *WebConn) { + a.GetHubForUserId(webConn.UserId).Register(webConn) } -func Publish(message *model.WebSocketEvent) { - Global().Publish(message) +func (a *App) HubUnregister(webConn *WebConn) { + a.GetHubForUserId(webConn.UserId).Unregister(webConn) } func (a *App) Publish(message *model.WebSocketEvent) { @@ -152,7 +146,7 @@ func (a *App) Publish(message *model.WebSocketEvent) { metrics.IncrementWebsocketEvent(message.Event) } - PublishSkipClusterSend(message) + a.PublishSkipClusterSend(message) if a.Cluster != nil { cm := &model.ClusterMessage{ @@ -173,8 +167,8 @@ func (a *App) Publish(message *model.WebSocketEvent) { } } -func PublishSkipClusterSend(message *model.WebSocketEvent) { - for _, hub := range hubs { +func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) { + for _, hub := range a.Hubs { hub.Broadcast(message) } } @@ -291,8 +285,8 @@ func (a *App) InvalidateCacheForUserSkipClusterSend(userId string) { a.Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId) a.Srv.Store.User().InvalidatProfileCacheForUser(userId) - if len(hubs) != 0 { - GetHubForUserId(userId).InvalidateUser(userId) + if len(a.Hubs) != 0 { + a.GetHubForUserId(userId).InvalidateUser(userId) } } @@ -313,9 +307,9 @@ func (a *App) InvalidateCacheForWebhookSkipClusterSend(webhookId string) { a.Srv.Store.Webhook().InvalidateWebhookCache(webhookId) } -func InvalidateWebConnSessionCacheForUser(userId string) { - if len(hubs) != 0 { - GetHubForUserId(userId).InvalidateUser(userId) +func (a *App) InvalidateWebConnSessionCacheForUser(userId string) { + if len(a.Hubs) != 0 { + a.GetHubForUserId(userId).InvalidateUser(userId) } } @@ -401,7 +395,7 @@ func (h *Hub) Start() { } if !found { - go Global().SetStatusOffline(userId, false) + go h.app.SetStatusOffline(userId, false) } case userId := <-h.invalidateUser: -- cgit v1.2.3-1-g7c22