From 8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 27 Sep 2017 11:52:34 -0500 Subject: remove remaining Global() calls (outside of tests) (#7521) --- api/api.go | 2 +- api/apitestlib.go | 4 +-- api/cli_test.go | 5 ++- api/emoji.go | 2 +- api/post.go | 2 +- api/reaction.go | 18 ---------- api/websocket.go | 3 +- api/websocket_test.go | 5 ++- api4/api.go | 2 +- api4/apitestlib.go | 4 +-- api4/websocket.go | 3 +- app/admin.go | 4 +-- app/analytics.go | 4 +-- app/app.go | 5 +++ app/auto_environment.go | 10 +++--- app/auto_users.go | 8 +++-- app/channel.go | 22 ++++++------- app/cluster_handlers.go | 8 ++--- app/command.go | 2 +- app/command_expand_collapse.go | 2 +- app/command_loadtest.go | 4 ++- app/email_batching.go | 26 +++++++-------- app/email_batching_test.go | 12 +++---- app/emoji.go | 2 +- app/notification.go | 16 ++++----- app/plugins.go | 4 +-- app/post.go | 20 ++++++------ app/preference.go | 4 +-- app/reaction.go | 4 +-- app/server.go | 2 +- app/session.go | 8 ++--- app/status.go | 10 +++--- app/team.go | 22 ++++++------- app/user.go | 12 +++---- app/web_conn.go | 2 +- app/web_hub.go | 74 +++++++++++++++++++----------------------- app/websocket_router.go | 18 +++++----- cmd/platform/server.go | 4 +-- cmd/platform/test.go | 8 ++--- wsapi/api.go | 22 ++++++++----- wsapi/status.go | 10 +++--- wsapi/system.go | 5 ++- wsapi/user.go | 9 +++-- wsapi/webrtc.go | 9 +++-- wsapi/websocket_handler.go | 7 ++-- 45 files changed, 207 insertions(+), 222 deletions(-) diff --git a/api/api.go b/api/api.go index 283120b55..806ff84eb 100644 --- a/api/api.go +++ b/api/api.go @@ -121,7 +121,7 @@ func Init(a *app.App, root *mux.Router) *API { utils.InitHTML() - app.InitEmailBatching() + a.InitEmailBatching() if *utils.Cfg.ServiceSettings.EnableAPIv3 { l4g.Info("API version 3 is scheduled for deprecation. Please see https://api.mattermost.com for details.") diff --git a/api/apitestlib.go b/api/apitestlib.go index 8504748e1..f285b8d79 100644 --- a/api/apitestlib.go +++ b/api/apitestlib.go @@ -49,11 +49,11 @@ func setupTestHelper(enterprise bool) *TestHelper { th.App.NewServer() th.App.InitStores() th.App.Srv.Router = NewRouter() - wsapi.InitRouter() + th.App.Srv.WebSocketRouter = th.App.NewWebSocketRouter() th.App.StartServer() api4.Init(th.App, th.App.Srv.Router, false) Init(th.App, th.App.Srv.Router) - wsapi.InitApi() + wsapi.Init(th.App, th.App.Srv.WebSocketRouter) utils.EnableDebugLogForTest() th.App.Srv.Store.MarkSystemRanUnitTests() diff --git a/api/cli_test.go b/api/cli_test.go index 8c7381290..14645f35c 100644 --- a/api/cli_test.go +++ b/api/cli_test.go @@ -8,7 +8,6 @@ import ( "strings" "testing" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" ) @@ -96,7 +95,7 @@ func TestCliCreateUserWithoutTeam(t *testing.T) { t.SkipNow() } - Setup() + th := Setup() id := model.NewId() email := "success+" + id + "@simulator.amazonses.com" username := "name" + id @@ -108,7 +107,7 @@ func TestCliCreateUserWithoutTeam(t *testing.T) { t.Fatal(err) } - if result := <-app.Global().Srv.Store.User().GetByEmail(email); result.Err != nil { + if result := <-th.App.Srv.Store.User().GetByEmail(email); result.Err != nil { t.Fatal() } else { user := result.Data.(*model.User) diff --git a/api/emoji.go b/api/emoji.go index 1961ad146..b4ad9fa2c 100644 --- a/api/emoji.go +++ b/api/emoji.go @@ -118,7 +118,7 @@ func createEmoji(c *Context, w http.ResponseWriter, r *http.Request) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EMOJI_ADDED, "", "", "", nil) message.Add("emoji", result.Data.(*model.Emoji).ToJson()) - app.Publish(message) + c.App.Publish(message) w.Write([]byte(result.Data.(*model.Emoji).ToJson())) } } diff --git a/api/post.go b/api/post.go index 41cd7564b..db9412e7b 100644 --- a/api/post.go +++ b/api/post.go @@ -141,7 +141,7 @@ func saveIsPinnedPost(c *Context, w http.ResponseWriter, r *http.Request, isPinn message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", rpost.ChannelId, "", nil) message.Add("post", rpost.ToJson()) - go app.Publish(message) + go c.App.Publish(message) c.App.InvalidateCacheForChannelPosts(rpost.ChannelId) diff --git a/api/reaction.go b/api/reaction.go index 28cc9ade2..aada8cdd4 100644 --- a/api/reaction.go +++ b/api/reaction.go @@ -8,7 +8,6 @@ import ( l4g "github.com/alecthomas/log4go" "github.com/gorilla/mux" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -112,23 +111,6 @@ func deleteReaction(c *Context, w http.ResponseWriter, r *http.Request) { ReturnStatusOK(w) } -func sendReactionEvent(event string, channelId string, reaction *model.Reaction, post *model.Post) { - // send out that a reaction has been added/removed - - message := model.NewWebSocketEvent(event, "", channelId, "", nil) - message.Add("reaction", reaction.ToJson()) - app.Publish(message) - - // THe post is always modified since the UpdateAt always changes - app.Global().InvalidateCacheForChannelPosts(post.ChannelId) - post.HasReactions = true - post.UpdateAt = model.GetMillis() - umessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", channelId, "", nil) - umessage.Add("post", post.ToJson()) - app.Publish(umessage) - -} - func listReactions(c *Context, w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) diff --git a/api/websocket.go b/api/websocket.go index 6de5741f3..c90968e7c 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -8,7 +8,6 @@ import ( l4g "github.com/alecthomas/log4go" "github.com/gorilla/websocket" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -37,7 +36,7 @@ func connect(c *Context, w http.ResponseWriter, r *http.Request) { wc := c.App.NewWebConn(ws, c.Session, c.T, c.Locale) if len(c.Session.UserId) > 0 { - app.HubRegister(wc) + c.App.HubRegister(wc) } go wc.WritePump() diff --git a/api/websocket_test.go b/api/websocket_test.go index 9bf957297..a5c512262 100644 --- a/api/websocket_test.go +++ b/api/websocket_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -195,7 +194,7 @@ func TestWebSocketEvent(t *testing.T) { omitUser["somerandomid"] = true evt1 := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", th.BasicChannel.Id, "", omitUser) evt1.Add("user_id", "somerandomid") - app.Publish(evt1) + th.App.Publish(evt1) time.Sleep(300 * time.Millisecond) @@ -224,7 +223,7 @@ func TestWebSocketEvent(t *testing.T) { } evt2 := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", "somerandomid", "", nil) - go app.Publish(evt2) + go th.App.Publish(evt2) time.Sleep(300 * time.Millisecond) eventHit = false diff --git a/api4/api.go b/api4/api.go index 9df051456..e9cf3054c 100644 --- a/api4/api.go +++ b/api4/api.go @@ -226,7 +226,7 @@ func Init(a *app.App, root *mux.Router, full bool) *API { if full { utils.InitHTML() - app.InitEmailBatching() + a.InitEmailBatching() } return api diff --git a/api4/apitestlib.go b/api4/apitestlib.go index 7b5230014..fad066ff8 100644 --- a/api4/apitestlib.go +++ b/api4/apitestlib.go @@ -61,10 +61,10 @@ func setupTestHelper(enterprise bool) *TestHelper { th.App.NewServer() th.App.InitStores() th.App.Srv.Router = NewRouter() - wsapi.InitRouter() + th.App.Srv.WebSocketRouter = th.App.NewWebSocketRouter() th.App.StartServer() Init(th.App, th.App.Srv.Router, true) - wsapi.InitApi() + wsapi.Init(th.App, th.App.Srv.WebSocketRouter) utils.EnableDebugLogForTest() th.App.Srv.Store.MarkSystemRanUnitTests() diff --git a/api4/websocket.go b/api4/websocket.go index 7dfe3d61c..2793c0bd0 100644 --- a/api4/websocket.go +++ b/api4/websocket.go @@ -8,7 +8,6 @@ import ( l4g "github.com/alecthomas/log4go" "github.com/gorilla/websocket" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -38,7 +37,7 @@ func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) { wc := c.App.NewWebConn(ws, c.Session, c.T, "") if len(c.Session.UserId) > 0 { - app.HubRegister(wc) + c.App.HubRegister(wc) } go wc.WritePump() diff --git a/app/admin.go b/app/admin.go index bd687627e..0d02c3b49 100644 --- a/app/admin.go +++ b/app/admin.go @@ -138,7 +138,7 @@ func (a *App) ReloadConfig() { utils.LoadConfig(utils.CfgFileName) // start/restart email batching job if necessary - InitEmailBatching() + a.InitEmailBatching() } func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) *model.AppError { @@ -179,7 +179,7 @@ func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) } // start/restart email batching job if necessary - InitEmailBatching() + a.InitEmailBatching() return nil } diff --git a/app/analytics.go b/app/analytics.go index 70c049350..65a9e4129 100644 --- a/app/analytics.go +++ b/app/analytics.go @@ -103,7 +103,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo return nil, err } - totalSockets := TotalWebsocketConnections() + totalSockets := a.TotalWebsocketConnections() totalMasterDb := a.Srv.Store.TotalMasterDbConnections() totalReadDb := a.Srv.Store.TotalReadDbConnections() @@ -118,7 +118,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo rows[7].Value = float64(totalReadDb) } else { - rows[5].Value = float64(TotalWebsocketConnections()) + rows[5].Value = float64(a.TotalWebsocketConnections()) rows[6].Value = float64(a.Srv.Store.TotalMasterDbConnections()) rows[7].Value = float64(a.Srv.Store.TotalReadDbConnections()) } diff --git a/app/app.go b/app/app.go index eaaf9acee..e85fa6342 100644 --- a/app/app.go +++ b/app/app.go @@ -20,6 +20,11 @@ type App struct { PluginEnv *pluginenv.Environment PluginConfigListenerId string + EmailBatching *EmailBatchingJob + + Hubs []*Hub + HubsStopCheckingForDeadlock chan bool + AccountMigration einterfaces.AccountMigrationInterface Brand einterfaces.BrandInterface Cluster einterfaces.ClusterInterface diff --git a/app/auto_environment.go b/app/auto_environment.go index 7bafc6948..660316e4b 100644 --- a/app/auto_environment.go +++ b/app/auto_environment.go @@ -16,7 +16,7 @@ type TestEnvironment struct { Environments []TeamEnvironment } -func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) { +func CreateTestEnvironmentWithTeams(a *App, client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) { rand.Seed(time.Now().UTC().UnixNano()) teamCreator := NewAutoTeamCreator(client) @@ -29,7 +29,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range environment := TestEnvironment{teams, make([]TeamEnvironment, len(teams))} for i, team := range teams { - userCreator := NewAutoUserCreator(client, team) + userCreator := NewAutoUserCreator(a, client, team) userCreator.Fuzzy = fuzzy randomUser, err := userCreator.createRandomUser() if err != true { @@ -37,7 +37,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range } client.LoginById(randomUser.Id, USER_PASSWORD) client.SetTeamId(team.Id) - teamEnvironment, err := CreateTestEnvironmentInTeam(client, team, rangeChannels, rangeUsers, rangePosts, fuzzy) + teamEnvironment, err := CreateTestEnvironmentInTeam(a, client, team, rangeChannels, rangeUsers, rangePosts, fuzzy) if err != true { return TestEnvironment{}, false } @@ -47,7 +47,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range return environment, true } -func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) { +func CreateTestEnvironmentInTeam(a *App, client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) { rand.Seed(time.Now().UTC().UnixNano()) // We need to create at least one user @@ -55,7 +55,7 @@ func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeCh rangeUsers.Begin = 1 } - userCreator := NewAutoUserCreator(client, team) + userCreator := NewAutoUserCreator(a, client, team) userCreator.Fuzzy = fuzzy users, err := userCreator.CreateTestUsers(rangeUsers) if err != true { diff --git a/app/auto_users.go b/app/auto_users.go index 50e2084d1..78126211e 100644 --- a/app/auto_users.go +++ b/app/auto_users.go @@ -12,6 +12,7 @@ import ( ) type AutoUserCreator struct { + app *App client *model.Client team *model.Team EmailLength utils.Range @@ -21,8 +22,9 @@ type AutoUserCreator struct { Fuzzy bool } -func NewAutoUserCreator(client *model.Client, team *model.Team) *AutoUserCreator { +func NewAutoUserCreator(a *App, client *model.Client, team *model.Team) *AutoUserCreator { return &AutoUserCreator{ + app: a, client: client, team: team, EmailLength: USER_EMAIL_LEN, @@ -81,14 +83,14 @@ func (cfg *AutoUserCreator) createRandomUser() (*model.User, bool) { ruser := result.Data.(*model.User) status := &model.Status{UserId: ruser.Id, Status: model.STATUS_ONLINE, Manual: false, LastActivityAt: model.GetMillis(), ActiveChannel: ""} - if result := <-Global().Srv.Store.Status().SaveOrUpdate(status); result.Err != nil { + if result := <-cfg.app.Srv.Store.Status().SaveOrUpdate(status); result.Err != nil { result.Err.Translate(utils.T) l4g.Error(result.Err.Error()) return nil, false } // We need to cheat to verify the user's email - store.Must(Global().Srv.Store.User().VerifyEmail(ruser.Id)) + store.Must(cfg.app.Srv.Store.User().VerifyEmail(ruser.Id)) return result.Data.(*model.User), true } diff --git a/app/channel.go b/app/channel.go index 436d429c9..2d3709a0c 100644 --- a/app/channel.go +++ b/app/channel.go @@ -136,7 +136,7 @@ func (a *App) CreateChannelWithUser(channel *model.Channel, userId string) (*mod message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_CREATED, "", "", userId, nil) message.Add("channel_id", channel.Id) message.Add("team_id", channel.TeamId) - Publish(message) + a.Publish(message) return rchannel, nil } @@ -181,7 +181,7 @@ func (a *App) CreateDirectChannel(userId string, otherUserId string) (*model.Cha message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_DIRECT_ADDED, "", channel.Id, "", nil) message.Add("teammate_id", otherUserId) - Publish(message) + a.Publish(message) return channel, nil } @@ -254,7 +254,7 @@ func (a *App) CreateGroupChannel(userIds []string, creatorId string) (*model.Cha message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_GROUP_ADDED, "", channel.Id, "", nil) message.Add("teammate_ids", model.ArrayToJson(userIds)) - Publish(message) + a.Publish(message) return channel, nil } @@ -316,7 +316,7 @@ func (a *App) UpdateChannel(channel *model.Channel) (*model.Channel, *model.AppE messageWs := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_UPDATED, "", channel.Id, "", nil) messageWs.Add("channel", channel.ToJson()) - Publish(messageWs) + a.Publish(messageWs) return channel, nil } @@ -484,7 +484,7 @@ func (a *App) DeleteChannel(channel *model.Channel, userId string) *model.AppErr message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_DELETED, channel.TeamId, "", "", nil) message.Add("channel_id", channel.Id) - Publish(message) + a.Publish(message) } return nil @@ -550,7 +550,7 @@ func (a *App) AddUserToChannel(user *model.User, channel *model.Channel) (*model message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_ADDED, "", channel.Id, "", nil) message.Add("user_id", user.Id) message.Add("team_id", channel.TeamId) - Publish(message) + a.Publish(message) return newMember, nil } @@ -1039,13 +1039,13 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string, message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", channel.Id, "", nil) message.Add("user_id", userIdToRemove) message.Add("remover_id", removerUserId) - go Publish(message) + go a.Publish(message) // because the removed user no longer belongs to the channel we need to send a separate websocket event userMsg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", "", userIdToRemove, nil) userMsg.Add("channel_id", channel.Id) userMsg.Add("remover_id", removerUserId) - go Publish(userMsg) + go a.Publish(userMsg) return nil } @@ -1098,7 +1098,7 @@ func (a *App) SetActiveChannel(userId string, channelId string) *model.AppError a.AddStatusCache(status) if status.Status != oldStatus { - BroadcastStatus(status) + a.BroadcastStatus(status) } return nil @@ -1113,7 +1113,7 @@ func (a *App) UpdateChannelLastViewedAt(channelIds []string, userId string) *mod for _, channelId := range channelIds { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil) message.Add("channel_id", channelId) - go Publish(message) + go a.Publish(message) } } @@ -1179,7 +1179,7 @@ func (a *App) ViewChannel(view *model.ChannelView, userId string, clearPushNotif if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages && model.IsValidId(view.ChannelId) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil) message.Add("channel_id", view.ChannelId) - go Publish(message) + go a.Publish(message) } return nil diff --git a/app/cluster_handlers.go b/app/cluster_handlers.go index 5c4bd7026..997de6dcd 100644 --- a/app/cluster_handlers.go +++ b/app/cluster_handlers.go @@ -20,12 +20,12 @@ func (a *App) RegisterAllClusterMessageHandlers() { a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, a.ClusterInvalidateCacheForChannelByNameHandler) a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, a.ClusterInvalidateCacheForChannelHandler) a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, a.ClusterInvalidateCacheForUserHandler) - a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, ClusterClearSessionCacheForUserHandler) + a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, a.ClusterClearSessionCacheForUserHandler) } func (a *App) ClusterPublishHandler(msg *model.ClusterMessage) { event := model.WebSocketEventFromJson(strings.NewReader(msg.Data)) - PublishSkipClusterSend(event) + a.PublishSkipClusterSend(event) } func (a *App) ClusterUpdateStatusHandler(msg *model.ClusterMessage) { @@ -65,6 +65,6 @@ func (a *App) ClusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) { a.InvalidateCacheForUserSkipClusterSend(msg.Data) } -func ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) { - ClearSessionCacheForUserSkipClusterSend(msg.Data) +func (a *App) ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) { + a.ClearSessionCacheForUserSkipClusterSend(msg.Data) } diff --git a/app/command.go b/app/command.go index f8885219a..0763e24c7 100644 --- a/app/command.go +++ b/app/command.go @@ -49,7 +49,7 @@ func (a *App) CreateCommandPost(post *model.Post, teamId string, response *model return a.CreatePostMissingChannel(post, true) } else if response.ResponseType == "" || response.ResponseType == model.COMMAND_RESPONSE_TYPE_EPHEMERAL { post.ParentId = "" - SendEphemeralPost(post.UserId, post) + a.SendEphemeralPost(post.UserId, post) } return post, nil diff --git a/app/command_expand_collapse.go b/app/command_expand_collapse.go index 116e29f17..9c94a4325 100644 --- a/app/command_expand_collapse.go +++ b/app/command_expand_collapse.go @@ -74,7 +74,7 @@ func (a *App) setCollapsePreference(args *model.CommandArgs, isCollapse bool) *m socketMessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCE_CHANGED, "", "", args.UserId, nil) socketMessage.Add("preference", pref.ToJson()) - go Publish(socketMessage) + go a.Publish(socketMessage) var rmsg string diff --git a/app/command_loadtest.go b/app/command_loadtest.go index 629b9c9f5..4bc371bdc 100644 --- a/app/command_loadtest.go +++ b/app/command_loadtest.go @@ -166,6 +166,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag } client.Login(BTEST_USER_EMAIL, BTEST_USER_PASSWORD) environment, err := CreateTestEnvironmentWithTeams( + a, client, utils.Range{Begin: numTeams, End: numTeams}, utils.Range{Begin: numChannels, End: numChannels}, @@ -193,6 +194,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag client.MockSession(args.Session.Token) client.SetTeamId(args.TeamId) CreateTestEnvironmentInTeam( + a, client, team, utils.Range{Begin: numChannels, End: numChannels}, @@ -227,7 +229,7 @@ func (me *LoadTestProvider) UsersCommand(a *App, args *model.CommandArgs, messag client := model.NewClient(args.SiteURL) client.SetTeamId(team.Id) - userCreator := NewAutoUserCreator(client, team) + userCreator := NewAutoUserCreator(a, client, team) userCreator.Fuzzy = doFuzz userCreator.CreateTestUsers(usersr) diff --git a/app/email_batching.go b/app/email_batching.go index c8ea8c98a..b9f89d646 100644 --- a/app/email_batching.go +++ b/app/email_batching.go @@ -22,26 +22,24 @@ const ( EMAIL_BATCHING_TASK_NAME = "Email Batching" ) -var emailBatchingJob *EmailBatchingJob - -func InitEmailBatching() { +func (a *App) InitEmailBatching() { if *utils.Cfg.EmailSettings.EnableEmailBatching { - if emailBatchingJob == nil { - emailBatchingJob = MakeEmailBatchingJob(*utils.Cfg.EmailSettings.EmailBatchingBufferSize) + if a.EmailBatching == nil { + a.EmailBatching = NewEmailBatchingJob(a, *utils.Cfg.EmailSettings.EmailBatchingBufferSize) } // note that we don't support changing EmailBatchingBufferSize without restarting the server - emailBatchingJob.Start() + a.EmailBatching.Start() } } -func AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError { +func (a *App) AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError { if !*utils.Cfg.EmailSettings.EnableEmailBatching { return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.disabled.app_error", nil, "", http.StatusNotImplemented) } - if !emailBatchingJob.Add(user, post, team) { + if !a.EmailBatching.Add(user, post, team) { l4g.Error(utils.T("api.email_batching.add_notification_email_to_batch.channel_full.app_error")) return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.channel_full.app_error", nil, "", http.StatusInternalServerError) } @@ -56,12 +54,14 @@ type batchedNotification struct { } type EmailBatchingJob struct { + app *App newNotifications chan *batchedNotification pendingNotifications map[string][]*batchedNotification } -func MakeEmailBatchingJob(bufferSize int) *EmailBatchingJob { +func NewEmailBatchingJob(a *App, bufferSize int) *EmailBatchingJob { return &EmailBatchingJob{ + app: a, newNotifications: make(chan *batchedNotification, bufferSize), pendingNotifications: make(map[string][]*batchedNotification), } @@ -97,7 +97,7 @@ func (job *EmailBatchingJob) CheckPendingEmails() { // it's a bit weird to pass the send email function through here, but it makes it so that we can test // without actually sending emails - job.checkPendingNotifications(time.Now(), Global().sendBatchedEmailNotification) + job.checkPendingNotifications(time.Now(), job.app.sendBatchedEmailNotification) l4g.Debug(utils.T("api.email_batching.check_pending_emails.finished_running"), len(job.pendingNotifications)) } @@ -131,7 +131,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu if inspectedTeamNames[notification.teamName] != "" { continue } - tchan := Global().Srv.Store.Team().GetByName(notifications[0].teamName) + tchan := job.app.Srv.Store.Team().GetByName(notifications[0].teamName) if result := <-tchan; result.Err != nil { l4g.Error("Unable to find Team id for notification", result.Err) continue @@ -141,7 +141,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu // if the user has viewed any channels in this team since the notification was queued, delete // all queued notifications - mchan := Global().Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId) + mchan := job.app.Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId) if result := <-mchan; result.Err != nil { l4g.Error("Unable to find ChannelMembers for user", result.Err) continue @@ -158,7 +158,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu // get how long we need to wait to send notifications to the user var interval int64 - pchan := Global().Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL) + pchan := job.app.Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL) if result := <-pchan; result.Err != nil { // use the default batching interval if an error ocurrs while fetching user preferences interval, _ = strconv.ParseInt(model.PREFERENCE_EMAIL_INTERVAL_BATCHING_SECONDS, 10, 64) diff --git a/app/email_batching_test.go b/app/email_batching_test.go index 2c58d43f8..b69eeec2d 100644 --- a/app/email_batching_test.go +++ b/app/email_batching_test.go @@ -13,14 +13,14 @@ import ( ) func TestHandleNewNotifications(t *testing.T) { - Setup() + th := Setup() id1 := model.NewId() id2 := model.NewId() id3 := model.NewId() // test queueing of received posts by user - job := MakeEmailBatchingJob(128) + job := NewEmailBatchingJob(th.App, 128) job.handleNewNotifications() @@ -74,7 +74,7 @@ func TestHandleNewNotifications(t *testing.T) { } // test ordering of received posts - job = MakeEmailBatchingJob(128) + job = NewEmailBatchingJob(th.App, 128) job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test1"}, &model.Team{Name: "team"}) job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test2"}, &model.Team{Name: "team"}) @@ -95,7 +95,7 @@ func TestHandleNewNotifications(t *testing.T) { func TestCheckPendingNotifications(t *testing.T) { th := Setup().InitBasic() - job := MakeEmailBatchingJob(128) + job := NewEmailBatchingJob(th.App, 128) job.pendingNotifications[th.BasicUser.Id] = []*batchedNotification{ { post: &model.Post{ @@ -201,7 +201,7 @@ func TestCheckPendingNotifications(t *testing.T) { */ func TestCheckPendingNotificationsDefaultInterval(t *testing.T) { th := Setup().InitBasic() - job := MakeEmailBatchingJob(128) + job := NewEmailBatchingJob(th.App, 128) // bypasses recent user activity check channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember) @@ -237,7 +237,7 @@ func TestCheckPendingNotificationsDefaultInterval(t *testing.T) { */ func TestCheckPendingNotificationsCantParseInterval(t *testing.T) { th := Setup().InitBasic() - job := MakeEmailBatchingJob(128) + job := NewEmailBatchingJob(th.App, 128) // bypasses recent user activity check channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember) diff --git a/app/emoji.go b/app/emoji.go index d5b81c89a..308791aed 100644 --- a/app/emoji.go +++ b/app/emoji.go @@ -61,7 +61,7 @@ func (a *App) CreateEmoji(sessionUserId string, emoji *model.Emoji, multiPartIma message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EMOJI_ADDED, "", "", "", nil) message.Add("emoji", emoji.ToJson()) - Publish(message) + a.Publish(message) return result.Data.(*model.Emoji), nil } } diff --git a/app/notification.go b/app/notification.go index cc3db8b55..0859dfd20 100644 --- a/app/notification.go +++ b/app/notification.go @@ -94,7 +94,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod if result := <-a.Srv.Store.User().GetProfilesByUsernames(potentialOtherMentions, team.Id); result.Err == nil { outOfChannelMentions := result.Data.([]*model.User) if channel.Type != model.CHANNEL_GROUP { - go sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions) + go a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions) } } } @@ -186,7 +186,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod // If the channel has more than 1K users then @here is disabled if hereNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel { hereNotification = false - SendEphemeralPost( + a.SendEphemeralPost( post.UserId, &model.Post{ ChannelId: post.ChannelId, @@ -198,7 +198,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod // If the channel has more than 1K users then @channel is disabled if channelNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel { - SendEphemeralPost( + a.SendEphemeralPost( post.UserId, &model.Post{ ChannelId: post.ChannelId, @@ -210,7 +210,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod // If the channel has more than 1K users then @all is disabled if allNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel { - SendEphemeralPost( + a.SendEphemeralPost( post.UserId, &model.Post{ ChannelId: post.ChannelId, @@ -298,7 +298,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod message.Add("mentions", model.ArrayToJson(mentionedUsersList)) } - Publish(message) + a.Publish(message) return mentionedUsersList, nil } @@ -337,7 +337,7 @@ func (a *App) sendNotificationEmail(post *model.Post, user *model.User, channel } if sendBatched { - if err := AddNotificationEmailToBatch(user, post, team); err == nil { + if err := a.AddNotificationEmailToBatch(user, post, team); err == nil { return nil } } @@ -717,7 +717,7 @@ func (a *App) getMobileAppSessions(userId string) ([]*model.Session, *model.AppE } } -func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError { +func (a *App) sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError { if len(users) == 0 { return nil } @@ -742,7 +742,7 @@ func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId strin }) } - SendEphemeralPost( + a.SendEphemeralPost( post.UserId, &model.Post{ ChannelId: post.ChannelId, diff --git a/app/plugins.go b/app/plugins.go index 450cfebeb..94a1bfd6f 100644 --- a/app/plugins.go +++ b/app/plugins.go @@ -335,7 +335,7 @@ func (a *App) UnpackAndActivatePlugin(pluginFile io.Reader) (*model.Manifest, *m if manifest.HasClient() { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_ACTIVATED, "", "", "", nil) message.Add("manifest", manifest.ClientManifest()) - Publish(message) + a.Publish(message) } return manifest, nil @@ -383,7 +383,7 @@ func (a *App) RemovePlugin(id string) *model.AppError { if manifest.HasClient() { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DEACTIVATED, "", "", "", nil) message.Add("manifest", manifest.ClientManifest()) - Publish(message) + a.Publish(message) } return nil diff --git a/app/post.go b/app/post.go index e81af4673..ccdc015bb 100644 --- a/app/post.go +++ b/app/post.go @@ -51,7 +51,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError) } T := utils.GetUserTranslations(user.Locale) - SendEphemeralPost( + a.SendEphemeralPost( post.UserId, &model.Post{ ChannelId: channel.Id, @@ -75,7 +75,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError) if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", post.UserId, nil) message.Add("channel_id", post.ChannelId) - go Publish(message) + go a.Publish(message) } } @@ -239,7 +239,7 @@ func parseSlackLinksToMarkdown(text string) string { return linkWithTextRegex.ReplaceAllString(text, "[${2}](${1})") } -func SendEphemeralPost(userId string, post *model.Post) *model.Post { +func (a *App) SendEphemeralPost(userId string, post *model.Post) *model.Post { post.Type = model.POST_EPHEMERAL // fill in fields which haven't been specified which have sensible defaults @@ -256,7 +256,7 @@ func SendEphemeralPost(userId string, post *model.Post) *model.Post { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE, "", post.ChannelId, userId, nil) message.Add("post", post.ToJson()) - go Publish(message) + go a.Publish(message) return post } @@ -330,7 +330,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model }() } - sendUpdatedPostEvent(rpost) + a.sendUpdatedPostEvent(rpost) a.InvalidateCacheForChannelPosts(rpost.ChannelId) @@ -351,17 +351,17 @@ func (a *App) PatchPost(postId string, patch *model.PostPatch) (*model.Post, *mo return nil, err } - sendUpdatedPostEvent(updatedPost) + a.sendUpdatedPostEvent(updatedPost) a.InvalidateCacheForChannelPosts(updatedPost.ChannelId) return updatedPost, nil } -func sendUpdatedPostEvent(post *model.Post) { +func (a *App) sendUpdatedPostEvent(post *model.Post) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil) message.Add("post", post.ToJson()) - go Publish(message) + go a.Publish(message) } func (a *App) GetPostsPage(channelId string, page int, perPage int) (*model.PostList, *model.AppError) { @@ -502,7 +502,7 @@ func (a *App) DeletePost(postId string) (*model.Post, *model.AppError) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_DELETED, "", post.ChannelId, "", nil) message.Add("post", post.ToJson()) - go Publish(message) + go a.Publish(message) go a.DeletePostFiles(post) go a.DeleteFlaggedPosts(post.Id) @@ -724,7 +724,7 @@ func (a *App) DoPostAction(postId string, actionId string, userId string) *model } ephemeralPost.UserId = userId ephemeralPost.AddProp("from_webhook", "true") - SendEphemeralPost(userId, ephemeralPost) + a.SendEphemeralPost(userId, ephemeralPost) } return nil diff --git a/app/preference.go b/app/preference.go index 8ae33b728..bee3236bf 100644 --- a/app/preference.go +++ b/app/preference.go @@ -55,7 +55,7 @@ func (a *App) UpdatePreferences(userId string, preferences model.Preferences) *m message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_CHANGED, "", "", userId, nil) message.Add("preferences", preferences.ToJson()) - go Publish(message) + go a.Publish(message) return nil } @@ -78,7 +78,7 @@ func (a *App) DeletePreferences(userId string, preferences model.Preferences) *m message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_DELETED, "", "", userId, nil) message.Add("preferences", preferences.ToJson()) - go Publish(message) + go a.Publish(message) return nil } diff --git a/app/reaction.go b/app/reaction.go index 6513fa8b0..debf75f7a 100644 --- a/app/reaction.go +++ b/app/reaction.go @@ -51,7 +51,7 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo // send out that a reaction has been added/removed message := model.NewWebSocketEvent(event, "", post.ChannelId, "", nil) message.Add("reaction", reaction.ToJson()) - Publish(message) + a.Publish(message) // The post is always modified since the UpdateAt always changes a.InvalidateCacheForChannelPosts(post.ChannelId) @@ -59,5 +59,5 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo post.UpdateAt = model.GetMillis() umessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil) umessage.Add("post", post.ToJson()) - Publish(umessage) + a.Publish(umessage) } diff --git a/app/server.go b/app/server.go index 938773ad9..6915369c4 100644 --- a/app/server.go +++ b/app/server.go @@ -218,7 +218,7 @@ func (a *App) StopServer() { a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN) a.Srv.Store.Close() - HubStop() + a.HubStop() a.ShutDownPlugins() diff --git a/app/session.go b/app/session.go index e5e5c939d..f0245acba 100644 --- a/app/session.go +++ b/app/session.go @@ -107,8 +107,7 @@ func (a *App) RevokeAllSessions(userId string) *model.AppError { } func (a *App) ClearSessionCacheForUser(userId string) { - - ClearSessionCacheForUserSkipClusterSend(userId) + a.ClearSessionCacheForUserSkipClusterSend(userId) if a.Cluster != nil { msg := &model.ClusterMessage{ @@ -120,7 +119,7 @@ func (a *App) ClearSessionCacheForUser(userId string) { } } -func ClearSessionCacheForUserSkipClusterSend(userId string) { +func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) { keys := sessionCache.Keys() for _, key := range keys { @@ -132,8 +131,7 @@ func ClearSessionCacheForUserSkipClusterSend(userId string) { } } - InvalidateWebConnSessionCacheForUser(userId) - + a.InvalidateWebConnSessionCacheForUser(userId) } func AddSessionToCache(session *model.Session) { diff --git a/app/status.go b/app/status.go index fb93a9e39..edfda561b 100644 --- a/app/status.go +++ b/app/status.go @@ -213,15 +213,15 @@ func (a *App) SetStatusOnline(userId string, sessionId string, manual bool) { } if broadcast { - BroadcastStatus(status) + a.BroadcastStatus(status) } } -func BroadcastStatus(status *model.Status) { +func (a *App) BroadcastStatus(status *model.Status) { event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil) event.Add("status", status.Status) event.Add("user_id", status.UserId) - go Publish(event) + go a.Publish(event) } func (a *App) SetStatusOffline(userId string, manual bool) { @@ -245,7 +245,7 @@ func (a *App) SetStatusOffline(userId string, manual bool) { event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil) event.Add("status", model.STATUS_OFFLINE) event.Add("user_id", status.UserId) - go Publish(event) + go a.Publish(event) } func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) { @@ -286,7 +286,7 @@ func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) { event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil) event.Add("status", model.STATUS_AWAY) event.Add("user_id", status.UserId) - go Publish(event) + go a.Publish(event) } func GetStatusFromCache(userId string) *model.Status { diff --git a/app/team.go b/app/team.go index fdf44a783..7a5ccc5d6 100644 --- a/app/team.go +++ b/app/team.go @@ -106,7 +106,7 @@ func (a *App) UpdateTeam(team *model.Team) (*model.Team, *model.AppError) { oldTeam.Sanitize() - sendUpdatedTeamEvent(oldTeam) + a.sendUpdatedTeamEvent(oldTeam) return oldTeam, nil } @@ -126,15 +126,15 @@ func (a *App) PatchTeam(teamId string, patch *model.TeamPatch) (*model.Team, *mo updatedTeam.Sanitize() - sendUpdatedTeamEvent(updatedTeam) + a.sendUpdatedTeamEvent(updatedTeam) return updatedTeam, nil } -func sendUpdatedTeamEvent(team *model.Team) { +func (a *App) sendUpdatedTeamEvent(team *model.Team) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_UPDATE_TEAM, "", "", "", nil) message.Add("team", team.ToJson()) - go Publish(message) + go a.Publish(message) } func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles string) (*model.TeamMember, *model.AppError) { @@ -163,16 +163,16 @@ func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles strin a.ClearSessionCacheForUser(userId) - sendUpdatedMemberRoleEvent(userId, member) + a.sendUpdatedMemberRoleEvent(userId, member) return member, nil } -func sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) { +func (a *App) sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_MEMBERROLE_UPDATED, "", "", userId, nil) message.Add("member", member.ToJson()) - go Publish(message) + go a.Publish(message) } func (a *App) AddUserToTeam(teamId string, userId string, userRequestorId string) (*model.Team, *model.AppError) { @@ -330,7 +330,7 @@ func (a *App) JoinUserToTeam(team *model.Team, user *model.User, userRequestorId message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", user.Id, nil) message.Add("team_id", team.Id) message.Add("user_id", user.Id) - Publish(message) + a.Publish(message) return nil } @@ -462,7 +462,7 @@ func (a *App) AddTeamMember(teamId, userId string) (*model.TeamMember, *model.Ap message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil) message.Add("team_id", teamId) message.Add("user_id", userId) - Publish(message) + a.Publish(message) return teamMember, nil } @@ -484,7 +484,7 @@ func (a *App) AddTeamMembers(teamId string, userIds []string, userRequestorId st message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil) message.Add("team_id", teamId) message.Add("user_id", userId) - Publish(message) + a.Publish(message) } return members, nil @@ -603,7 +603,7 @@ func (a *App) LeaveTeam(team *model.Team, user *model.User) *model.AppError { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_LEAVE_TEAM, team.Id, "", "", nil) message.Add("user_id", user.Id) message.Add("team_id", team.Id) - Publish(message) + a.Publish(message) teamMember.Roles = "" teamMember.DeleteAt = model.GetMillis() diff --git a/app/user.go b/app/user.go index c91b4cfb7..27f1c5d85 100644 --- a/app/user.go +++ b/app/user.go @@ -202,7 +202,7 @@ func (a *App) CreateUser(user *model.User) (*model.User, *model.AppError) { // This message goes to everyone, so the teamId, channelId and userId are irrelevant message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_NEW_USER, "", "", "", nil) message.Add("user_id", ruser.Id) - go Publish(message) + go a.Publish(message) return ruser, nil } @@ -829,7 +829,7 @@ func (a *App) SetProfileImage(userId string, imageData *multipart.FileHeader) *m message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers) message.Add("user", user) - Publish(message) + a.Publish(message) } return nil @@ -950,7 +950,7 @@ func (a *App) UpdateUserAsUser(user *model.User, asAdmin bool) (*model.User, *mo return nil, err } - sendUpdatedUserEvent(*updatedUser, asAdmin) + a.sendUpdatedUserEvent(*updatedUser, asAdmin) return updatedUser, nil } @@ -968,19 +968,19 @@ func (a *App) PatchUser(userId string, patch *model.UserPatch, asAdmin bool) (*m return nil, err } - sendUpdatedUserEvent(*updatedUser, asAdmin) + a.sendUpdatedUserEvent(*updatedUser, asAdmin) return updatedUser, nil } -func sendUpdatedUserEvent(user model.User, asAdmin bool) { +func (a *App) sendUpdatedUserEvent(user model.User, asAdmin bool) { SanitizeProfile(&user, asAdmin) omitUsers := make(map[string]bool, 1) omitUsers[user.Id] = true message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers) message.Add("user", user) - go Publish(message) + go a.Publish(message) } func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, *model.AppError) { diff --git a/app/web_conn.go b/app/web_conn.go index 069d2c8f4..f5644ce17 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -96,7 +96,7 @@ func (c *WebConn) SetSession(v *model.Session) { func (c *WebConn) ReadPump() { defer func() { - HubUnregister(c) + c.App.HubUnregister(c) c.WebSocket.Close() }() c.WebSocket.SetReadLimit(model.SOCKET_MAX_MESSAGE_SIZE_KB) diff --git a/app/web_hub.go b/app/web_hub.go index b351de39e..50ccb100e 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -29,6 +29,7 @@ type Hub struct { // connectionCount should be kept first. // See https://github.com/mattermost/mattermost-server/pull/7281 connectionCount int64 + app *App connections []*WebConn connectionIndex int register chan *WebConn @@ -40,11 +41,9 @@ type Hub struct { goroutineId int } -var hubs []*Hub = make([]*Hub, 0) -var stopCheckingForDeadlock chan bool - -func NewWebHub() *Hub { +func (a *App) NewWebHub() *Hub { return &Hub{ + app: a, register: make(chan *WebConn), unregister: make(chan *WebConn), connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE), @@ -55,26 +54,27 @@ func NewWebHub() *Hub { } } -func TotalWebsocketConnections() int { +func (a *App) TotalWebsocketConnections() int { count := int64(0) - for _, hub := range hubs { + for _, hub := range a.Hubs { count = count + atomic.LoadInt64(&hub.connectionCount) } return int(count) } -func HubStart() { +func (a *App) HubStart() { // Total number of hubs is twice the number of CPUs. numberOfHubs := runtime.NumCPU() * 2 l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs) - hubs = make([]*Hub, numberOfHubs) + a.Hubs = make([]*Hub, numberOfHubs) + a.HubsStopCheckingForDeadlock = make(chan bool, 1) - for i := 0; i < len(hubs); i++ { - hubs[i] = NewWebHub() - hubs[i].connectionIndex = i - hubs[i].Start() + for i := 0; i < len(a.Hubs); i++ { + a.Hubs[i] = a.NewWebHub() + a.Hubs[i].connectionIndex = i + a.Hubs[i].Start() } go func() { @@ -84,12 +84,10 @@ func HubStart() { ticker.Stop() }() - stopCheckingForDeadlock = make(chan bool, 1) - for { select { case <-ticker.C: - for _, hub := range hubs { + for _, hub := range a.Hubs { if len(hub.broadcast) >= DEADLOCK_WARN { l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast)) buf := make([]byte, 1<<16) @@ -105,46 +103,42 @@ func HubStart() { } } - case <-stopCheckingForDeadlock: + case <-a.HubsStopCheckingForDeadlock: return } } }() } -func HubStop() { +func (a *App) HubStop() { l4g.Info(utils.T("api.web_hub.start.stopping.debug")) select { - case stopCheckingForDeadlock <- true: + case a.HubsStopCheckingForDeadlock <- true: default: l4g.Warn("We appear to have already sent the stop checking for deadlocks command") } - for _, hub := range hubs { + for _, hub := range a.Hubs { hub.Stop() } - hubs = make([]*Hub, 0) + a.Hubs = []*Hub{} } -func GetHubForUserId(userId string) *Hub { +func (a *App) GetHubForUserId(userId string) *Hub { hash := fnv.New32a() hash.Write([]byte(userId)) - index := hash.Sum32() % uint32(len(hubs)) - return hubs[index] -} - -func HubRegister(webConn *WebConn) { - GetHubForUserId(webConn.UserId).Register(webConn) + index := hash.Sum32() % uint32(len(a.Hubs)) + return a.Hubs[index] } -func HubUnregister(webConn *WebConn) { - GetHubForUserId(webConn.UserId).Unregister(webConn) +func (a *App) HubRegister(webConn *WebConn) { + a.GetHubForUserId(webConn.UserId).Register(webConn) } -func Publish(message *model.WebSocketEvent) { - Global().Publish(message) +func (a *App) HubUnregister(webConn *WebConn) { + a.GetHubForUserId(webConn.UserId).Unregister(webConn) } func (a *App) Publish(message *model.WebSocketEvent) { @@ -152,7 +146,7 @@ func (a *App) Publish(message *model.WebSocketEvent) { metrics.IncrementWebsocketEvent(message.Event) } - PublishSkipClusterSend(message) + a.PublishSkipClusterSend(message) if a.Cluster != nil { cm := &model.ClusterMessage{ @@ -173,8 +167,8 @@ func (a *App) Publish(message *model.WebSocketEvent) { } } -func PublishSkipClusterSend(message *model.WebSocketEvent) { - for _, hub := range hubs { +func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) { + for _, hub := range a.Hubs { hub.Broadcast(message) } } @@ -291,8 +285,8 @@ func (a *App) InvalidateCacheForUserSkipClusterSend(userId string) { a.Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId) a.Srv.Store.User().InvalidatProfileCacheForUser(userId) - if len(hubs) != 0 { - GetHubForUserId(userId).InvalidateUser(userId) + if len(a.Hubs) != 0 { + a.GetHubForUserId(userId).InvalidateUser(userId) } } @@ -313,9 +307,9 @@ func (a *App) InvalidateCacheForWebhookSkipClusterSend(webhookId string) { a.Srv.Store.Webhook().InvalidateWebhookCache(webhookId) } -func InvalidateWebConnSessionCacheForUser(userId string) { - if len(hubs) != 0 { - GetHubForUserId(userId).InvalidateUser(userId) +func (a *App) InvalidateWebConnSessionCacheForUser(userId string) { + if len(a.Hubs) != 0 { + a.GetHubForUserId(userId).InvalidateUser(userId) } } @@ -401,7 +395,7 @@ func (h *Hub) Start() { } if !found { - go Global().SetStatusOffline(userId, false) + go h.app.SetStatusOffline(userId, false) } case userId := <-h.invalidateUser: diff --git a/app/websocket_router.go b/app/websocket_router.go index bfb649d6c..c8220f1f1 100644 --- a/app/websocket_router.go +++ b/app/websocket_router.go @@ -17,13 +17,15 @@ type webSocketHandler interface { } type WebSocketRouter struct { + app *App handlers map[string]webSocketHandler } -func NewWebSocketRouter() *WebSocketRouter { - router := &WebSocketRouter{} - router.handlers = make(map[string]webSocketHandler) - return router +func (a *App) NewWebSocketRouter() *WebSocketRouter { + return &WebSocketRouter{ + app: a, + handlers: make(map[string]webSocketHandler), + } } func (wr *WebSocketRouter) Handle(action string, handler webSocketHandler) { @@ -54,21 +56,21 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque return } - session, err := Global().GetSession(token) + session, err := wr.app.GetSession(token) if err != nil { conn.WebSocket.Close() } else { go func() { - Global().SetStatusOnline(session.UserId, session.Id, false) - Global().UpdateLastActivityAtIfNeeded(*session) + wr.app.SetStatusOnline(session.UserId, session.Id, false) + wr.app.UpdateLastActivityAtIfNeeded(*session) }() conn.SetSession(session) conn.SetSessionToken(session.Token) conn.UserId = session.UserId - HubRegister(conn) + wr.app.HubRegister(conn) resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil) conn.Send <- resp diff --git a/cmd/platform/server.go b/cmd/platform/server.go index 55854dfe4..7f5fbf6e8 100644 --- a/cmd/platform/server.go +++ b/cmd/platform/server.go @@ -69,6 +69,7 @@ func runServer(configFileLocation string) { a.NewServer() a.InitStores() a.Srv.Router = api.NewRouter() + a.Srv.WebSocketRouter = a.NewWebSocketRouter() if model.BuildEnterpriseReady == "true" { a.LoadLicense() @@ -80,10 +81,9 @@ func runServer(configFileLocation string) { l4g.Error("Unable to find webapp directory, could not initialize plugins") } - wsapi.InitRouter() api4.Init(a, a.Srv.Router, false) api3 := api.Init(a, a.Srv.Router) - wsapi.InitApi() + wsapi.Init(a, a.Srv.WebSocketRouter) web.Init(api3) if !utils.IsLicensed() && len(utils.Cfg.SqlSettings.DataSourceReplicas) > 1 { diff --git a/cmd/platform/test.go b/cmd/platform/test.go index fd8777a46..f0c75a8d3 100644 --- a/cmd/platform/test.go +++ b/cmd/platform/test.go @@ -52,10 +52,10 @@ func webClientTestsCmdF(cmd *cobra.Command, args []string) error { utils.InitTranslations(utils.Cfg.LocalizationSettings) a.Srv.Router = api.NewRouter() - wsapi.InitRouter() + a.Srv.WebSocketRouter = a.NewWebSocketRouter() api4.Init(a, a.Srv.Router, false) api.Init(a, a.Srv.Router) - wsapi.InitApi() + wsapi.Init(a, a.Srv.WebSocketRouter) setupClientTests() a.StartServer() runWebClientTests() @@ -72,10 +72,10 @@ func serverForWebClientTestsCmdF(cmd *cobra.Command, args []string) error { utils.InitTranslations(utils.Cfg.LocalizationSettings) a.Srv.Router = api.NewRouter() - wsapi.InitRouter() + a.Srv.WebSocketRouter = a.NewWebSocketRouter() api4.Init(a, a.Srv.Router, false) api.Init(a, a.Srv.Router) - wsapi.InitApi() + wsapi.Init(a, a.Srv.WebSocketRouter) setupClientTests() a.StartServer() diff --git a/wsapi/api.go b/wsapi/api.go index 4de0f265f..f318cba6f 100644 --- a/wsapi/api.go +++ b/wsapi/api.go @@ -7,15 +7,21 @@ import ( "github.com/mattermost/mattermost-server/app" ) -func InitRouter() { - app.Global().Srv.WebSocketRouter = app.NewWebSocketRouter() +type API struct { + App *app.App + Router *app.WebSocketRouter } -func InitApi() { - InitUser() - InitSystem() - InitStatus() - InitWebrtc() +func Init(a *app.App, router *app.WebSocketRouter) { + api := &API{ + App: a, + Router: router, + } - app.HubStart() + api.InitUser() + api.InitSystem() + api.InitStatus() + api.InitWebrtc() + + a.HubStart() } diff --git a/wsapi/status.go b/wsapi/status.go index a60900feb..a20b6e284 100644 --- a/wsapi/status.go +++ b/wsapi/status.go @@ -10,11 +10,11 @@ import ( "github.com/mattermost/mattermost-server/utils" ) -func InitStatus() { +func (api *API) InitStatus() { l4g.Debug(utils.T("wsapi.status.init.debug")) - app.Global().Srv.WebSocketRouter.Handle("get_statuses", ApiWebSocketHandler(getStatuses)) - app.Global().Srv.WebSocketRouter.Handle("get_statuses_by_ids", ApiWebSocketHandler(getStatusesByIds)) + api.Router.Handle("get_statuses", api.ApiWebSocketHandler(getStatuses)) + api.Router.Handle("get_statuses_by_ids", api.ApiWebSocketHandler(api.getStatusesByIds)) } func getStatuses(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { @@ -22,14 +22,14 @@ func getStatuses(req *model.WebSocketRequest) (map[string]interface{}, *model.Ap return model.StatusMapToInterfaceMap(statusMap), nil } -func getStatusesByIds(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { +func (api *API) getStatusesByIds(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { var userIds []string if userIds = model.ArrayFromInterface(req.Data["user_ids"]); len(userIds) == 0 { l4g.Error(model.StringInterfaceToJson(req.Data)) return nil, NewInvalidWebSocketParamError(req.Action, "user_ids") } - statusMap, err := app.Global().GetStatusesByIds(userIds) + statusMap, err := api.App.GetStatusesByIds(userIds) if err != nil { return nil, err } diff --git a/wsapi/system.go b/wsapi/system.go index e019e79b2..6959106d6 100644 --- a/wsapi/system.go +++ b/wsapi/system.go @@ -5,15 +5,14 @@ package wsapi import ( l4g "github.com/alecthomas/log4go" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) -func InitSystem() { +func (api *API) InitSystem() { l4g.Debug(utils.T("wsapi.system.init.debug")) - app.Global().Srv.WebSocketRouter.Handle("ping", ApiWebSocketHandler(ping)) + api.Router.Handle("ping", api.ApiWebSocketHandler(ping)) } func ping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { diff --git a/wsapi/user.go b/wsapi/user.go index d79ecab83..c8236c978 100644 --- a/wsapi/user.go +++ b/wsapi/user.go @@ -5,18 +5,17 @@ package wsapi import ( l4g "github.com/alecthomas/log4go" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) -func InitUser() { +func (api *API) InitUser() { l4g.Debug(utils.T("wsapi.user.init.debug")) - app.Global().Srv.WebSocketRouter.Handle("user_typing", ApiWebSocketHandler(userTyping)) + api.Router.Handle("user_typing", api.ApiWebSocketHandler(api.userTyping)) } -func userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { +func (api *API) userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { var ok bool var channelId string if channelId, ok = req.Data["channel_id"].(string); !ok || len(channelId) != 26 { @@ -34,7 +33,7 @@ func userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.App event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", channelId, "", omitUsers) event.Add("parent_id", parentId) event.Add("user_id", req.Session.UserId) - go app.Publish(event) + go api.App.Publish(event) return nil, nil } diff --git a/wsapi/webrtc.go b/wsapi/webrtc.go index a2e1a18ce..42e749ebb 100644 --- a/wsapi/webrtc.go +++ b/wsapi/webrtc.go @@ -5,18 +5,17 @@ package wsapi import ( l4g "github.com/alecthomas/log4go" - "github.com/mattermost/mattermost-server/app" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) -func InitWebrtc() { +func (api *API) InitWebrtc() { l4g.Debug(utils.T("wsapi.webtrc.init.debug")) - app.Global().Srv.WebSocketRouter.Handle("webrtc", ApiWebSocketHandler(webrtcMessage)) + api.Router.Handle("webrtc", api.ApiWebSocketHandler(api.webrtcMessage)) } -func webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { +func (api *API) webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { var ok bool var toUserId string if toUserId, ok = req.Data["to_user_id"].(string); !ok || len(toUserId) != 26 { @@ -25,7 +24,7 @@ func webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model. event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_WEBRTC, "", "", toUserId, nil) event.Data = req.Data - go app.Publish(event) + go api.App.Publish(event) return nil, nil } diff --git a/wsapi/websocket_handler.go b/wsapi/websocket_handler.go index 5dfa5bdb0..daf225061 100644 --- a/wsapi/websocket_handler.go +++ b/wsapi/websocket_handler.go @@ -13,18 +13,19 @@ import ( "github.com/mattermost/mattermost-server/utils" ) -func ApiWebSocketHandler(wh func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)) webSocketHandler { - return webSocketHandler{wh} +func (api *API) ApiWebSocketHandler(wh func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)) webSocketHandler { + return webSocketHandler{api.App, wh} } type webSocketHandler struct { + app *app.App handlerFunc func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError) } func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketRequest) { l4g.Debug("/api/v3/users/websocket:%s", r.Action) - session, sessionErr := app.Global().GetSession(conn.GetSessionToken()) + session, sessionErr := wh.app.GetSession(conn.GetSessionToken()) if sessionErr != nil { l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, conn.UserId, sessionErr.SystemMessage(utils.T), sessionErr.Error()) sessionErr.DetailedError = "" -- cgit v1.2.3-1-g7c22