summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--api/api.go2
-rw-r--r--api/apitestlib.go4
-rw-r--r--api/cli_test.go5
-rw-r--r--api/emoji.go2
-rw-r--r--api/post.go2
-rw-r--r--api/reaction.go18
-rw-r--r--api/websocket.go3
-rw-r--r--api/websocket_test.go5
-rw-r--r--api4/api.go2
-rw-r--r--api4/apitestlib.go4
-rw-r--r--api4/websocket.go3
-rw-r--r--app/admin.go4
-rw-r--r--app/analytics.go4
-rw-r--r--app/app.go5
-rw-r--r--app/auto_environment.go10
-rw-r--r--app/auto_users.go8
-rw-r--r--app/channel.go22
-rw-r--r--app/cluster_handlers.go8
-rw-r--r--app/command.go2
-rw-r--r--app/command_expand_collapse.go2
-rw-r--r--app/command_loadtest.go4
-rw-r--r--app/email_batching.go26
-rw-r--r--app/email_batching_test.go12
-rw-r--r--app/emoji.go2
-rw-r--r--app/notification.go16
-rw-r--r--app/plugins.go4
-rw-r--r--app/post.go20
-rw-r--r--app/preference.go4
-rw-r--r--app/reaction.go4
-rw-r--r--app/server.go2
-rw-r--r--app/session.go8
-rw-r--r--app/status.go10
-rw-r--r--app/team.go22
-rw-r--r--app/user.go12
-rw-r--r--app/web_conn.go2
-rw-r--r--app/web_hub.go74
-rw-r--r--app/websocket_router.go18
-rw-r--r--cmd/platform/server.go4
-rw-r--r--cmd/platform/test.go8
-rw-r--r--wsapi/api.go22
-rw-r--r--wsapi/status.go10
-rw-r--r--wsapi/system.go5
-rw-r--r--wsapi/user.go9
-rw-r--r--wsapi/webrtc.go9
-rw-r--r--wsapi/websocket_handler.go7
45 files changed, 207 insertions, 222 deletions
diff --git a/api/api.go b/api/api.go
index 283120b55..806ff84eb 100644
--- a/api/api.go
+++ b/api/api.go
@@ -121,7 +121,7 @@ func Init(a *app.App, root *mux.Router) *API {
utils.InitHTML()
- app.InitEmailBatching()
+ a.InitEmailBatching()
if *utils.Cfg.ServiceSettings.EnableAPIv3 {
l4g.Info("API version 3 is scheduled for deprecation. Please see https://api.mattermost.com for details.")
diff --git a/api/apitestlib.go b/api/apitestlib.go
index 8504748e1..f285b8d79 100644
--- a/api/apitestlib.go
+++ b/api/apitestlib.go
@@ -49,11 +49,11 @@ func setupTestHelper(enterprise bool) *TestHelper {
th.App.NewServer()
th.App.InitStores()
th.App.Srv.Router = NewRouter()
- wsapi.InitRouter()
+ th.App.Srv.WebSocketRouter = th.App.NewWebSocketRouter()
th.App.StartServer()
api4.Init(th.App, th.App.Srv.Router, false)
Init(th.App, th.App.Srv.Router)
- wsapi.InitApi()
+ wsapi.Init(th.App, th.App.Srv.WebSocketRouter)
utils.EnableDebugLogForTest()
th.App.Srv.Store.MarkSystemRanUnitTests()
diff --git a/api/cli_test.go b/api/cli_test.go
index 8c7381290..14645f35c 100644
--- a/api/cli_test.go
+++ b/api/cli_test.go
@@ -8,7 +8,6 @@ import (
"strings"
"testing"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
)
@@ -96,7 +95,7 @@ func TestCliCreateUserWithoutTeam(t *testing.T) {
t.SkipNow()
}
- Setup()
+ th := Setup()
id := model.NewId()
email := "success+" + id + "@simulator.amazonses.com"
username := "name" + id
@@ -108,7 +107,7 @@ func TestCliCreateUserWithoutTeam(t *testing.T) {
t.Fatal(err)
}
- if result := <-app.Global().Srv.Store.User().GetByEmail(email); result.Err != nil {
+ if result := <-th.App.Srv.Store.User().GetByEmail(email); result.Err != nil {
t.Fatal()
} else {
user := result.Data.(*model.User)
diff --git a/api/emoji.go b/api/emoji.go
index 1961ad146..b4ad9fa2c 100644
--- a/api/emoji.go
+++ b/api/emoji.go
@@ -118,7 +118,7 @@ func createEmoji(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EMOJI_ADDED, "", "", "", nil)
message.Add("emoji", result.Data.(*model.Emoji).ToJson())
- app.Publish(message)
+ c.App.Publish(message)
w.Write([]byte(result.Data.(*model.Emoji).ToJson()))
}
}
diff --git a/api/post.go b/api/post.go
index 41cd7564b..db9412e7b 100644
--- a/api/post.go
+++ b/api/post.go
@@ -141,7 +141,7 @@ func saveIsPinnedPost(c *Context, w http.ResponseWriter, r *http.Request, isPinn
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", rpost.ChannelId, "", nil)
message.Add("post", rpost.ToJson())
- go app.Publish(message)
+ go c.App.Publish(message)
c.App.InvalidateCacheForChannelPosts(rpost.ChannelId)
diff --git a/api/reaction.go b/api/reaction.go
index 28cc9ade2..aada8cdd4 100644
--- a/api/reaction.go
+++ b/api/reaction.go
@@ -8,7 +8,6 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/mux"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -112,23 +111,6 @@ func deleteReaction(c *Context, w http.ResponseWriter, r *http.Request) {
ReturnStatusOK(w)
}
-func sendReactionEvent(event string, channelId string, reaction *model.Reaction, post *model.Post) {
- // send out that a reaction has been added/removed
-
- message := model.NewWebSocketEvent(event, "", channelId, "", nil)
- message.Add("reaction", reaction.ToJson())
- app.Publish(message)
-
- // THe post is always modified since the UpdateAt always changes
- app.Global().InvalidateCacheForChannelPosts(post.ChannelId)
- post.HasReactions = true
- post.UpdateAt = model.GetMillis()
- umessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", channelId, "", nil)
- umessage.Add("post", post.ToJson())
- app.Publish(umessage)
-
-}
-
func listReactions(c *Context, w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
diff --git a/api/websocket.go b/api/websocket.go
index 6de5741f3..c90968e7c 100644
--- a/api/websocket.go
+++ b/api/websocket.go
@@ -8,7 +8,6 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/websocket"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -37,7 +36,7 @@ func connect(c *Context, w http.ResponseWriter, r *http.Request) {
wc := c.App.NewWebConn(ws, c.Session, c.T, c.Locale)
if len(c.Session.UserId) > 0 {
- app.HubRegister(wc)
+ c.App.HubRegister(wc)
}
go wc.WritePump()
diff --git a/api/websocket_test.go b/api/websocket_test.go
index 9bf957297..a5c512262 100644
--- a/api/websocket_test.go
+++ b/api/websocket_test.go
@@ -11,7 +11,6 @@ import (
"time"
"github.com/gorilla/websocket"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -195,7 +194,7 @@ func TestWebSocketEvent(t *testing.T) {
omitUser["somerandomid"] = true
evt1 := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", th.BasicChannel.Id, "", omitUser)
evt1.Add("user_id", "somerandomid")
- app.Publish(evt1)
+ th.App.Publish(evt1)
time.Sleep(300 * time.Millisecond)
@@ -224,7 +223,7 @@ func TestWebSocketEvent(t *testing.T) {
}
evt2 := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", "somerandomid", "", nil)
- go app.Publish(evt2)
+ go th.App.Publish(evt2)
time.Sleep(300 * time.Millisecond)
eventHit = false
diff --git a/api4/api.go b/api4/api.go
index 9df051456..e9cf3054c 100644
--- a/api4/api.go
+++ b/api4/api.go
@@ -226,7 +226,7 @@ func Init(a *app.App, root *mux.Router, full bool) *API {
if full {
utils.InitHTML()
- app.InitEmailBatching()
+ a.InitEmailBatching()
}
return api
diff --git a/api4/apitestlib.go b/api4/apitestlib.go
index 7b5230014..fad066ff8 100644
--- a/api4/apitestlib.go
+++ b/api4/apitestlib.go
@@ -61,10 +61,10 @@ func setupTestHelper(enterprise bool) *TestHelper {
th.App.NewServer()
th.App.InitStores()
th.App.Srv.Router = NewRouter()
- wsapi.InitRouter()
+ th.App.Srv.WebSocketRouter = th.App.NewWebSocketRouter()
th.App.StartServer()
Init(th.App, th.App.Srv.Router, true)
- wsapi.InitApi()
+ wsapi.Init(th.App, th.App.Srv.WebSocketRouter)
utils.EnableDebugLogForTest()
th.App.Srv.Store.MarkSystemRanUnitTests()
diff --git a/api4/websocket.go b/api4/websocket.go
index 7dfe3d61c..2793c0bd0 100644
--- a/api4/websocket.go
+++ b/api4/websocket.go
@@ -8,7 +8,6 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/websocket"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -38,7 +37,7 @@ func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
wc := c.App.NewWebConn(ws, c.Session, c.T, "")
if len(c.Session.UserId) > 0 {
- app.HubRegister(wc)
+ c.App.HubRegister(wc)
}
go wc.WritePump()
diff --git a/app/admin.go b/app/admin.go
index bd687627e..0d02c3b49 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -138,7 +138,7 @@ func (a *App) ReloadConfig() {
utils.LoadConfig(utils.CfgFileName)
// start/restart email batching job if necessary
- InitEmailBatching()
+ a.InitEmailBatching()
}
func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) *model.AppError {
@@ -179,7 +179,7 @@ func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool)
}
// start/restart email batching job if necessary
- InitEmailBatching()
+ a.InitEmailBatching()
return nil
}
diff --git a/app/analytics.go b/app/analytics.go
index 70c049350..65a9e4129 100644
--- a/app/analytics.go
+++ b/app/analytics.go
@@ -103,7 +103,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo
return nil, err
}
- totalSockets := TotalWebsocketConnections()
+ totalSockets := a.TotalWebsocketConnections()
totalMasterDb := a.Srv.Store.TotalMasterDbConnections()
totalReadDb := a.Srv.Store.TotalReadDbConnections()
@@ -118,7 +118,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo
rows[7].Value = float64(totalReadDb)
} else {
- rows[5].Value = float64(TotalWebsocketConnections())
+ rows[5].Value = float64(a.TotalWebsocketConnections())
rows[6].Value = float64(a.Srv.Store.TotalMasterDbConnections())
rows[7].Value = float64(a.Srv.Store.TotalReadDbConnections())
}
diff --git a/app/app.go b/app/app.go
index eaaf9acee..e85fa6342 100644
--- a/app/app.go
+++ b/app/app.go
@@ -20,6 +20,11 @@ type App struct {
PluginEnv *pluginenv.Environment
PluginConfigListenerId string
+ EmailBatching *EmailBatchingJob
+
+ Hubs []*Hub
+ HubsStopCheckingForDeadlock chan bool
+
AccountMigration einterfaces.AccountMigrationInterface
Brand einterfaces.BrandInterface
Cluster einterfaces.ClusterInterface
diff --git a/app/auto_environment.go b/app/auto_environment.go
index 7bafc6948..660316e4b 100644
--- a/app/auto_environment.go
+++ b/app/auto_environment.go
@@ -16,7 +16,7 @@ type TestEnvironment struct {
Environments []TeamEnvironment
}
-func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) {
+func CreateTestEnvironmentWithTeams(a *App, client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) {
rand.Seed(time.Now().UTC().UnixNano())
teamCreator := NewAutoTeamCreator(client)
@@ -29,7 +29,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
environment := TestEnvironment{teams, make([]TeamEnvironment, len(teams))}
for i, team := range teams {
- userCreator := NewAutoUserCreator(client, team)
+ userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = fuzzy
randomUser, err := userCreator.createRandomUser()
if err != true {
@@ -37,7 +37,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
}
client.LoginById(randomUser.Id, USER_PASSWORD)
client.SetTeamId(team.Id)
- teamEnvironment, err := CreateTestEnvironmentInTeam(client, team, rangeChannels, rangeUsers, rangePosts, fuzzy)
+ teamEnvironment, err := CreateTestEnvironmentInTeam(a, client, team, rangeChannels, rangeUsers, rangePosts, fuzzy)
if err != true {
return TestEnvironment{}, false
}
@@ -47,7 +47,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
return environment, true
}
-func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) {
+func CreateTestEnvironmentInTeam(a *App, client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) {
rand.Seed(time.Now().UTC().UnixNano())
// We need to create at least one user
@@ -55,7 +55,7 @@ func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeCh
rangeUsers.Begin = 1
}
- userCreator := NewAutoUserCreator(client, team)
+ userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = fuzzy
users, err := userCreator.CreateTestUsers(rangeUsers)
if err != true {
diff --git a/app/auto_users.go b/app/auto_users.go
index 50e2084d1..78126211e 100644
--- a/app/auto_users.go
+++ b/app/auto_users.go
@@ -12,6 +12,7 @@ import (
)
type AutoUserCreator struct {
+ app *App
client *model.Client
team *model.Team
EmailLength utils.Range
@@ -21,8 +22,9 @@ type AutoUserCreator struct {
Fuzzy bool
}
-func NewAutoUserCreator(client *model.Client, team *model.Team) *AutoUserCreator {
+func NewAutoUserCreator(a *App, client *model.Client, team *model.Team) *AutoUserCreator {
return &AutoUserCreator{
+ app: a,
client: client,
team: team,
EmailLength: USER_EMAIL_LEN,
@@ -81,14 +83,14 @@ func (cfg *AutoUserCreator) createRandomUser() (*model.User, bool) {
ruser := result.Data.(*model.User)
status := &model.Status{UserId: ruser.Id, Status: model.STATUS_ONLINE, Manual: false, LastActivityAt: model.GetMillis(), ActiveChannel: ""}
- if result := <-Global().Srv.Store.Status().SaveOrUpdate(status); result.Err != nil {
+ if result := <-cfg.app.Srv.Store.Status().SaveOrUpdate(status); result.Err != nil {
result.Err.Translate(utils.T)
l4g.Error(result.Err.Error())
return nil, false
}
// We need to cheat to verify the user's email
- store.Must(Global().Srv.Store.User().VerifyEmail(ruser.Id))
+ store.Must(cfg.app.Srv.Store.User().VerifyEmail(ruser.Id))
return result.Data.(*model.User), true
}
diff --git a/app/channel.go b/app/channel.go
index 436d429c9..2d3709a0c 100644
--- a/app/channel.go
+++ b/app/channel.go
@@ -136,7 +136,7 @@ func (a *App) CreateChannelWithUser(channel *model.Channel, userId string) (*mod
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_CREATED, "", "", userId, nil)
message.Add("channel_id", channel.Id)
message.Add("team_id", channel.TeamId)
- Publish(message)
+ a.Publish(message)
return rchannel, nil
}
@@ -181,7 +181,7 @@ func (a *App) CreateDirectChannel(userId string, otherUserId string) (*model.Cha
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_DIRECT_ADDED, "", channel.Id, "", nil)
message.Add("teammate_id", otherUserId)
- Publish(message)
+ a.Publish(message)
return channel, nil
}
@@ -254,7 +254,7 @@ func (a *App) CreateGroupChannel(userIds []string, creatorId string) (*model.Cha
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_GROUP_ADDED, "", channel.Id, "", nil)
message.Add("teammate_ids", model.ArrayToJson(userIds))
- Publish(message)
+ a.Publish(message)
return channel, nil
}
@@ -316,7 +316,7 @@ func (a *App) UpdateChannel(channel *model.Channel) (*model.Channel, *model.AppE
messageWs := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_UPDATED, "", channel.Id, "", nil)
messageWs.Add("channel", channel.ToJson())
- Publish(messageWs)
+ a.Publish(messageWs)
return channel, nil
}
@@ -484,7 +484,7 @@ func (a *App) DeleteChannel(channel *model.Channel, userId string) *model.AppErr
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_DELETED, channel.TeamId, "", "", nil)
message.Add("channel_id", channel.Id)
- Publish(message)
+ a.Publish(message)
}
return nil
@@ -550,7 +550,7 @@ func (a *App) AddUserToChannel(user *model.User, channel *model.Channel) (*model
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_ADDED, "", channel.Id, "", nil)
message.Add("user_id", user.Id)
message.Add("team_id", channel.TeamId)
- Publish(message)
+ a.Publish(message)
return newMember, nil
}
@@ -1039,13 +1039,13 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string,
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", channel.Id, "", nil)
message.Add("user_id", userIdToRemove)
message.Add("remover_id", removerUserId)
- go Publish(message)
+ go a.Publish(message)
// because the removed user no longer belongs to the channel we need to send a separate websocket event
userMsg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", "", userIdToRemove, nil)
userMsg.Add("channel_id", channel.Id)
userMsg.Add("remover_id", removerUserId)
- go Publish(userMsg)
+ go a.Publish(userMsg)
return nil
}
@@ -1098,7 +1098,7 @@ func (a *App) SetActiveChannel(userId string, channelId string) *model.AppError
a.AddStatusCache(status)
if status.Status != oldStatus {
- BroadcastStatus(status)
+ a.BroadcastStatus(status)
}
return nil
@@ -1113,7 +1113,7 @@ func (a *App) UpdateChannelLastViewedAt(channelIds []string, userId string) *mod
for _, channelId := range channelIds {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", channelId)
- go Publish(message)
+ go a.Publish(message)
}
}
@@ -1179,7 +1179,7 @@ func (a *App) ViewChannel(view *model.ChannelView, userId string, clearPushNotif
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages && model.IsValidId(view.ChannelId) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", view.ChannelId)
- go Publish(message)
+ go a.Publish(message)
}
return nil
diff --git a/app/cluster_handlers.go b/app/cluster_handlers.go
index 5c4bd7026..997de6dcd 100644
--- a/app/cluster_handlers.go
+++ b/app/cluster_handlers.go
@@ -20,12 +20,12 @@ func (a *App) RegisterAllClusterMessageHandlers() {
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, a.ClusterInvalidateCacheForChannelByNameHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, a.ClusterInvalidateCacheForChannelHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, a.ClusterInvalidateCacheForUserHandler)
- a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, ClusterClearSessionCacheForUserHandler)
+ a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, a.ClusterClearSessionCacheForUserHandler)
}
func (a *App) ClusterPublishHandler(msg *model.ClusterMessage) {
event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
- PublishSkipClusterSend(event)
+ a.PublishSkipClusterSend(event)
}
func (a *App) ClusterUpdateStatusHandler(msg *model.ClusterMessage) {
@@ -65,6 +65,6 @@ func (a *App) ClusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) {
a.InvalidateCacheForUserSkipClusterSend(msg.Data)
}
-func ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
- ClearSessionCacheForUserSkipClusterSend(msg.Data)
+func (a *App) ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
+ a.ClearSessionCacheForUserSkipClusterSend(msg.Data)
}
diff --git a/app/command.go b/app/command.go
index f8885219a..0763e24c7 100644
--- a/app/command.go
+++ b/app/command.go
@@ -49,7 +49,7 @@ func (a *App) CreateCommandPost(post *model.Post, teamId string, response *model
return a.CreatePostMissingChannel(post, true)
} else if response.ResponseType == "" || response.ResponseType == model.COMMAND_RESPONSE_TYPE_EPHEMERAL {
post.ParentId = ""
- SendEphemeralPost(post.UserId, post)
+ a.SendEphemeralPost(post.UserId, post)
}
return post, nil
diff --git a/app/command_expand_collapse.go b/app/command_expand_collapse.go
index 116e29f17..9c94a4325 100644
--- a/app/command_expand_collapse.go
+++ b/app/command_expand_collapse.go
@@ -74,7 +74,7 @@ func (a *App) setCollapsePreference(args *model.CommandArgs, isCollapse bool) *m
socketMessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCE_CHANGED, "", "", args.UserId, nil)
socketMessage.Add("preference", pref.ToJson())
- go Publish(socketMessage)
+ go a.Publish(socketMessage)
var rmsg string
diff --git a/app/command_loadtest.go b/app/command_loadtest.go
index 629b9c9f5..4bc371bdc 100644
--- a/app/command_loadtest.go
+++ b/app/command_loadtest.go
@@ -166,6 +166,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag
}
client.Login(BTEST_USER_EMAIL, BTEST_USER_PASSWORD)
environment, err := CreateTestEnvironmentWithTeams(
+ a,
client,
utils.Range{Begin: numTeams, End: numTeams},
utils.Range{Begin: numChannels, End: numChannels},
@@ -193,6 +194,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag
client.MockSession(args.Session.Token)
client.SetTeamId(args.TeamId)
CreateTestEnvironmentInTeam(
+ a,
client,
team,
utils.Range{Begin: numChannels, End: numChannels},
@@ -227,7 +229,7 @@ func (me *LoadTestProvider) UsersCommand(a *App, args *model.CommandArgs, messag
client := model.NewClient(args.SiteURL)
client.SetTeamId(team.Id)
- userCreator := NewAutoUserCreator(client, team)
+ userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = doFuzz
userCreator.CreateTestUsers(usersr)
diff --git a/app/email_batching.go b/app/email_batching.go
index c8ea8c98a..b9f89d646 100644
--- a/app/email_batching.go
+++ b/app/email_batching.go
@@ -22,26 +22,24 @@ const (
EMAIL_BATCHING_TASK_NAME = "Email Batching"
)
-var emailBatchingJob *EmailBatchingJob
-
-func InitEmailBatching() {
+func (a *App) InitEmailBatching() {
if *utils.Cfg.EmailSettings.EnableEmailBatching {
- if emailBatchingJob == nil {
- emailBatchingJob = MakeEmailBatchingJob(*utils.Cfg.EmailSettings.EmailBatchingBufferSize)
+ if a.EmailBatching == nil {
+ a.EmailBatching = NewEmailBatchingJob(a, *utils.Cfg.EmailSettings.EmailBatchingBufferSize)
}
// note that we don't support changing EmailBatchingBufferSize without restarting the server
- emailBatchingJob.Start()
+ a.EmailBatching.Start()
}
}
-func AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError {
+func (a *App) AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError {
if !*utils.Cfg.EmailSettings.EnableEmailBatching {
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.disabled.app_error", nil, "", http.StatusNotImplemented)
}
- if !emailBatchingJob.Add(user, post, team) {
+ if !a.EmailBatching.Add(user, post, team) {
l4g.Error(utils.T("api.email_batching.add_notification_email_to_batch.channel_full.app_error"))
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.channel_full.app_error", nil, "", http.StatusInternalServerError)
}
@@ -56,12 +54,14 @@ type batchedNotification struct {
}
type EmailBatchingJob struct {
+ app *App
newNotifications chan *batchedNotification
pendingNotifications map[string][]*batchedNotification
}
-func MakeEmailBatchingJob(bufferSize int) *EmailBatchingJob {
+func NewEmailBatchingJob(a *App, bufferSize int) *EmailBatchingJob {
return &EmailBatchingJob{
+ app: a,
newNotifications: make(chan *batchedNotification, bufferSize),
pendingNotifications: make(map[string][]*batchedNotification),
}
@@ -97,7 +97,7 @@ func (job *EmailBatchingJob) CheckPendingEmails() {
// it's a bit weird to pass the send email function through here, but it makes it so that we can test
// without actually sending emails
- job.checkPendingNotifications(time.Now(), Global().sendBatchedEmailNotification)
+ job.checkPendingNotifications(time.Now(), job.app.sendBatchedEmailNotification)
l4g.Debug(utils.T("api.email_batching.check_pending_emails.finished_running"), len(job.pendingNotifications))
}
@@ -131,7 +131,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
if inspectedTeamNames[notification.teamName] != "" {
continue
}
- tchan := Global().Srv.Store.Team().GetByName(notifications[0].teamName)
+ tchan := job.app.Srv.Store.Team().GetByName(notifications[0].teamName)
if result := <-tchan; result.Err != nil {
l4g.Error("Unable to find Team id for notification", result.Err)
continue
@@ -141,7 +141,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// if the user has viewed any channels in this team since the notification was queued, delete
// all queued notifications
- mchan := Global().Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId)
+ mchan := job.app.Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId)
if result := <-mchan; result.Err != nil {
l4g.Error("Unable to find ChannelMembers for user", result.Err)
continue
@@ -158,7 +158,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// get how long we need to wait to send notifications to the user
var interval int64
- pchan := Global().Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL)
+ pchan := job.app.Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL)
if result := <-pchan; result.Err != nil {
// use the default batching interval if an error ocurrs while fetching user preferences
interval, _ = strconv.ParseInt(model.PREFERENCE_EMAIL_INTERVAL_BATCHING_SECONDS, 10, 64)
diff --git a/app/email_batching_test.go b/app/email_batching_test.go
index 2c58d43f8..b69eeec2d 100644
--- a/app/email_batching_test.go
+++ b/app/email_batching_test.go
@@ -13,14 +13,14 @@ import (
)
func TestHandleNewNotifications(t *testing.T) {
- Setup()
+ th := Setup()
id1 := model.NewId()
id2 := model.NewId()
id3 := model.NewId()
// test queueing of received posts by user
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
job.handleNewNotifications()
@@ -74,7 +74,7 @@ func TestHandleNewNotifications(t *testing.T) {
}
// test ordering of received posts
- job = MakeEmailBatchingJob(128)
+ job = NewEmailBatchingJob(th.App, 128)
job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test1"}, &model.Team{Name: "team"})
job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test2"}, &model.Team{Name: "team"})
@@ -95,7 +95,7 @@ func TestHandleNewNotifications(t *testing.T) {
func TestCheckPendingNotifications(t *testing.T) {
th := Setup().InitBasic()
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
job.pendingNotifications[th.BasicUser.Id] = []*batchedNotification{
{
post: &model.Post{
@@ -201,7 +201,7 @@ func TestCheckPendingNotifications(t *testing.T) {
*/
func TestCheckPendingNotificationsDefaultInterval(t *testing.T) {
th := Setup().InitBasic()
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
// bypasses recent user activity check
channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember)
@@ -237,7 +237,7 @@ func TestCheckPendingNotificationsDefaultInterval(t *testing.T) {
*/
func TestCheckPendingNotificationsCantParseInterval(t *testing.T) {
th := Setup().InitBasic()
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
// bypasses recent user activity check
channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember)
diff --git a/app/emoji.go b/app/emoji.go
index d5b81c89a..308791aed 100644
--- a/app/emoji.go
+++ b/app/emoji.go
@@ -61,7 +61,7 @@ func (a *App) CreateEmoji(sessionUserId string, emoji *model.Emoji, multiPartIma
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EMOJI_ADDED, "", "", "", nil)
message.Add("emoji", emoji.ToJson())
- Publish(message)
+ a.Publish(message)
return result.Data.(*model.Emoji), nil
}
}
diff --git a/app/notification.go b/app/notification.go
index cc3db8b55..0859dfd20 100644
--- a/app/notification.go
+++ b/app/notification.go
@@ -94,7 +94,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
if result := <-a.Srv.Store.User().GetProfilesByUsernames(potentialOtherMentions, team.Id); result.Err == nil {
outOfChannelMentions := result.Data.([]*model.User)
if channel.Type != model.CHANNEL_GROUP {
- go sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
+ go a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
}
}
}
@@ -186,7 +186,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @here is disabled
if hereNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
hereNotification = false
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -198,7 +198,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @channel is disabled
if channelNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -210,7 +210,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @all is disabled
if allNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -298,7 +298,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
message.Add("mentions", model.ArrayToJson(mentionedUsersList))
}
- Publish(message)
+ a.Publish(message)
return mentionedUsersList, nil
}
@@ -337,7 +337,7 @@ func (a *App) sendNotificationEmail(post *model.Post, user *model.User, channel
}
if sendBatched {
- if err := AddNotificationEmailToBatch(user, post, team); err == nil {
+ if err := a.AddNotificationEmailToBatch(user, post, team); err == nil {
return nil
}
}
@@ -717,7 +717,7 @@ func (a *App) getMobileAppSessions(userId string) ([]*model.Session, *model.AppE
}
}
-func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError {
+func (a *App) sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError {
if len(users) == 0 {
return nil
}
@@ -742,7 +742,7 @@ func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId strin
})
}
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
diff --git a/app/plugins.go b/app/plugins.go
index 450cfebeb..94a1bfd6f 100644
--- a/app/plugins.go
+++ b/app/plugins.go
@@ -335,7 +335,7 @@ func (a *App) UnpackAndActivatePlugin(pluginFile io.Reader) (*model.Manifest, *m
if manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_ACTIVATED, "", "", "", nil)
message.Add("manifest", manifest.ClientManifest())
- Publish(message)
+ a.Publish(message)
}
return manifest, nil
@@ -383,7 +383,7 @@ func (a *App) RemovePlugin(id string) *model.AppError {
if manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DEACTIVATED, "", "", "", nil)
message.Add("manifest", manifest.ClientManifest())
- Publish(message)
+ a.Publish(message)
}
return nil
diff --git a/app/post.go b/app/post.go
index e81af4673..ccdc015bb 100644
--- a/app/post.go
+++ b/app/post.go
@@ -51,7 +51,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError)
}
T := utils.GetUserTranslations(user.Locale)
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: channel.Id,
@@ -75,7 +75,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError)
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", post.UserId, nil)
message.Add("channel_id", post.ChannelId)
- go Publish(message)
+ go a.Publish(message)
}
}
@@ -239,7 +239,7 @@ func parseSlackLinksToMarkdown(text string) string {
return linkWithTextRegex.ReplaceAllString(text, "[${2}](${1})")
}
-func SendEphemeralPost(userId string, post *model.Post) *model.Post {
+func (a *App) SendEphemeralPost(userId string, post *model.Post) *model.Post {
post.Type = model.POST_EPHEMERAL
// fill in fields which haven't been specified which have sensible defaults
@@ -256,7 +256,7 @@ func SendEphemeralPost(userId string, post *model.Post) *model.Post {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE, "", post.ChannelId, userId, nil)
message.Add("post", post.ToJson())
- go Publish(message)
+ go a.Publish(message)
return post
}
@@ -330,7 +330,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model
}()
}
- sendUpdatedPostEvent(rpost)
+ a.sendUpdatedPostEvent(rpost)
a.InvalidateCacheForChannelPosts(rpost.ChannelId)
@@ -351,17 +351,17 @@ func (a *App) PatchPost(postId string, patch *model.PostPatch) (*model.Post, *mo
return nil, err
}
- sendUpdatedPostEvent(updatedPost)
+ a.sendUpdatedPostEvent(updatedPost)
a.InvalidateCacheForChannelPosts(updatedPost.ChannelId)
return updatedPost, nil
}
-func sendUpdatedPostEvent(post *model.Post) {
+func (a *App) sendUpdatedPostEvent(post *model.Post) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) GetPostsPage(channelId string, page int, perPage int) (*model.PostList, *model.AppError) {
@@ -502,7 +502,7 @@ func (a *App) DeletePost(postId string) (*model.Post, *model.AppError) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_DELETED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
- go Publish(message)
+ go a.Publish(message)
go a.DeletePostFiles(post)
go a.DeleteFlaggedPosts(post.Id)
@@ -724,7 +724,7 @@ func (a *App) DoPostAction(postId string, actionId string, userId string) *model
}
ephemeralPost.UserId = userId
ephemeralPost.AddProp("from_webhook", "true")
- SendEphemeralPost(userId, ephemeralPost)
+ a.SendEphemeralPost(userId, ephemeralPost)
}
return nil
diff --git a/app/preference.go b/app/preference.go
index 8ae33b728..bee3236bf 100644
--- a/app/preference.go
+++ b/app/preference.go
@@ -55,7 +55,7 @@ func (a *App) UpdatePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_CHANGED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
- go Publish(message)
+ go a.Publish(message)
return nil
}
@@ -78,7 +78,7 @@ func (a *App) DeletePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_DELETED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
- go Publish(message)
+ go a.Publish(message)
return nil
}
diff --git a/app/reaction.go b/app/reaction.go
index 6513fa8b0..debf75f7a 100644
--- a/app/reaction.go
+++ b/app/reaction.go
@@ -51,7 +51,7 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo
// send out that a reaction has been added/removed
message := model.NewWebSocketEvent(event, "", post.ChannelId, "", nil)
message.Add("reaction", reaction.ToJson())
- Publish(message)
+ a.Publish(message)
// The post is always modified since the UpdateAt always changes
a.InvalidateCacheForChannelPosts(post.ChannelId)
@@ -59,5 +59,5 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo
post.UpdateAt = model.GetMillis()
umessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil)
umessage.Add("post", post.ToJson())
- Publish(umessage)
+ a.Publish(umessage)
}
diff --git a/app/server.go b/app/server.go
index 938773ad9..6915369c4 100644
--- a/app/server.go
+++ b/app/server.go
@@ -218,7 +218,7 @@ func (a *App) StopServer() {
a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN)
a.Srv.Store.Close()
- HubStop()
+ a.HubStop()
a.ShutDownPlugins()
diff --git a/app/session.go b/app/session.go
index e5e5c939d..f0245acba 100644
--- a/app/session.go
+++ b/app/session.go
@@ -107,8 +107,7 @@ func (a *App) RevokeAllSessions(userId string) *model.AppError {
}
func (a *App) ClearSessionCacheForUser(userId string) {
-
- ClearSessionCacheForUserSkipClusterSend(userId)
+ a.ClearSessionCacheForUserSkipClusterSend(userId)
if a.Cluster != nil {
msg := &model.ClusterMessage{
@@ -120,7 +119,7 @@ func (a *App) ClearSessionCacheForUser(userId string) {
}
}
-func ClearSessionCacheForUserSkipClusterSend(userId string) {
+func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) {
keys := sessionCache.Keys()
for _, key := range keys {
@@ -132,8 +131,7 @@ func ClearSessionCacheForUserSkipClusterSend(userId string) {
}
}
- InvalidateWebConnSessionCacheForUser(userId)
-
+ a.InvalidateWebConnSessionCacheForUser(userId)
}
func AddSessionToCache(session *model.Session) {
diff --git a/app/status.go b/app/status.go
index fb93a9e39..edfda561b 100644
--- a/app/status.go
+++ b/app/status.go
@@ -213,15 +213,15 @@ func (a *App) SetStatusOnline(userId string, sessionId string, manual bool) {
}
if broadcast {
- BroadcastStatus(status)
+ a.BroadcastStatus(status)
}
}
-func BroadcastStatus(status *model.Status) {
+func (a *App) BroadcastStatus(status *model.Status) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", status.Status)
event.Add("user_id", status.UserId)
- go Publish(event)
+ go a.Publish(event)
}
func (a *App) SetStatusOffline(userId string, manual bool) {
@@ -245,7 +245,7 @@ func (a *App) SetStatusOffline(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_OFFLINE)
event.Add("user_id", status.UserId)
- go Publish(event)
+ go a.Publish(event)
}
func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
@@ -286,7 +286,7 @@ func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_AWAY)
event.Add("user_id", status.UserId)
- go Publish(event)
+ go a.Publish(event)
}
func GetStatusFromCache(userId string) *model.Status {
diff --git a/app/team.go b/app/team.go
index fdf44a783..7a5ccc5d6 100644
--- a/app/team.go
+++ b/app/team.go
@@ -106,7 +106,7 @@ func (a *App) UpdateTeam(team *model.Team) (*model.Team, *model.AppError) {
oldTeam.Sanitize()
- sendUpdatedTeamEvent(oldTeam)
+ a.sendUpdatedTeamEvent(oldTeam)
return oldTeam, nil
}
@@ -126,15 +126,15 @@ func (a *App) PatchTeam(teamId string, patch *model.TeamPatch) (*model.Team, *mo
updatedTeam.Sanitize()
- sendUpdatedTeamEvent(updatedTeam)
+ a.sendUpdatedTeamEvent(updatedTeam)
return updatedTeam, nil
}
-func sendUpdatedTeamEvent(team *model.Team) {
+func (a *App) sendUpdatedTeamEvent(team *model.Team) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_UPDATE_TEAM, "", "", "", nil)
message.Add("team", team.ToJson())
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles string) (*model.TeamMember, *model.AppError) {
@@ -163,16 +163,16 @@ func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles strin
a.ClearSessionCacheForUser(userId)
- sendUpdatedMemberRoleEvent(userId, member)
+ a.sendUpdatedMemberRoleEvent(userId, member)
return member, nil
}
-func sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) {
+func (a *App) sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_MEMBERROLE_UPDATED, "", "", userId, nil)
message.Add("member", member.ToJson())
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) AddUserToTeam(teamId string, userId string, userRequestorId string) (*model.Team, *model.AppError) {
@@ -330,7 +330,7 @@ func (a *App) JoinUserToTeam(team *model.Team, user *model.User, userRequestorId
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", user.Id, nil)
message.Add("team_id", team.Id)
message.Add("user_id", user.Id)
- Publish(message)
+ a.Publish(message)
return nil
}
@@ -462,7 +462,7 @@ func (a *App) AddTeamMember(teamId, userId string) (*model.TeamMember, *model.Ap
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil)
message.Add("team_id", teamId)
message.Add("user_id", userId)
- Publish(message)
+ a.Publish(message)
return teamMember, nil
}
@@ -484,7 +484,7 @@ func (a *App) AddTeamMembers(teamId string, userIds []string, userRequestorId st
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil)
message.Add("team_id", teamId)
message.Add("user_id", userId)
- Publish(message)
+ a.Publish(message)
}
return members, nil
@@ -603,7 +603,7 @@ func (a *App) LeaveTeam(team *model.Team, user *model.User) *model.AppError {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_LEAVE_TEAM, team.Id, "", "", nil)
message.Add("user_id", user.Id)
message.Add("team_id", team.Id)
- Publish(message)
+ a.Publish(message)
teamMember.Roles = ""
teamMember.DeleteAt = model.GetMillis()
diff --git a/app/user.go b/app/user.go
index c91b4cfb7..27f1c5d85 100644
--- a/app/user.go
+++ b/app/user.go
@@ -202,7 +202,7 @@ func (a *App) CreateUser(user *model.User) (*model.User, *model.AppError) {
// This message goes to everyone, so the teamId, channelId and userId are irrelevant
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_NEW_USER, "", "", "", nil)
message.Add("user_id", ruser.Id)
- go Publish(message)
+ go a.Publish(message)
return ruser, nil
}
@@ -829,7 +829,7 @@ func (a *App) SetProfileImage(userId string, imageData *multipart.FileHeader) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers)
message.Add("user", user)
- Publish(message)
+ a.Publish(message)
}
return nil
@@ -950,7 +950,7 @@ func (a *App) UpdateUserAsUser(user *model.User, asAdmin bool) (*model.User, *mo
return nil, err
}
- sendUpdatedUserEvent(*updatedUser, asAdmin)
+ a.sendUpdatedUserEvent(*updatedUser, asAdmin)
return updatedUser, nil
}
@@ -968,19 +968,19 @@ func (a *App) PatchUser(userId string, patch *model.UserPatch, asAdmin bool) (*m
return nil, err
}
- sendUpdatedUserEvent(*updatedUser, asAdmin)
+ a.sendUpdatedUserEvent(*updatedUser, asAdmin)
return updatedUser, nil
}
-func sendUpdatedUserEvent(user model.User, asAdmin bool) {
+func (a *App) sendUpdatedUserEvent(user model.User, asAdmin bool) {
SanitizeProfile(&user, asAdmin)
omitUsers := make(map[string]bool, 1)
omitUsers[user.Id] = true
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers)
message.Add("user", user)
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, *model.AppError) {
diff --git a/app/web_conn.go b/app/web_conn.go
index 069d2c8f4..f5644ce17 100644
--- a/app/web_conn.go
+++ b/app/web_conn.go
@@ -96,7 +96,7 @@ func (c *WebConn) SetSession(v *model.Session) {
func (c *WebConn) ReadPump() {
defer func() {
- HubUnregister(c)
+ c.App.HubUnregister(c)
c.WebSocket.Close()
}()
c.WebSocket.SetReadLimit(model.SOCKET_MAX_MESSAGE_SIZE_KB)
diff --git a/app/web_hub.go b/app/web_hub.go
index b351de39e..50ccb100e 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -29,6 +29,7 @@ type Hub struct {
// connectionCount should be kept first.
// See https://github.com/mattermost/mattermost-server/pull/7281
connectionCount int64
+ app *App
connections []*WebConn
connectionIndex int
register chan *WebConn
@@ -40,11 +41,9 @@ type Hub struct {
goroutineId int
}
-var hubs []*Hub = make([]*Hub, 0)
-var stopCheckingForDeadlock chan bool
-
-func NewWebHub() *Hub {
+func (a *App) NewWebHub() *Hub {
return &Hub{
+ app: a,
register: make(chan *WebConn),
unregister: make(chan *WebConn),
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
@@ -55,26 +54,27 @@ func NewWebHub() *Hub {
}
}
-func TotalWebsocketConnections() int {
+func (a *App) TotalWebsocketConnections() int {
count := int64(0)
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
count = count + atomic.LoadInt64(&hub.connectionCount)
}
return int(count)
}
-func HubStart() {
+func (a *App) HubStart() {
// Total number of hubs is twice the number of CPUs.
numberOfHubs := runtime.NumCPU() * 2
l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs)
- hubs = make([]*Hub, numberOfHubs)
+ a.Hubs = make([]*Hub, numberOfHubs)
+ a.HubsStopCheckingForDeadlock = make(chan bool, 1)
- for i := 0; i < len(hubs); i++ {
- hubs[i] = NewWebHub()
- hubs[i].connectionIndex = i
- hubs[i].Start()
+ for i := 0; i < len(a.Hubs); i++ {
+ a.Hubs[i] = a.NewWebHub()
+ a.Hubs[i].connectionIndex = i
+ a.Hubs[i].Start()
}
go func() {
@@ -84,12 +84,10 @@ func HubStart() {
ticker.Stop()
}()
- stopCheckingForDeadlock = make(chan bool, 1)
-
for {
select {
case <-ticker.C:
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
if len(hub.broadcast) >= DEADLOCK_WARN {
l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))
buf := make([]byte, 1<<16)
@@ -105,46 +103,42 @@ func HubStart() {
}
}
- case <-stopCheckingForDeadlock:
+ case <-a.HubsStopCheckingForDeadlock:
return
}
}
}()
}
-func HubStop() {
+func (a *App) HubStop() {
l4g.Info(utils.T("api.web_hub.start.stopping.debug"))
select {
- case stopCheckingForDeadlock <- true:
+ case a.HubsStopCheckingForDeadlock <- true:
default:
l4g.Warn("We appear to have already sent the stop checking for deadlocks command")
}
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
hub.Stop()
}
- hubs = make([]*Hub, 0)
+ a.Hubs = []*Hub{}
}
-func GetHubForUserId(userId string) *Hub {
+func (a *App) GetHubForUserId(userId string) *Hub {
hash := fnv.New32a()
hash.Write([]byte(userId))
- index := hash.Sum32() % uint32(len(hubs))
- return hubs[index]
-}
-
-func HubRegister(webConn *WebConn) {
- GetHubForUserId(webConn.UserId).Register(webConn)
+ index := hash.Sum32() % uint32(len(a.Hubs))
+ return a.Hubs[index]
}
-func HubUnregister(webConn *WebConn) {
- GetHubForUserId(webConn.UserId).Unregister(webConn)
+func (a *App) HubRegister(webConn *WebConn) {
+ a.GetHubForUserId(webConn.UserId).Register(webConn)
}
-func Publish(message *model.WebSocketEvent) {
- Global().Publish(message)
+func (a *App) HubUnregister(webConn *WebConn) {
+ a.GetHubForUserId(webConn.UserId).Unregister(webConn)
}
func (a *App) Publish(message *model.WebSocketEvent) {
@@ -152,7 +146,7 @@ func (a *App) Publish(message *model.WebSocketEvent) {
metrics.IncrementWebsocketEvent(message.Event)
}
- PublishSkipClusterSend(message)
+ a.PublishSkipClusterSend(message)
if a.Cluster != nil {
cm := &model.ClusterMessage{
@@ -173,8 +167,8 @@ func (a *App) Publish(message *model.WebSocketEvent) {
}
}
-func PublishSkipClusterSend(message *model.WebSocketEvent) {
- for _, hub := range hubs {
+func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) {
+ for _, hub := range a.Hubs {
hub.Broadcast(message)
}
}
@@ -291,8 +285,8 @@ func (a *App) InvalidateCacheForUserSkipClusterSend(userId string) {
a.Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId)
a.Srv.Store.User().InvalidatProfileCacheForUser(userId)
- if len(hubs) != 0 {
- GetHubForUserId(userId).InvalidateUser(userId)
+ if len(a.Hubs) != 0 {
+ a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -313,9 +307,9 @@ func (a *App) InvalidateCacheForWebhookSkipClusterSend(webhookId string) {
a.Srv.Store.Webhook().InvalidateWebhookCache(webhookId)
}
-func InvalidateWebConnSessionCacheForUser(userId string) {
- if len(hubs) != 0 {
- GetHubForUserId(userId).InvalidateUser(userId)
+func (a *App) InvalidateWebConnSessionCacheForUser(userId string) {
+ if len(a.Hubs) != 0 {
+ a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -401,7 +395,7 @@ func (h *Hub) Start() {
}
if !found {
- go Global().SetStatusOffline(userId, false)
+ go h.app.SetStatusOffline(userId, false)
}
case userId := <-h.invalidateUser:
diff --git a/app/websocket_router.go b/app/websocket_router.go
index bfb649d6c..c8220f1f1 100644
--- a/app/websocket_router.go
+++ b/app/websocket_router.go
@@ -17,13 +17,15 @@ type webSocketHandler interface {
}
type WebSocketRouter struct {
+ app *App
handlers map[string]webSocketHandler
}
-func NewWebSocketRouter() *WebSocketRouter {
- router := &WebSocketRouter{}
- router.handlers = make(map[string]webSocketHandler)
- return router
+func (a *App) NewWebSocketRouter() *WebSocketRouter {
+ return &WebSocketRouter{
+ app: a,
+ handlers: make(map[string]webSocketHandler),
+ }
}
func (wr *WebSocketRouter) Handle(action string, handler webSocketHandler) {
@@ -54,21 +56,21 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque
return
}
- session, err := Global().GetSession(token)
+ session, err := wr.app.GetSession(token)
if err != nil {
conn.WebSocket.Close()
} else {
go func() {
- Global().SetStatusOnline(session.UserId, session.Id, false)
- Global().UpdateLastActivityAtIfNeeded(*session)
+ wr.app.SetStatusOnline(session.UserId, session.Id, false)
+ wr.app.UpdateLastActivityAtIfNeeded(*session)
}()
conn.SetSession(session)
conn.SetSessionToken(session.Token)
conn.UserId = session.UserId
- HubRegister(conn)
+ wr.app.HubRegister(conn)
resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil)
conn.Send <- resp
diff --git a/cmd/platform/server.go b/cmd/platform/server.go
index 55854dfe4..7f5fbf6e8 100644
--- a/cmd/platform/server.go
+++ b/cmd/platform/server.go
@@ -69,6 +69,7 @@ func runServer(configFileLocation string) {
a.NewServer()
a.InitStores()
a.Srv.Router = api.NewRouter()
+ a.Srv.WebSocketRouter = a.NewWebSocketRouter()
if model.BuildEnterpriseReady == "true" {
a.LoadLicense()
@@ -80,10 +81,9 @@ func runServer(configFileLocation string) {
l4g.Error("Unable to find webapp directory, could not initialize plugins")
}
- wsapi.InitRouter()
api4.Init(a, a.Srv.Router, false)
api3 := api.Init(a, a.Srv.Router)
- wsapi.InitApi()
+ wsapi.Init(a, a.Srv.WebSocketRouter)
web.Init(api3)
if !utils.IsLicensed() && len(utils.Cfg.SqlSettings.DataSourceReplicas) > 1 {
diff --git a/cmd/platform/test.go b/cmd/platform/test.go
index fd8777a46..f0c75a8d3 100644
--- a/cmd/platform/test.go
+++ b/cmd/platform/test.go
@@ -52,10 +52,10 @@ func webClientTestsCmdF(cmd *cobra.Command, args []string) error {
utils.InitTranslations(utils.Cfg.LocalizationSettings)
a.Srv.Router = api.NewRouter()
- wsapi.InitRouter()
+ a.Srv.WebSocketRouter = a.NewWebSocketRouter()
api4.Init(a, a.Srv.Router, false)
api.Init(a, a.Srv.Router)
- wsapi.InitApi()
+ wsapi.Init(a, a.Srv.WebSocketRouter)
setupClientTests()
a.StartServer()
runWebClientTests()
@@ -72,10 +72,10 @@ func serverForWebClientTestsCmdF(cmd *cobra.Command, args []string) error {
utils.InitTranslations(utils.Cfg.LocalizationSettings)
a.Srv.Router = api.NewRouter()
- wsapi.InitRouter()
+ a.Srv.WebSocketRouter = a.NewWebSocketRouter()
api4.Init(a, a.Srv.Router, false)
api.Init(a, a.Srv.Router)
- wsapi.InitApi()
+ wsapi.Init(a, a.Srv.WebSocketRouter)
setupClientTests()
a.StartServer()
diff --git a/wsapi/api.go b/wsapi/api.go
index 4de0f265f..f318cba6f 100644
--- a/wsapi/api.go
+++ b/wsapi/api.go
@@ -7,15 +7,21 @@ import (
"github.com/mattermost/mattermost-server/app"
)
-func InitRouter() {
- app.Global().Srv.WebSocketRouter = app.NewWebSocketRouter()
+type API struct {
+ App *app.App
+ Router *app.WebSocketRouter
}
-func InitApi() {
- InitUser()
- InitSystem()
- InitStatus()
- InitWebrtc()
+func Init(a *app.App, router *app.WebSocketRouter) {
+ api := &API{
+ App: a,
+ Router: router,
+ }
- app.HubStart()
+ api.InitUser()
+ api.InitSystem()
+ api.InitStatus()
+ api.InitWebrtc()
+
+ a.HubStart()
}
diff --git a/wsapi/status.go b/wsapi/status.go
index a60900feb..a20b6e284 100644
--- a/wsapi/status.go
+++ b/wsapi/status.go
@@ -10,11 +10,11 @@ import (
"github.com/mattermost/mattermost-server/utils"
)
-func InitStatus() {
+func (api *API) InitStatus() {
l4g.Debug(utils.T("wsapi.status.init.debug"))
- app.Global().Srv.WebSocketRouter.Handle("get_statuses", ApiWebSocketHandler(getStatuses))
- app.Global().Srv.WebSocketRouter.Handle("get_statuses_by_ids", ApiWebSocketHandler(getStatusesByIds))
+ api.Router.Handle("get_statuses", api.ApiWebSocketHandler(getStatuses))
+ api.Router.Handle("get_statuses_by_ids", api.ApiWebSocketHandler(api.getStatusesByIds))
}
func getStatuses(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
@@ -22,14 +22,14 @@ func getStatuses(req *model.WebSocketRequest) (map[string]interface{}, *model.Ap
return model.StatusMapToInterfaceMap(statusMap), nil
}
-func getStatusesByIds(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
+func (api *API) getStatusesByIds(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
var userIds []string
if userIds = model.ArrayFromInterface(req.Data["user_ids"]); len(userIds) == 0 {
l4g.Error(model.StringInterfaceToJson(req.Data))
return nil, NewInvalidWebSocketParamError(req.Action, "user_ids")
}
- statusMap, err := app.Global().GetStatusesByIds(userIds)
+ statusMap, err := api.App.GetStatusesByIds(userIds)
if err != nil {
return nil, err
}
diff --git a/wsapi/system.go b/wsapi/system.go
index e019e79b2..6959106d6 100644
--- a/wsapi/system.go
+++ b/wsapi/system.go
@@ -5,15 +5,14 @@ package wsapi
import (
l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
-func InitSystem() {
+func (api *API) InitSystem() {
l4g.Debug(utils.T("wsapi.system.init.debug"))
- app.Global().Srv.WebSocketRouter.Handle("ping", ApiWebSocketHandler(ping))
+ api.Router.Handle("ping", api.ApiWebSocketHandler(ping))
}
func ping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
diff --git a/wsapi/user.go b/wsapi/user.go
index d79ecab83..c8236c978 100644
--- a/wsapi/user.go
+++ b/wsapi/user.go
@@ -5,18 +5,17 @@ package wsapi
import (
l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
-func InitUser() {
+func (api *API) InitUser() {
l4g.Debug(utils.T("wsapi.user.init.debug"))
- app.Global().Srv.WebSocketRouter.Handle("user_typing", ApiWebSocketHandler(userTyping))
+ api.Router.Handle("user_typing", api.ApiWebSocketHandler(api.userTyping))
}
-func userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
+func (api *API) userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
var ok bool
var channelId string
if channelId, ok = req.Data["channel_id"].(string); !ok || len(channelId) != 26 {
@@ -34,7 +33,7 @@ func userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.App
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", channelId, "", omitUsers)
event.Add("parent_id", parentId)
event.Add("user_id", req.Session.UserId)
- go app.Publish(event)
+ go api.App.Publish(event)
return nil, nil
}
diff --git a/wsapi/webrtc.go b/wsapi/webrtc.go
index a2e1a18ce..42e749ebb 100644
--- a/wsapi/webrtc.go
+++ b/wsapi/webrtc.go
@@ -5,18 +5,17 @@ package wsapi
import (
l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
-func InitWebrtc() {
+func (api *API) InitWebrtc() {
l4g.Debug(utils.T("wsapi.webtrc.init.debug"))
- app.Global().Srv.WebSocketRouter.Handle("webrtc", ApiWebSocketHandler(webrtcMessage))
+ api.Router.Handle("webrtc", api.ApiWebSocketHandler(api.webrtcMessage))
}
-func webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
+func (api *API) webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
var ok bool
var toUserId string
if toUserId, ok = req.Data["to_user_id"].(string); !ok || len(toUserId) != 26 {
@@ -25,7 +24,7 @@ func webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_WEBRTC, "", "", toUserId, nil)
event.Data = req.Data
- go app.Publish(event)
+ go api.App.Publish(event)
return nil, nil
}
diff --git a/wsapi/websocket_handler.go b/wsapi/websocket_handler.go
index 5dfa5bdb0..daf225061 100644
--- a/wsapi/websocket_handler.go
+++ b/wsapi/websocket_handler.go
@@ -13,18 +13,19 @@ import (
"github.com/mattermost/mattermost-server/utils"
)
-func ApiWebSocketHandler(wh func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)) webSocketHandler {
- return webSocketHandler{wh}
+func (api *API) ApiWebSocketHandler(wh func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)) webSocketHandler {
+ return webSocketHandler{api.App, wh}
}
type webSocketHandler struct {
+ app *app.App
handlerFunc func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)
}
func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketRequest) {
l4g.Debug("/api/v3/users/websocket:%s", r.Action)
- session, sessionErr := app.Global().GetSession(conn.GetSessionToken())
+ session, sessionErr := wh.app.GetSession(conn.GetSessionToken())
if sessionErr != nil {
l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, conn.UserId, sessionErr.SystemMessage(utils.T), sessionErr.Error())
sessionErr.DetailedError = ""