summaryrefslogtreecommitdiffstats
path: root/app/web_hub.go
diff options
context:
space:
mode:
authorJoram Wilander <jwawilander@gmail.com>2017-01-13 13:53:37 -0500
committerGitHub <noreply@github.com>2017-01-13 13:53:37 -0500
commit97558f6a6ec4c53fa69035fb430ead209d9c222d (patch)
tree6fc57f5b75b15a025348c6e295cea6aedb9e69ae /app/web_hub.go
parent07bad4d6d518a9012a20fec8309cd625f57c7a8c (diff)
downloadchat-97558f6a6ec4c53fa69035fb430ead209d9c222d.tar.gz
chat-97558f6a6ec4c53fa69035fb430ead209d9c222d.tar.bz2
chat-97558f6a6ec4c53fa69035fb430ead209d9c222d.zip
PLT-4938 Add app package and move logic over from api package (#4931)
* Add app package and move logic over from api package * Change app package functions to return errors * Move non-api tests into app package * Fix merge
Diffstat (limited to 'app/web_hub.go')
-rw-r--r--app/web_hub.go241
1 files changed, 241 insertions, 0 deletions
diff --git a/app/web_hub.go b/app/web_hub.go
new file mode 100644
index 000000000..28d2c0095
--- /dev/null
+++ b/app/web_hub.go
@@ -0,0 +1,241 @@
+// Copyright (c) 2015 Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "fmt"
+ "hash/fnv"
+ "runtime"
+
+ l4g "github.com/alecthomas/log4go"
+
+ "github.com/mattermost/platform/einterfaces"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Hub struct {
+ connections map[*WebConn]bool
+ register chan *WebConn
+ unregister chan *WebConn
+ broadcast chan *model.WebSocketEvent
+ stop chan string
+ invalidateUser chan string
+}
+
+var hubs []*Hub = make([]*Hub, 0)
+
+func NewWebHub() *Hub {
+ return &Hub{
+ register: make(chan *WebConn),
+ unregister: make(chan *WebConn),
+ connections: make(map[*WebConn]bool, model.SESSION_CACHE_SIZE),
+ broadcast: make(chan *model.WebSocketEvent, 4096),
+ stop: make(chan string),
+ invalidateUser: make(chan string),
+ }
+}
+
+func TotalWebsocketConnections() int {
+ // This is racy, but it's only used for reporting information
+ // so it's probably OK
+ count := 0
+ for _, hub := range hubs {
+ count = count + len(hub.connections)
+ }
+
+ return count
+}
+
+func HubStart() {
+ l4g.Info(utils.T("api.web_hub.start.starting.debug"), runtime.NumCPU()*2)
+
+ // Total number of hubs is twice the number of CPUs.
+ hubs = make([]*Hub, runtime.NumCPU()*2)
+
+ for i := 0; i < len(hubs); i++ {
+ hubs[i] = NewWebHub()
+ hubs[i].Start()
+ }
+}
+
+func HubStop() {
+ l4g.Info(utils.T("api.web_hub.start.stopping.debug"))
+
+ for _, hub := range hubs {
+ hub.Stop()
+ }
+
+ hubs = make([]*Hub, 0)
+}
+
+func GetHubForUserId(userId string) *Hub {
+ hash := fnv.New32a()
+ hash.Write([]byte(userId))
+ index := hash.Sum32() % uint32(len(hubs))
+ return hubs[index]
+}
+
+func HubRegister(webConn *WebConn) {
+ GetHubForUserId(webConn.UserId).Register(webConn)
+}
+
+func HubUnregister(webConn *WebConn) {
+ GetHubForUserId(webConn.UserId).Unregister(webConn)
+}
+
+func Publish(message *model.WebSocketEvent) {
+ message.DoPreComputeJson()
+ for _, hub := range hubs {
+ hub.Broadcast(message)
+ }
+
+ if einterfaces.GetClusterInterface() != nil {
+ einterfaces.GetClusterInterface().Publish(message)
+ }
+}
+
+func PublishSkipClusterSend(message *model.WebSocketEvent) {
+ message.DoPreComputeJson()
+ for _, hub := range hubs {
+ hub.Broadcast(message)
+ }
+}
+
+func InvalidateCacheForChannel(channelId string) {
+ InvalidateCacheForChannelSkipClusterSend(channelId)
+
+ if cluster := einterfaces.GetClusterInterface(); cluster != nil {
+ cluster.InvalidateCacheForChannel(channelId)
+ }
+}
+
+func InvalidateCacheForChannelSkipClusterSend(channelId string) {
+ Srv.Store.User().InvalidateProfilesInChannelCache(channelId)
+ Srv.Store.Channel().InvalidateMemberCount(channelId)
+ Srv.Store.Channel().InvalidateChannel(channelId)
+}
+
+func InvalidateCacheForChannelPosts(channelId string) {
+ InvalidateCacheForChannelPostsSkipClusterSend(channelId)
+
+ if cluster := einterfaces.GetClusterInterface(); cluster != nil {
+ cluster.InvalidateCacheForChannelPosts(channelId)
+ }
+}
+
+func InvalidateCacheForChannelPostsSkipClusterSend(channelId string) {
+ Srv.Store.Post().InvalidateLastPostTimeCache(channelId)
+}
+
+func InvalidateCacheForUser(userId string) {
+ InvalidateCacheForUserSkipClusterSend(userId)
+
+ if einterfaces.GetClusterInterface() != nil {
+ einterfaces.GetClusterInterface().InvalidateCacheForUser(userId)
+ }
+}
+
+func InvalidateCacheForUserSkipClusterSend(userId string) {
+ Srv.Store.Channel().InvalidateAllChannelMembersForUser(userId)
+ Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId)
+ Srv.Store.User().InvalidatProfileCacheForUser(userId)
+
+ if len(hubs) != 0 {
+ GetHubForUserId(userId).InvalidateUser(userId)
+ }
+}
+
+func InvalidateWebConnSessionCacheForUser(userId string) {
+ if len(hubs) != 0 {
+ GetHubForUserId(userId).InvalidateUser(userId)
+ }
+}
+
+func (h *Hub) Register(webConn *WebConn) {
+ h.register <- webConn
+
+ if webConn.IsAuthenticated() {
+ webConn.SendHello()
+ }
+}
+
+func (h *Hub) Unregister(webConn *WebConn) {
+ h.unregister <- webConn
+}
+
+func (h *Hub) Broadcast(message *model.WebSocketEvent) {
+ if message != nil {
+ h.broadcast <- message
+ }
+}
+
+func (h *Hub) InvalidateUser(userId string) {
+ h.invalidateUser <- userId
+}
+
+func (h *Hub) Stop() {
+ h.stop <- "all"
+}
+
+func (h *Hub) Start() {
+ go func() {
+ for {
+ select {
+ case webCon := <-h.register:
+ h.connections[webCon] = true
+
+ case webCon := <-h.unregister:
+ userId := webCon.UserId
+ if _, ok := h.connections[webCon]; ok {
+ delete(h.connections, webCon)
+ close(webCon.Send)
+ }
+
+ if len(userId) == 0 {
+ continue
+ }
+
+ found := false
+ for webCon := range h.connections {
+ if userId == webCon.UserId {
+ found = true
+ break
+ }
+ }
+
+ if !found {
+ go SetStatusOffline(userId, false)
+ }
+
+ case userId := <-h.invalidateUser:
+ for webCon := range h.connections {
+ if webCon.UserId == userId {
+ webCon.InvalidateCache()
+ }
+ }
+
+ case msg := <-h.broadcast:
+ for webCon := range h.connections {
+ if webCon.ShouldSendEvent(msg) {
+ select {
+ case webCon.Send <- msg:
+ default:
+ l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId))
+ close(webCon.Send)
+ delete(h.connections, webCon)
+ }
+ }
+ }
+
+ case <-h.stop:
+ for webCon := range h.connections {
+ webCon.WebSocket.Close()
+ }
+
+ return
+ }
+ }
+ }()
+}