From 246d12aaf23fa3a2c23225b33a333effff76253b Mon Sep 17 00:00:00 2001 From: =Corey Hulen Date: Tue, 14 Jul 2015 15:12:04 -0800 Subject: fixes mm-1348 removing dependency on redis --- api/channel.go | 5 ++--- api/command.go | 2 -- api/post.go | 8 ++++---- api/server.go | 3 +-- api/user.go | 2 +- api/web_conn.go | 2 +- api/web_hub.go | 38 ++++++++++++++++++++++++++------------ api/web_socket_test.go | 3 --- api/web_team_hub.go | 40 ---------------------------------------- 9 files changed, 35 insertions(+), 68 deletions(-) (limited to 'api') diff --git a/api/channel.go b/api/channel.go index 88db27def..4d8dbad09 100644 --- a/api/channel.go +++ b/api/channel.go @@ -8,7 +8,6 @@ import ( "fmt" "github.com/gorilla/mux" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" "net/http" "strings" ) @@ -542,7 +541,7 @@ func updateLastViewedAt(c *Context, w http.ResponseWriter, r *http.Request) { message := model.NewMessage(c.Session.TeamId, id, c.Session.UserId, model.ACTION_VIEWED) message.Add("channel_id", id) - store.PublishAndForget(message) + PublishAndForget(message) result := make(map[string]string) result["id"] = id @@ -657,7 +656,7 @@ func addChannelMember(c *Context, w http.ResponseWriter, r *http.Request) { message := model.NewMessage(c.Session.TeamId, "", userId, model.ACTION_USER_ADDED) - store.PublishAndForget(message) + PublishAndForget(message) <-Srv.Store.Channel().UpdateLastViewedAt(id, oUser.Id) w.Write([]byte(cm.ToJson())) diff --git a/api/command.go b/api/command.go index 810a8a07e..ee7a11af3 100644 --- a/api/command.go +++ b/api/command.go @@ -27,8 +27,6 @@ var commands = []commandHandler{ func InitCommand(r *mux.Router) { l4g.Debug("Initializing command api routes") r.Handle("/command", ApiUserRequired(command)).Methods("POST") - - hub.Start() } func command(c *Context, w http.ResponseWriter, r *http.Request) { diff --git a/api/post.go b/api/post.go index 02f997166..aa9b13292 100644 --- a/api/post.go +++ b/api/post.go @@ -11,10 +11,10 @@ import ( "github.com/mattermost/platform/store" "github.com/mattermost/platform/utils" "net/http" + "path/filepath" "strconv" "strings" "time" - "path/filepath" ) func InitPost(r *mux.Router) { @@ -455,7 +455,7 @@ func fireAndForgetNotifications(post *model.Post, teamId, teamUrl string) { message.Add("mentions", model.ArrayToJson(mentionedUsers)) } - store.PublishAndForget(message) + PublishAndForget(message) }() } @@ -521,7 +521,7 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) { message.Add("channel_id", rpost.ChannelId) message.Add("message", rpost.Message) - store.PublishAndForget(message) + PublishAndForget(message) w.Write([]byte(rpost.ToJson())) } @@ -670,7 +670,7 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) { message.Add("post_id", post.Id) message.Add("channel_id", post.ChannelId) - store.PublishAndForget(message) + PublishAndForget(message) result := make(map[string]string) result["id"] = postId diff --git a/api/server.go b/api/server.go index 58986a8d4..3163f79f5 100644 --- a/api/server.go +++ b/api/server.go @@ -28,7 +28,6 @@ func NewServer() { Srv = &Server{} Srv.Server = manners.NewServer() Srv.Store = store.NewSqlStore() - store.RedisClient() Srv.Router = mux.NewRouter() Srv.Router.NotFoundHandler = http.HandlerFunc(Handle404) @@ -54,7 +53,7 @@ func StopServer() { Srv.Server.Shutdown <- true Srv.Store.Close() - store.RedisClose() + hub.Stop() l4g.Info("Server stopped") } diff --git a/api/user.go b/api/user.go index 5b052e826..df1f45042 100644 --- a/api/user.go +++ b/api/user.go @@ -196,7 +196,7 @@ func CreateUser(c *Context, team *model.Team, user *model.User) *model.User { // This message goes to every channel, so the channelId is irrelevant message := model.NewMessage(team.Id, "", ruser.Id, model.ACTION_NEW_USER) - store.PublishAndForget(message) + PublishAndForget(message) return ruser } diff --git a/api/web_conn.go b/api/web_conn.go index 751f6f407..0990de8ef 100644 --- a/api/web_conn.go +++ b/api/web_conn.go @@ -70,7 +70,7 @@ func (c *WebConn) readPump() { } else { msg.TeamId = c.TeamId msg.UserId = c.UserId - store.PublishAndForget(&msg) + PublishAndForget(&msg) } } } diff --git a/api/web_hub.go b/api/web_hub.go index bf5fbb321..c7be19cac 100644 --- a/api/web_hub.go +++ b/api/web_hub.go @@ -5,12 +5,14 @@ package api import ( l4g "code.google.com/p/log4go" + "github.com/mattermost/platform/model" ) type Hub struct { teamHubs map[string]*TeamHub register chan *WebConn unregister chan *WebConn + broadcast chan *model.Message stop chan string } @@ -18,9 +20,16 @@ var hub = &Hub{ register: make(chan *WebConn), unregister: make(chan *WebConn), teamHubs: make(map[string]*TeamHub), + broadcast: make(chan *model.Message), stop: make(chan string), } +func PublishAndForget(message *model.Message) { + go func() { + hub.Broadcast(message) + }() +} + func (h *Hub) Register(webConn *WebConn) { h.register <- webConn } @@ -29,8 +38,14 @@ func (h *Hub) Unregister(webConn *WebConn) { h.unregister <- webConn } -func (h *Hub) Stop(teamId string) { - h.stop <- teamId +func (h *Hub) Broadcast(message *model.Message) { + if message != nil { + h.broadcast <- message + } +} + +func (h *Hub) Stop() { + h.stop <- "all" } func (h *Hub) Start() { @@ -53,18 +68,17 @@ func (h *Hub) Start() { if nh, ok := h.teamHubs[c.TeamId]; ok { nh.Unregister(c) } - + case msg := <-h.broadcast: + nh := h.teamHubs[msg.TeamId] + if nh != nil { + nh.broadcast <- msg + } case s := <-h.stop: - if len(s) == 0 { - l4g.Debug("stopping all connections") - for _, v := range h.teamHubs { - v.Stop() - } - return - } else if nh, ok := h.teamHubs[s]; ok { - delete(h.teamHubs, s) - nh.Stop() + l4g.Debug("stopping %v connections", s) + for _, v := range h.teamHubs { + v.Stop() } + return } } }() diff --git a/api/web_socket_test.go b/api/web_socket_test.go index 4cb49220f..6f6a7d619 100644 --- a/api/web_socket_test.go +++ b/api/web_socket_test.go @@ -115,9 +115,6 @@ func TestSocket(t *testing.T) { }() time.Sleep(2 * time.Second) - - hub.Stop(team.Id) - } func TestZZWebSocketTearDown(t *testing.T) { diff --git a/api/web_team_hub.go b/api/web_team_hub.go index 7c7981e76..7a63b84d1 100644 --- a/api/web_team_hub.go +++ b/api/web_team_hub.go @@ -6,8 +6,6 @@ package api import ( l4g "code.google.com/p/log4go" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" - "strings" ) type TeamHub struct { @@ -43,43 +41,6 @@ func (h *TeamHub) Stop() { } func (h *TeamHub) Start() { - - pubsub := store.RedisClient().PubSub() - - go func() { - defer func() { - l4g.Debug("redis reader finished for teamId=%v", h.teamId) - hub.Stop(h.teamId) - }() - - l4g.Debug("redis reader starting for teamId=%v", h.teamId) - - err := pubsub.Subscribe(h.teamId) - if err != nil { - l4g.Error("Error while subscribing to redis %v %v", h.teamId, err) - return - } - - for { - if payload, err := pubsub.ReceiveTimeout(REDIS_WAIT); err != nil { - if strings.Contains(err.Error(), "i/o timeout") { - if len(h.connections) == 0 { - l4g.Debug("No active connections so sending stop %v", h.teamId) - return - } - } else { - return - } - } else { - msg := store.GetMessageFromPayload(payload) - if msg != nil { - h.broadcast <- msg - } - } - } - - }() - go func() { for { select { @@ -110,7 +71,6 @@ func (h *TeamHub) Start() { webCon.WebSocket.Close() } - pubsub.Close() return } } -- cgit v1.2.3-1-g7c22