summaryrefslogtreecommitdiffstats
path: root/api/web_hub.go
diff options
context:
space:
mode:
Diffstat (limited to 'api/web_hub.go')
-rw-r--r--api/web_hub.go128
1 files changed, 90 insertions, 38 deletions
diff --git a/api/web_hub.go b/api/web_hub.go
index 5fe9d6ae8..241ebcef0 100644
--- a/api/web_hub.go
+++ b/api/web_hub.go
@@ -10,19 +10,21 @@ import (
)
type Hub struct {
- teamHubs map[string]*TeamHub
- register chan *WebConn
- unregister chan *WebConn
- broadcast chan *model.Message
- stop chan string
+ connections map[*WebConn]bool
+ register chan *WebConn
+ unregister chan *WebConn
+ broadcast chan *model.Message
+ stop chan string
+ invalidateUser chan string
}
var hub = &Hub{
- register: make(chan *WebConn),
- unregister: make(chan *WebConn),
- teamHubs: make(map[string]*TeamHub),
- broadcast: make(chan *model.Message),
- stop: make(chan string),
+ register: make(chan *WebConn),
+ unregister: make(chan *WebConn),
+ connections: make(map[*WebConn]bool),
+ broadcast: make(chan *model.Message),
+ stop: make(chan string),
+ invalidateUser: make(chan string),
}
func PublishAndForget(message *model.Message) {
@@ -31,16 +33,8 @@ func PublishAndForget(message *model.Message) {
}()
}
-func UpdateChannelAccessCache(teamId, userId, channelId string) {
- if nh, ok := hub.teamHubs[teamId]; ok {
- nh.UpdateChannelAccessCache(userId, channelId)
- }
-}
-
-func UpdateChannelAccessCacheAndForget(teamId, userId, channelId string) {
- go func() {
- UpdateChannelAccessCache(teamId, userId, channelId)
- }()
+func InvalidateCacheForUser(userId string) {
+ hub.invalidateUser <- userId
}
func (h *Hub) Register(webConn *WebConn) {
@@ -65,34 +59,92 @@ func (h *Hub) Start() {
go func() {
for {
select {
+ case webCon := <-h.register:
+ h.connections[webCon] = true
- case c := <-h.register:
- nh := h.teamHubs[c.TeamId]
-
- if nh == nil {
- nh = NewTeamHub(c.TeamId)
- h.teamHubs[c.TeamId] = nh
- nh.Start()
+ case webCon := <-h.unregister:
+ if _, ok := h.connections[webCon]; ok {
+ delete(h.connections, webCon)
+ close(webCon.Send)
}
-
- nh.Register(c)
-
- case c := <-h.unregister:
- if nh, ok := h.teamHubs[c.TeamId]; ok {
- nh.Unregister(c)
+ case userId := <-h.invalidateUser:
+ for webCon := range h.connections {
+ if webCon.UserId == userId {
+ webCon.InvalidateCache()
+ }
}
+
case msg := <-h.broadcast:
- nh := h.teamHubs[msg.TeamId]
- if nh != nil {
- nh.broadcast <- msg
+ for webCon := range h.connections {
+ if shouldSendEvent(webCon, msg) {
+ select {
+ case webCon.Send <- msg:
+ default:
+ close(webCon.Send)
+ delete(h.connections, webCon)
+ }
+ }
}
+
case s := <-h.stop:
l4g.Debug(utils.T("api.web_hub.start.stopping.debug"), s)
- for _, v := range h.teamHubs {
- v.Stop()
+
+ for webCon := range h.connections {
+ webCon.WebSocket.Close()
}
+
return
}
}
}()
}
+
+func shouldSendEvent(webCon *WebConn, msg *model.Message) bool {
+
+ if webCon.UserId == msg.UserId {
+ // Don't need to tell the user they are typing
+ if msg.Action == model.ACTION_TYPING {
+ return false
+ }
+
+ // We have to make sure the user is in the channel. Otherwise system messages that
+ // post about users in channels they are not in trigger warnings.
+ if len(msg.ChannelId) > 0 {
+ allowed := webCon.HasPermissionsToChannel(msg.ChannelId)
+
+ if !allowed {
+ return false
+ }
+ }
+ } else {
+ // Don't share a user's view or preference events with other users
+ if msg.Action == model.ACTION_CHANNEL_VIEWED {
+ return false
+ } else if msg.Action == model.ACTION_PREFERENCE_CHANGED {
+ return false
+ } else if msg.Action == model.ACTION_EPHEMERAL_MESSAGE {
+ // For now, ephemeral messages are sent directly to individual users
+ return false
+ }
+
+ // Only report events to users who are in the team for the event
+ if len(msg.TeamId) > 0 {
+ allowed := webCon.HasPermissionsToTeam(msg.TeamId)
+
+ if !allowed {
+ return false
+ }
+ }
+
+ // Only report events to users who are in the channel for the event
+ if len(msg.ChannelId) > 0 {
+ allowed := webCon.HasPermissionsToChannel(msg.ChannelId)
+
+ if !allowed {
+ return false
+ }
+ }
+ }
+
+ return true
+}