path: root/app/web_hub.go
author     Chris <ccbrown112@gmail.com>    2017-09-27 11:52:34 -0500
committer  Saturnino Abril <saturnino.abril@gmail.com>    2017-09-28 00:52:34 +0800
commit     8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df (patch)
tree       b3563dfe35ad78991774c9d1842dc439376e1db1 /app/web_hub.go
parent     1bd66589a2adc67df5df9c108a2f2ecc77dfdf44 (diff)
download   chat-8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df.tar.gz
           chat-8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df.tar.bz2
           chat-8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df.zip
remove remaining Global() calls (outside of tests) (#7521)
Diffstat (limited to 'app/web_hub.go')
-rw-r--r--  app/web_hub.go | 74
1 file changed, 34 insertions, 40 deletions
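
This commit continues the removal of the Global() app accessor: the package-level hubs slice and stopCheckingForDeadlock channel move onto the App struct (a.Hubs, a.HubsStopCheckingForDeadlock), and the free hub functions become methods on *App. A minimal, self-contained sketch of the pattern with stand-in types, not the actual Mattermost code:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // Hub is a stand-in for the real hub type; only the field the sketch needs.
    type Hub struct {
        connectionCount int64
    }

    // App now owns the state that previously lived in package-level globals.
    type App struct {
        Hubs                        []*Hub
        HubsStopCheckingForDeadlock chan bool
    }

    // TotalWebsocketConnections becomes a method on *App instead of a free
    // function that read a package-level hubs slice.
    func (a *App) TotalWebsocketConnections() int {
        count := int64(0)
        for _, hub := range a.Hubs {
            count += atomic.LoadInt64(&hub.connectionCount)
        }
        return int(count)
    }

    func main() {
        a := &App{Hubs: []*Hub{{connectionCount: 3}, {connectionCount: 5}}}
        fmt.Println(a.TotalWebsocketConnections()) // 8
    }

Callers change accordingly: anything that previously reached for the globals through Global() now calls the method on the App instance it already holds, which is what the diff below does for HubStart, HubStop, Publish and the cache-invalidation helpers.
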
diff --git a/app/web_hub.go b/app/web_hub.go
index b351de39e..50ccb100e 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -29,6 +29,7 @@ type Hub struct {
// connectionCount should be kept first.
// See https://github.com/mattermost/mattermost-server/pull/7281
connectionCount int64
+ app *App
connections []*WebConn
connectionIndex int
register chan *WebConn
@@ -40,11 +41,9 @@ type Hub struct {
goroutineId int
}
-var hubs []*Hub = make([]*Hub, 0)
-var stopCheckingForDeadlock chan bool
-
-func NewWebHub() *Hub {
+func (a *App) NewWebHub() *Hub {
return &Hub{
+ app: a,
register: make(chan *WebConn),
unregister: make(chan *WebConn),
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
@@ -55,26 +54,27 @@ func NewWebHub() *Hub {
}
}
-func TotalWebsocketConnections() int {
+func (a *App) TotalWebsocketConnections() int {
count := int64(0)
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
count = count + atomic.LoadInt64(&hub.connectionCount)
}
return int(count)
}
-func HubStart() {
+func (a *App) HubStart() {
// Total number of hubs is twice the number of CPUs.
numberOfHubs := runtime.NumCPU() * 2
l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs)
- hubs = make([]*Hub, numberOfHubs)
+ a.Hubs = make([]*Hub, numberOfHubs)
+ a.HubsStopCheckingForDeadlock = make(chan bool, 1)
- for i := 0; i < len(hubs); i++ {
- hubs[i] = NewWebHub()
- hubs[i].connectionIndex = i
- hubs[i].Start()
+ for i := 0; i < len(a.Hubs); i++ {
+ a.Hubs[i] = a.NewWebHub()
+ a.Hubs[i].connectionIndex = i
+ a.Hubs[i].Start()
}
go func() {
@@ -84,12 +84,10 @@ func HubStart() {
ticker.Stop()
}()
- stopCheckingForDeadlock = make(chan bool, 1)
-
for {
select {
case <-ticker.C:
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
if len(hub.broadcast) >= DEADLOCK_WARN {
l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))
buf := make([]byte, 1<<16)
@@ -105,46 +103,42 @@ func HubStart() {
}
}
- case <-stopCheckingForDeadlock:
+ case <-a.HubsStopCheckingForDeadlock:
return
}
}
}()
}
-func HubStop() {
+func (a *App) HubStop() {
l4g.Info(utils.T("api.web_hub.start.stopping.debug"))
select {
- case stopCheckingForDeadlock <- true:
+ case a.HubsStopCheckingForDeadlock <- true:
default:
l4g.Warn("We appear to have already sent the stop checking for deadlocks command")
}
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
hub.Stop()
}
- hubs = make([]*Hub, 0)
+ a.Hubs = []*Hub{}
}
-func GetHubForUserId(userId string) *Hub {
+func (a *App) GetHubForUserId(userId string) *Hub {
hash := fnv.New32a()
hash.Write([]byte(userId))
- index := hash.Sum32() % uint32(len(hubs))
- return hubs[index]
-}
-
-func HubRegister(webConn *WebConn) {
- GetHubForUserId(webConn.UserId).Register(webConn)
+ index := hash.Sum32() % uint32(len(a.Hubs))
+ return a.Hubs[index]
}
-func HubUnregister(webConn *WebConn) {
- GetHubForUserId(webConn.UserId).Unregister(webConn)
+func (a *App) HubRegister(webConn *WebConn) {
+ a.GetHubForUserId(webConn.UserId).Register(webConn)
}
-func Publish(message *model.WebSocketEvent) {
- Global().Publish(message)
+func (a *App) HubUnregister(webConn *WebConn) {
+ a.GetHubForUserId(webConn.UserId).Unregister(webConn)
}
func (a *App) Publish(message *model.WebSocketEvent) {
@@ -152,7 +146,7 @@ func (a *App) Publish(message *model.WebSocketEvent) {
metrics.IncrementWebsocketEvent(message.Event)
}
- PublishSkipClusterSend(message)
+ a.PublishSkipClusterSend(message)
if a.Cluster != nil {
cm := &model.ClusterMessage{
@@ -173,8 +167,8 @@ func (a *App) Publish(message *model.WebSocketEvent) {
}
}
-func PublishSkipClusterSend(message *model.WebSocketEvent) {
- for _, hub := range hubs {
+func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) {
+ for _, hub := range a.Hubs {
hub.Broadcast(message)
}
}
@@ -291,8 +285,8 @@ func (a *App) InvalidateCacheForUserSkipClusterSend(userId string) {
a.Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId)
a.Srv.Store.User().InvalidatProfileCacheForUser(userId)
- if len(hubs) != 0 {
- GetHubForUserId(userId).InvalidateUser(userId)
+ if len(a.Hubs) != 0 {
+ a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -313,9 +307,9 @@ func (a *App) InvalidateCacheForWebhookSkipClusterSend(webhookId string) {
a.Srv.Store.Webhook().InvalidateWebhookCache(webhookId)
}
-func InvalidateWebConnSessionCacheForUser(userId string) {
- if len(hubs) != 0 {
- GetHubForUserId(userId).InvalidateUser(userId)
+func (a *App) InvalidateWebConnSessionCacheForUser(userId string) {
+ if len(a.Hubs) != 0 {
+ a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -401,7 +395,7 @@ func (h *Hub) Start() {
}
if !found {
- go Global().SetStatusOffline(userId, false)
+ go h.app.SetStatusOffline(userId, false)
}
case userId := <-h.invalidateUser:
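
The selection logic in GetHubForUserId pins each user to one hub goroutine: the user ID is hashed with FNV-1a and reduced modulo the number of hubs, so repeated events for the same user always reach the same hub. A standalone illustration of that mapping, using a hypothetical helper name rather than the method above:

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    // hubIndexForUser mirrors the selection in GetHubForUserId: FNV-1a hash of
    // the user ID, modulo the number of hubs.
    func hubIndexForUser(userId string, numHubs int) uint32 {
        hash := fnv.New32a()
        hash.Write([]byte(userId))
        return hash.Sum32() % uint32(numHubs)
    }

    func main() {
        // With 8 hubs, the same user always maps to the same index.
        fmt.Println(hubIndexForUser("user-abc123", 8))
        fmt.Println(hubIndexForUser("user-abc123", 8))
    }
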