summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author=Corey Hulen <corey@hulen.com>2015-07-14 15:12:04 -0800
committer=Corey Hulen <corey@hulen.com>2015-07-14 15:12:04 -0800
commit246d12aaf23fa3a2c23225b33a333effff76253b (patch)
treec2a19c301b34391427e60e0e5cfcbf9879a44ee7
parenta45dea6b06bb62b3fb953d598854c371f417082d (diff)
downloadchat-246d12aaf23fa3a2c23225b33a333effff76253b.tar.gz
chat-246d12aaf23fa3a2c23225b33a333effff76253b.tar.bz2
chat-246d12aaf23fa3a2c23225b33a333effff76253b.zip
fixes mm-1348 removing dependency on redis
-rw-r--r--api/channel.go5
-rw-r--r--api/command.go2
-rw-r--r--api/post.go8
-rw-r--r--api/server.go3
-rw-r--r--api/user.go2
-rw-r--r--api/web_conn.go2
-rw-r--r--api/web_hub.go38
-rw-r--r--api/web_socket_test.go3
-rw-r--r--api/web_team_hub.go40
-rw-r--r--config/config.json4
-rw-r--r--config/config_docker.json4
-rw-r--r--store/redis.go75
-rw-r--r--store/redis_test.go59
-rw-r--r--utils/config.go6
14 files changed, 35 insertions, 216 deletions
diff --git a/api/channel.go b/api/channel.go
index 88db27def..4d8dbad09 100644
--- a/api/channel.go
+++ b/api/channel.go
@@ -8,7 +8,6 @@ import (
"fmt"
"github.com/gorilla/mux"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
"net/http"
"strings"
)
@@ -542,7 +541,7 @@ func updateLastViewedAt(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewMessage(c.Session.TeamId, id, c.Session.UserId, model.ACTION_VIEWED)
message.Add("channel_id", id)
- store.PublishAndForget(message)
+ PublishAndForget(message)
result := make(map[string]string)
result["id"] = id
@@ -657,7 +656,7 @@ func addChannelMember(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewMessage(c.Session.TeamId, "", userId, model.ACTION_USER_ADDED)
- store.PublishAndForget(message)
+ PublishAndForget(message)
<-Srv.Store.Channel().UpdateLastViewedAt(id, oUser.Id)
w.Write([]byte(cm.ToJson()))
diff --git a/api/command.go b/api/command.go
index 810a8a07e..ee7a11af3 100644
--- a/api/command.go
+++ b/api/command.go
@@ -27,8 +27,6 @@ var commands = []commandHandler{
func InitCommand(r *mux.Router) {
l4g.Debug("Initializing command api routes")
r.Handle("/command", ApiUserRequired(command)).Methods("POST")
-
- hub.Start()
}
func command(c *Context, w http.ResponseWriter, r *http.Request) {
diff --git a/api/post.go b/api/post.go
index 02f997166..aa9b13292 100644
--- a/api/post.go
+++ b/api/post.go
@@ -11,10 +11,10 @@ import (
"github.com/mattermost/platform/store"
"github.com/mattermost/platform/utils"
"net/http"
+ "path/filepath"
"strconv"
"strings"
"time"
- "path/filepath"
)
func InitPost(r *mux.Router) {
@@ -455,7 +455,7 @@ func fireAndForgetNotifications(post *model.Post, teamId, teamUrl string) {
message.Add("mentions", model.ArrayToJson(mentionedUsers))
}
- store.PublishAndForget(message)
+ PublishAndForget(message)
}()
}
@@ -521,7 +521,7 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) {
message.Add("channel_id", rpost.ChannelId)
message.Add("message", rpost.Message)
- store.PublishAndForget(message)
+ PublishAndForget(message)
w.Write([]byte(rpost.ToJson()))
}
@@ -670,7 +670,7 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) {
message.Add("post_id", post.Id)
message.Add("channel_id", post.ChannelId)
- store.PublishAndForget(message)
+ PublishAndForget(message)
result := make(map[string]string)
result["id"] = postId
diff --git a/api/server.go b/api/server.go
index 58986a8d4..3163f79f5 100644
--- a/api/server.go
+++ b/api/server.go
@@ -28,7 +28,6 @@ func NewServer() {
Srv = &Server{}
Srv.Server = manners.NewServer()
Srv.Store = store.NewSqlStore()
- store.RedisClient()
Srv.Router = mux.NewRouter()
Srv.Router.NotFoundHandler = http.HandlerFunc(Handle404)
@@ -54,7 +53,7 @@ func StopServer() {
Srv.Server.Shutdown <- true
Srv.Store.Close()
- store.RedisClose()
+ hub.Stop()
l4g.Info("Server stopped")
}
diff --git a/api/user.go b/api/user.go
index 5b052e826..df1f45042 100644
--- a/api/user.go
+++ b/api/user.go
@@ -196,7 +196,7 @@ func CreateUser(c *Context, team *model.Team, user *model.User) *model.User {
// This message goes to every channel, so the channelId is irrelevant
message := model.NewMessage(team.Id, "", ruser.Id, model.ACTION_NEW_USER)
- store.PublishAndForget(message)
+ PublishAndForget(message)
return ruser
}
diff --git a/api/web_conn.go b/api/web_conn.go
index 751f6f407..0990de8ef 100644
--- a/api/web_conn.go
+++ b/api/web_conn.go
@@ -70,7 +70,7 @@ func (c *WebConn) readPump() {
} else {
msg.TeamId = c.TeamId
msg.UserId = c.UserId
- store.PublishAndForget(&msg)
+ PublishAndForget(&msg)
}
}
}
diff --git a/api/web_hub.go b/api/web_hub.go
index bf5fbb321..c7be19cac 100644
--- a/api/web_hub.go
+++ b/api/web_hub.go
@@ -5,12 +5,14 @@ package api
import (
l4g "code.google.com/p/log4go"
+ "github.com/mattermost/platform/model"
)
type Hub struct {
teamHubs map[string]*TeamHub
register chan *WebConn
unregister chan *WebConn
+ broadcast chan *model.Message
stop chan string
}
@@ -18,9 +20,16 @@ var hub = &Hub{
register: make(chan *WebConn),
unregister: make(chan *WebConn),
teamHubs: make(map[string]*TeamHub),
+ broadcast: make(chan *model.Message),
stop: make(chan string),
}
+func PublishAndForget(message *model.Message) {
+ go func() {
+ hub.Broadcast(message)
+ }()
+}
+
func (h *Hub) Register(webConn *WebConn) {
h.register <- webConn
}
@@ -29,8 +38,14 @@ func (h *Hub) Unregister(webConn *WebConn) {
h.unregister <- webConn
}
-func (h *Hub) Stop(teamId string) {
- h.stop <- teamId
+func (h *Hub) Broadcast(message *model.Message) {
+ if message != nil {
+ h.broadcast <- message
+ }
+}
+
+func (h *Hub) Stop() {
+ h.stop <- "all"
}
func (h *Hub) Start() {
@@ -53,18 +68,17 @@ func (h *Hub) Start() {
if nh, ok := h.teamHubs[c.TeamId]; ok {
nh.Unregister(c)
}
-
+ case msg := <-h.broadcast:
+ nh := h.teamHubs[msg.TeamId]
+ if nh != nil {
+ nh.broadcast <- msg
+ }
case s := <-h.stop:
- if len(s) == 0 {
- l4g.Debug("stopping all connections")
- for _, v := range h.teamHubs {
- v.Stop()
- }
- return
- } else if nh, ok := h.teamHubs[s]; ok {
- delete(h.teamHubs, s)
- nh.Stop()
+ l4g.Debug("stopping %v connections", s)
+ for _, v := range h.teamHubs {
+ v.Stop()
}
+ return
}
}
}()
diff --git a/api/web_socket_test.go b/api/web_socket_test.go
index 4cb49220f..6f6a7d619 100644
--- a/api/web_socket_test.go
+++ b/api/web_socket_test.go
@@ -115,9 +115,6 @@ func TestSocket(t *testing.T) {
}()
time.Sleep(2 * time.Second)
-
- hub.Stop(team.Id)
-
}
func TestZZWebSocketTearDown(t *testing.T) {
diff --git a/api/web_team_hub.go b/api/web_team_hub.go
index 7c7981e76..7a63b84d1 100644
--- a/api/web_team_hub.go
+++ b/api/web_team_hub.go
@@ -6,8 +6,6 @@ package api
import (
l4g "code.google.com/p/log4go"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
- "strings"
)
type TeamHub struct {
@@ -43,43 +41,6 @@ func (h *TeamHub) Stop() {
}
func (h *TeamHub) Start() {
-
- pubsub := store.RedisClient().PubSub()
-
- go func() {
- defer func() {
- l4g.Debug("redis reader finished for teamId=%v", h.teamId)
- hub.Stop(h.teamId)
- }()
-
- l4g.Debug("redis reader starting for teamId=%v", h.teamId)
-
- err := pubsub.Subscribe(h.teamId)
- if err != nil {
- l4g.Error("Error while subscribing to redis %v %v", h.teamId, err)
- return
- }
-
- for {
- if payload, err := pubsub.ReceiveTimeout(REDIS_WAIT); err != nil {
- if strings.Contains(err.Error(), "i/o timeout") {
- if len(h.connections) == 0 {
- l4g.Debug("No active connections so sending stop %v", h.teamId)
- return
- }
- } else {
- return
- }
- } else {
- msg := store.GetMessageFromPayload(payload)
- if msg != nil {
- h.broadcast <- msg
- }
- }
- }
-
- }()
-
go func() {
for {
select {
@@ -110,7 +71,6 @@ func (h *TeamHub) Start() {
webCon.WebSocket.Close()
}
- pubsub.Close()
return
}
}
diff --git a/config/config.json b/config/config.json
index b0a019e8d..dfbddaf9f 100644
--- a/config/config.json
+++ b/config/config.json
@@ -31,10 +31,6 @@
"Trace": false,
"AtRestEncryptKey": "Ya0xMrybACJ3sZZVWQC7e31h5nSDWZFS"
},
- "RedisSettings": {
- "DataSource": "dockerhost:6379",
- "MaxOpenConns": 1000
- },
"AWSSettings": {
"S3AccessKeyId": "",
"S3SecretAccessKey": "",
diff --git a/config/config_docker.json b/config/config_docker.json
index 85f0d9c73..dd7ec23ff 100644
--- a/config/config_docker.json
+++ b/config/config_docker.json
@@ -31,10 +31,6 @@
"Trace": false,
"AtRestEncryptKey": "Ya0xMrybACJ3sZZVWQC7e31h5nSDWZFS"
},
- "RedisSettings": {
- "DataSource": "localhost:6379",
- "MaxOpenConns": 1000
- },
"AWSSettings": {
"S3AccessKeyId": "",
"S3SecretAccessKey": "",
diff --git a/store/redis.go b/store/redis.go
deleted file mode 100644
index 262040d43..000000000
--- a/store/redis.go
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright (c) 2015 Spinpunch, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- l4g "code.google.com/p/log4go"
- "github.com/mattermost/platform/model"
- "github.com/mattermost/platform/utils"
- "gopkg.in/redis.v2"
- "strings"
- "time"
-)
-
-var client *redis.Client
-
-func RedisClient() *redis.Client {
-
- if client == nil {
-
- addr := utils.Cfg.RedisSettings.DataSource
-
- client = redis.NewTCPClient(&redis.Options{
- Addr: addr,
- Password: "",
- DB: 0,
- PoolSize: utils.Cfg.RedisSettings.MaxOpenConns,
- })
-
- l4g.Info("Pinging redis at '%v'", addr)
- pong, err := client.Ping().Result()
-
- if err != nil {
- l4g.Critical("Failed to open redis connection to '%v' err:%v", addr, err)
- time.Sleep(time.Second)
- panic("Failed to open redis connection " + err.Error())
- }
-
- if pong != "PONG" {
- l4g.Critical("Failed to ping redis connection to '%v' err:%v", addr, err)
- time.Sleep(time.Second)
- panic("Failed to open ping connection " + err.Error())
- }
- }
-
- return client
-}
-
-func RedisClose() {
- l4g.Info("Closing redis")
-
- if client != nil {
- client.Close()
- client = nil
- }
-}
-
-func PublishAndForget(message *model.Message) {
-
- go func() {
- c := RedisClient()
- result := c.Publish(message.TeamId, message.ToJson())
- if result.Err() != nil {
- l4g.Error("Failed to publish message err=%v, payload=%v", result.Err(), message.ToJson())
- }
- }()
-}
-
-func GetMessageFromPayload(m interface{}) *model.Message {
- if msg, found := m.(*redis.Message); found {
- return model.MessageFromJson(strings.NewReader(msg.Payload))
- } else {
- return nil
- }
-}
diff --git a/store/redis_test.go b/store/redis_test.go
deleted file mode 100644
index 11bd9ca6a..000000000
--- a/store/redis_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright (c) 2015 Spinpunch, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "fmt"
- "github.com/mattermost/platform/model"
- "github.com/mattermost/platform/utils"
- "testing"
-)
-
-func TestRedis(t *testing.T) {
- utils.LoadConfig("config.json")
-
- c := RedisClient()
-
- if c == nil {
- t.Fatal("should have a valid redis connection")
- }
-
- pubsub := c.PubSub()
- defer pubsub.Close()
-
- m := model.NewMessage(model.NewId(), model.NewId(), model.NewId(), model.ACTION_TYPING)
- m.Add("RootId", model.NewId())
-
- err := pubsub.Subscribe(m.TeamId)
- if err != nil {
- t.Fatal(err)
- }
-
- // should be the subscribe success message
- // lets gobble that up
- if _, err := pubsub.Receive(); err != nil {
- t.Fatal(err)
- }
-
- PublishAndForget(m)
-
- fmt.Println("here1")
-
- if msg, err := pubsub.Receive(); err != nil {
- t.Fatal(err)
- } else {
-
- rmsg := GetMessageFromPayload(msg)
-
- if m.TeamId != rmsg.TeamId {
- t.Fatal("Ids do not match")
- }
-
- if m.Props["RootId"] != rmsg.Props["RootId"] {
- t.Fatal("Ids do not match")
- }
- }
-
- RedisClose()
-}
diff --git a/utils/config.go b/utils/config.go
index eb2ae3050..a166c3d0e 100644
--- a/utils/config.go
+++ b/utils/config.go
@@ -42,11 +42,6 @@ type SqlSettings struct {
AtRestEncryptKey string
}
-type RedisSettings struct {
- DataSource string
- MaxOpenConns int
-}
-
type LogSettings struct {
ConsoleEnable bool
ConsoleLevel string
@@ -112,7 +107,6 @@ type Config struct {
LogSettings LogSettings
ServiceSettings ServiceSettings
SqlSettings SqlSettings
- RedisSettings RedisSettings
AWSSettings AWSSettings
ImageSettings ImageSettings
EmailSettings EmailSettings