summaryrefslogtreecommitdiffstats
path: root/api
diff options
context:
space:
mode:
authorCorey Hulen <corey@hulen.com>2015-07-17 14:45:50 -0800
committerCorey Hulen <corey@hulen.com>2015-07-17 14:45:50 -0800
commitd65f199e12b19676bfac5e315ee634a8d3f05b88 (patch)
tree5d32f8193910f4a644e17a2c9d389ea3e01dffc0 /api
parentf1329787357544b1f4ecdf36586b93206d56832e (diff)
parent246d12aaf23fa3a2c23225b33a333effff76253b (diff)
downloadchat-d65f199e12b19676bfac5e315ee634a8d3f05b88.tar.gz
chat-d65f199e12b19676bfac5e315ee634a8d3f05b88.tar.bz2
chat-d65f199e12b19676bfac5e315ee634a8d3f05b88.zip
Merge pull request #185 from mattermost/mm-1348
fixes mm-1348 removing dependency on redis
Diffstat (limited to 'api')
-rw-r--r--api/channel.go5
-rw-r--r--api/command.go2
-rw-r--r--api/post.go8
-rw-r--r--api/server.go3
-rw-r--r--api/user.go2
-rw-r--r--api/web_conn.go2
-rw-r--r--api/web_hub.go38
-rw-r--r--api/web_socket_test.go3
-rw-r--r--api/web_team_hub.go40
9 files changed, 35 insertions, 68 deletions
diff --git a/api/channel.go b/api/channel.go
index 88db27def..4d8dbad09 100644
--- a/api/channel.go
+++ b/api/channel.go
@@ -8,7 +8,6 @@ import (
"fmt"
"github.com/gorilla/mux"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
"net/http"
"strings"
)
@@ -542,7 +541,7 @@ func updateLastViewedAt(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewMessage(c.Session.TeamId, id, c.Session.UserId, model.ACTION_VIEWED)
message.Add("channel_id", id)
- store.PublishAndForget(message)
+ PublishAndForget(message)
result := make(map[string]string)
result["id"] = id
@@ -657,7 +656,7 @@ func addChannelMember(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewMessage(c.Session.TeamId, "", userId, model.ACTION_USER_ADDED)
- store.PublishAndForget(message)
+ PublishAndForget(message)
<-Srv.Store.Channel().UpdateLastViewedAt(id, oUser.Id)
w.Write([]byte(cm.ToJson()))
diff --git a/api/command.go b/api/command.go
index 810a8a07e..ee7a11af3 100644
--- a/api/command.go
+++ b/api/command.go
@@ -27,8 +27,6 @@ var commands = []commandHandler{
func InitCommand(r *mux.Router) {
l4g.Debug("Initializing command api routes")
r.Handle("/command", ApiUserRequired(command)).Methods("POST")
-
- hub.Start()
}
func command(c *Context, w http.ResponseWriter, r *http.Request) {
diff --git a/api/post.go b/api/post.go
index 5b7983386..27dedbf71 100644
--- a/api/post.go
+++ b/api/post.go
@@ -11,10 +11,10 @@ import (
"github.com/mattermost/platform/store"
"github.com/mattermost/platform/utils"
"net/http"
+ "path/filepath"
"strconv"
"strings"
"time"
- "path/filepath"
)
func InitPost(r *mux.Router) {
@@ -450,7 +450,7 @@ func fireAndForgetNotifications(post *model.Post, teamId, teamUrl string) {
message.Add("mentions", model.ArrayToJson(mentionedUsers))
}
- store.PublishAndForget(message)
+ PublishAndForget(message)
}()
}
@@ -516,7 +516,7 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) {
message.Add("channel_id", rpost.ChannelId)
message.Add("message", rpost.Message)
- store.PublishAndForget(message)
+ PublishAndForget(message)
w.Write([]byte(rpost.ToJson()))
}
@@ -666,7 +666,7 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) {
message.Add("post_id", post.Id)
message.Add("channel_id", post.ChannelId)
- store.PublishAndForget(message)
+ PublishAndForget(message)
result := make(map[string]string)
result["id"] = postId
diff --git a/api/server.go b/api/server.go
index 58986a8d4..3163f79f5 100644
--- a/api/server.go
+++ b/api/server.go
@@ -28,7 +28,6 @@ func NewServer() {
Srv = &Server{}
Srv.Server = manners.NewServer()
Srv.Store = store.NewSqlStore()
- store.RedisClient()
Srv.Router = mux.NewRouter()
Srv.Router.NotFoundHandler = http.HandlerFunc(Handle404)
@@ -54,7 +53,7 @@ func StopServer() {
Srv.Server.Shutdown <- true
Srv.Store.Close()
- store.RedisClose()
+ hub.Stop()
l4g.Info("Server stopped")
}
diff --git a/api/user.go b/api/user.go
index 10948c63d..f6422f844 100644
--- a/api/user.go
+++ b/api/user.go
@@ -196,7 +196,7 @@ func CreateUser(c *Context, team *model.Team, user *model.User) *model.User {
// This message goes to every channel, so the channelId is irrelevant
message := model.NewMessage(team.Id, "", ruser.Id, model.ACTION_NEW_USER)
- store.PublishAndForget(message)
+ PublishAndForget(message)
return ruser
}
diff --git a/api/web_conn.go b/api/web_conn.go
index 751f6f407..0990de8ef 100644
--- a/api/web_conn.go
+++ b/api/web_conn.go
@@ -70,7 +70,7 @@ func (c *WebConn) readPump() {
} else {
msg.TeamId = c.TeamId
msg.UserId = c.UserId
- store.PublishAndForget(&msg)
+ PublishAndForget(&msg)
}
}
}
diff --git a/api/web_hub.go b/api/web_hub.go
index bf5fbb321..c7be19cac 100644
--- a/api/web_hub.go
+++ b/api/web_hub.go
@@ -5,12 +5,14 @@ package api
import (
l4g "code.google.com/p/log4go"
+ "github.com/mattermost/platform/model"
)
type Hub struct {
teamHubs map[string]*TeamHub
register chan *WebConn
unregister chan *WebConn
+ broadcast chan *model.Message
stop chan string
}
@@ -18,9 +20,16 @@ var hub = &Hub{
register: make(chan *WebConn),
unregister: make(chan *WebConn),
teamHubs: make(map[string]*TeamHub),
+ broadcast: make(chan *model.Message),
stop: make(chan string),
}
+func PublishAndForget(message *model.Message) {
+ go func() {
+ hub.Broadcast(message)
+ }()
+}
+
func (h *Hub) Register(webConn *WebConn) {
h.register <- webConn
}
@@ -29,8 +38,14 @@ func (h *Hub) Unregister(webConn *WebConn) {
h.unregister <- webConn
}
-func (h *Hub) Stop(teamId string) {
- h.stop <- teamId
+func (h *Hub) Broadcast(message *model.Message) {
+ if message != nil {
+ h.broadcast <- message
+ }
+}
+
+func (h *Hub) Stop() {
+ h.stop <- "all"
}
func (h *Hub) Start() {
@@ -53,18 +68,17 @@ func (h *Hub) Start() {
if nh, ok := h.teamHubs[c.TeamId]; ok {
nh.Unregister(c)
}
-
+ case msg := <-h.broadcast:
+ nh := h.teamHubs[msg.TeamId]
+ if nh != nil {
+ nh.broadcast <- msg
+ }
case s := <-h.stop:
- if len(s) == 0 {
- l4g.Debug("stopping all connections")
- for _, v := range h.teamHubs {
- v.Stop()
- }
- return
- } else if nh, ok := h.teamHubs[s]; ok {
- delete(h.teamHubs, s)
- nh.Stop()
+ l4g.Debug("stopping %v connections", s)
+ for _, v := range h.teamHubs {
+ v.Stop()
}
+ return
}
}
}()
diff --git a/api/web_socket_test.go b/api/web_socket_test.go
index e52a4f731..07fdfea36 100644
--- a/api/web_socket_test.go
+++ b/api/web_socket_test.go
@@ -115,9 +115,6 @@ func TestSocket(t *testing.T) {
}()
time.Sleep(2 * time.Second)
-
- hub.Stop(team.Id)
-
}
func TestZZWebSocketTearDown(t *testing.T) {
diff --git a/api/web_team_hub.go b/api/web_team_hub.go
index 7c7981e76..7a63b84d1 100644
--- a/api/web_team_hub.go
+++ b/api/web_team_hub.go
@@ -6,8 +6,6 @@ package api
import (
l4g "code.google.com/p/log4go"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
- "strings"
)
type TeamHub struct {
@@ -43,43 +41,6 @@ func (h *TeamHub) Stop() {
}
func (h *TeamHub) Start() {
-
- pubsub := store.RedisClient().PubSub()
-
- go func() {
- defer func() {
- l4g.Debug("redis reader finished for teamId=%v", h.teamId)
- hub.Stop(h.teamId)
- }()
-
- l4g.Debug("redis reader starting for teamId=%v", h.teamId)
-
- err := pubsub.Subscribe(h.teamId)
- if err != nil {
- l4g.Error("Error while subscribing to redis %v %v", h.teamId, err)
- return
- }
-
- for {
- if payload, err := pubsub.ReceiveTimeout(REDIS_WAIT); err != nil {
- if strings.Contains(err.Error(), "i/o timeout") {
- if len(h.connections) == 0 {
- l4g.Debug("No active connections so sending stop %v", h.teamId)
- return
- }
- } else {
- return
- }
- } else {
- msg := store.GetMessageFromPayload(payload)
- if msg != nil {
- h.broadcast <- msg
- }
- }
- }
-
- }()
-
go func() {
for {
select {
@@ -110,7 +71,6 @@ func (h *TeamHub) Start() {
webCon.WebSocket.Close()
}
- pubsub.Close()
return
}
}