From 97558f6a6ec4c53fa69035fb430ead209d9c222d Mon Sep 17 00:00:00 2001 From: Joram Wilander Date: Fri, 13 Jan 2017 13:53:37 -0500 Subject: PLT-4938 Add app package and move logic over from api package (#4931) * Add app package and move logic over from api package * Change app package functions to return errors * Move non-api tests into app package * Fix merge --- app/web_hub.go | 241 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 app/web_hub.go (limited to 'app/web_hub.go') diff --git a/app/web_hub.go b/app/web_hub.go new file mode 100644 index 000000000..28d2c0095 --- /dev/null +++ b/app/web_hub.go @@ -0,0 +1,241 @@ +// Copyright (c) 2015 Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "fmt" + "hash/fnv" + "runtime" + + l4g "github.com/alecthomas/log4go" + + "github.com/mattermost/platform/einterfaces" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +type Hub struct { + connections map[*WebConn]bool + register chan *WebConn + unregister chan *WebConn + broadcast chan *model.WebSocketEvent + stop chan string + invalidateUser chan string +} + +var hubs []*Hub = make([]*Hub, 0) + +func NewWebHub() *Hub { + return &Hub{ + register: make(chan *WebConn), + unregister: make(chan *WebConn), + connections: make(map[*WebConn]bool, model.SESSION_CACHE_SIZE), + broadcast: make(chan *model.WebSocketEvent, 4096), + stop: make(chan string), + invalidateUser: make(chan string), + } +} + +func TotalWebsocketConnections() int { + // This is racy, but it's only used for reporting information + // so it's probably OK + count := 0 + for _, hub := range hubs { + count = count + len(hub.connections) + } + + return count +} + +func HubStart() { + l4g.Info(utils.T("api.web_hub.start.starting.debug"), runtime.NumCPU()*2) + + // Total number of hubs is twice the number of CPUs. + hubs = make([]*Hub, runtime.NumCPU()*2) + + for i := 0; i < len(hubs); i++ { + hubs[i] = NewWebHub() + hubs[i].Start() + } +} + +func HubStop() { + l4g.Info(utils.T("api.web_hub.start.stopping.debug")) + + for _, hub := range hubs { + hub.Stop() + } + + hubs = make([]*Hub, 0) +} + +func GetHubForUserId(userId string) *Hub { + hash := fnv.New32a() + hash.Write([]byte(userId)) + index := hash.Sum32() % uint32(len(hubs)) + return hubs[index] +} + +func HubRegister(webConn *WebConn) { + GetHubForUserId(webConn.UserId).Register(webConn) +} + +func HubUnregister(webConn *WebConn) { + GetHubForUserId(webConn.UserId).Unregister(webConn) +} + +func Publish(message *model.WebSocketEvent) { + message.DoPreComputeJson() + for _, hub := range hubs { + hub.Broadcast(message) + } + + if einterfaces.GetClusterInterface() != nil { + einterfaces.GetClusterInterface().Publish(message) + } +} + +func PublishSkipClusterSend(message *model.WebSocketEvent) { + message.DoPreComputeJson() + for _, hub := range hubs { + hub.Broadcast(message) + } +} + +func InvalidateCacheForChannel(channelId string) { + InvalidateCacheForChannelSkipClusterSend(channelId) + + if cluster := einterfaces.GetClusterInterface(); cluster != nil { + cluster.InvalidateCacheForChannel(channelId) + } +} + +func InvalidateCacheForChannelSkipClusterSend(channelId string) { + Srv.Store.User().InvalidateProfilesInChannelCache(channelId) + Srv.Store.Channel().InvalidateMemberCount(channelId) + Srv.Store.Channel().InvalidateChannel(channelId) +} + +func InvalidateCacheForChannelPosts(channelId string) { + InvalidateCacheForChannelPostsSkipClusterSend(channelId) + + if cluster := einterfaces.GetClusterInterface(); cluster != nil { + cluster.InvalidateCacheForChannelPosts(channelId) + } +} + +func InvalidateCacheForChannelPostsSkipClusterSend(channelId string) { + Srv.Store.Post().InvalidateLastPostTimeCache(channelId) +} + +func InvalidateCacheForUser(userId string) { + InvalidateCacheForUserSkipClusterSend(userId) + + if einterfaces.GetClusterInterface() != nil { + einterfaces.GetClusterInterface().InvalidateCacheForUser(userId) + } +} + +func InvalidateCacheForUserSkipClusterSend(userId string) { + Srv.Store.Channel().InvalidateAllChannelMembersForUser(userId) + Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId) + Srv.Store.User().InvalidatProfileCacheForUser(userId) + + if len(hubs) != 0 { + GetHubForUserId(userId).InvalidateUser(userId) + } +} + +func InvalidateWebConnSessionCacheForUser(userId string) { + if len(hubs) != 0 { + GetHubForUserId(userId).InvalidateUser(userId) + } +} + +func (h *Hub) Register(webConn *WebConn) { + h.register <- webConn + + if webConn.IsAuthenticated() { + webConn.SendHello() + } +} + +func (h *Hub) Unregister(webConn *WebConn) { + h.unregister <- webConn +} + +func (h *Hub) Broadcast(message *model.WebSocketEvent) { + if message != nil { + h.broadcast <- message + } +} + +func (h *Hub) InvalidateUser(userId string) { + h.invalidateUser <- userId +} + +func (h *Hub) Stop() { + h.stop <- "all" +} + +func (h *Hub) Start() { + go func() { + for { + select { + case webCon := <-h.register: + h.connections[webCon] = true + + case webCon := <-h.unregister: + userId := webCon.UserId + if _, ok := h.connections[webCon]; ok { + delete(h.connections, webCon) + close(webCon.Send) + } + + if len(userId) == 0 { + continue + } + + found := false + for webCon := range h.connections { + if userId == webCon.UserId { + found = true + break + } + } + + if !found { + go SetStatusOffline(userId, false) + } + + case userId := <-h.invalidateUser: + for webCon := range h.connections { + if webCon.UserId == userId { + webCon.InvalidateCache() + } + } + + case msg := <-h.broadcast: + for webCon := range h.connections { + if webCon.ShouldSendEvent(msg) { + select { + case webCon.Send <- msg: + default: + l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId)) + close(webCon.Send) + delete(h.connections, webCon) + } + } + } + + case <-h.stop: + for webCon := range h.connections { + webCon.WebSocket.Close() + } + + return + } + } + }() +} -- cgit v1.2.3-1-g7c22