diff options
Diffstat (limited to 'api/web_hub.go')
-rw-r--r-- | api/web_hub.go | 128 |
1 files changed, 90 insertions, 38 deletions
diff --git a/api/web_hub.go b/api/web_hub.go index 5fe9d6ae8..241ebcef0 100644 --- a/api/web_hub.go +++ b/api/web_hub.go @@ -10,19 +10,21 @@ import ( ) type Hub struct { - teamHubs map[string]*TeamHub - register chan *WebConn - unregister chan *WebConn - broadcast chan *model.Message - stop chan string + connections map[*WebConn]bool + register chan *WebConn + unregister chan *WebConn + broadcast chan *model.Message + stop chan string + invalidateUser chan string } var hub = &Hub{ - register: make(chan *WebConn), - unregister: make(chan *WebConn), - teamHubs: make(map[string]*TeamHub), - broadcast: make(chan *model.Message), - stop: make(chan string), + register: make(chan *WebConn), + unregister: make(chan *WebConn), + connections: make(map[*WebConn]bool), + broadcast: make(chan *model.Message), + stop: make(chan string), + invalidateUser: make(chan string), } func PublishAndForget(message *model.Message) { @@ -31,16 +33,8 @@ func PublishAndForget(message *model.Message) { }() } -func UpdateChannelAccessCache(teamId, userId, channelId string) { - if nh, ok := hub.teamHubs[teamId]; ok { - nh.UpdateChannelAccessCache(userId, channelId) - } -} - -func UpdateChannelAccessCacheAndForget(teamId, userId, channelId string) { - go func() { - UpdateChannelAccessCache(teamId, userId, channelId) - }() +func InvalidateCacheForUser(userId string) { + hub.invalidateUser <- userId } func (h *Hub) Register(webConn *WebConn) { @@ -65,34 +59,92 @@ func (h *Hub) Start() { go func() { for { select { + case webCon := <-h.register: + h.connections[webCon] = true - case c := <-h.register: - nh := h.teamHubs[c.TeamId] - - if nh == nil { - nh = NewTeamHub(c.TeamId) - h.teamHubs[c.TeamId] = nh - nh.Start() + case webCon := <-h.unregister: + if _, ok := h.connections[webCon]; ok { + delete(h.connections, webCon) + close(webCon.Send) } - - nh.Register(c) - - case c := <-h.unregister: - if nh, ok := h.teamHubs[c.TeamId]; ok { - nh.Unregister(c) + case userId := <-h.invalidateUser: + for webCon := range h.connections { + if webCon.UserId == userId { + webCon.InvalidateCache() + } } + case msg := <-h.broadcast: - nh := h.teamHubs[msg.TeamId] - if nh != nil { - nh.broadcast <- msg + for webCon := range h.connections { + if shouldSendEvent(webCon, msg) { + select { + case webCon.Send <- msg: + default: + close(webCon.Send) + delete(h.connections, webCon) + } + } } + case s := <-h.stop: l4g.Debug(utils.T("api.web_hub.start.stopping.debug"), s) - for _, v := range h.teamHubs { - v.Stop() + + for webCon := range h.connections { + webCon.WebSocket.Close() } + return } } }() } + +func shouldSendEvent(webCon *WebConn, msg *model.Message) bool { + + if webCon.UserId == msg.UserId { + // Don't need to tell the user they are typing + if msg.Action == model.ACTION_TYPING { + return false + } + + // We have to make sure the user is in the channel. Otherwise system messages that + // post about users in channels they are not in trigger warnings. + if len(msg.ChannelId) > 0 { + allowed := webCon.HasPermissionsToChannel(msg.ChannelId) + + if !allowed { + return false + } + } + } else { + // Don't share a user's view or preference events with other users + if msg.Action == model.ACTION_CHANNEL_VIEWED { + return false + } else if msg.Action == model.ACTION_PREFERENCE_CHANGED { + return false + } else if msg.Action == model.ACTION_EPHEMERAL_MESSAGE { + // For now, ephemeral messages are sent directly to individual users + return false + } + + // Only report events to users who are in the team for the event + if len(msg.TeamId) > 0 { + allowed := webCon.HasPermissionsToTeam(msg.TeamId) + + if !allowed { + return false + } + } + + // Only report events to users who are in the channel for the event + if len(msg.ChannelId) > 0 { + allowed := webCon.HasPermissionsToChannel(msg.ChannelId) + + if !allowed { + return false + } + } + } + + return true +} |