summaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorChris <ccbrown112@gmail.com>2017-09-27 11:52:34 -0500
committerSaturnino Abril <saturnino.abril@gmail.com>2017-09-28 00:52:34 +0800
commit8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df (patch)
treeb3563dfe35ad78991774c9d1842dc439376e1db1 /app
parent1bd66589a2adc67df5df9c108a2f2ecc77dfdf44 (diff)
downloadchat-8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df.tar.gz
chat-8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df.tar.bz2
chat-8c80cdde38cceb3424b2b1f43eadf7a1aab0c6df.zip
remove remaining Global() calls (outside of tests) (#7521)
Diffstat (limited to 'app')
-rw-r--r--app/admin.go4
-rw-r--r--app/analytics.go4
-rw-r--r--app/app.go5
-rw-r--r--app/auto_environment.go10
-rw-r--r--app/auto_users.go8
-rw-r--r--app/channel.go22
-rw-r--r--app/cluster_handlers.go8
-rw-r--r--app/command.go2
-rw-r--r--app/command_expand_collapse.go2
-rw-r--r--app/command_loadtest.go4
-rw-r--r--app/email_batching.go26
-rw-r--r--app/email_batching_test.go12
-rw-r--r--app/emoji.go2
-rw-r--r--app/notification.go16
-rw-r--r--app/plugins.go4
-rw-r--r--app/post.go20
-rw-r--r--app/preference.go4
-rw-r--r--app/reaction.go4
-rw-r--r--app/server.go2
-rw-r--r--app/session.go8
-rw-r--r--app/status.go10
-rw-r--r--app/team.go22
-rw-r--r--app/user.go12
-rw-r--r--app/web_conn.go2
-rw-r--r--app/web_hub.go74
-rw-r--r--app/websocket_router.go18
26 files changed, 154 insertions, 151 deletions
diff --git a/app/admin.go b/app/admin.go
index bd687627e..0d02c3b49 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -138,7 +138,7 @@ func (a *App) ReloadConfig() {
utils.LoadConfig(utils.CfgFileName)
// start/restart email batching job if necessary
- InitEmailBatching()
+ a.InitEmailBatching()
}
func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) *model.AppError {
@@ -179,7 +179,7 @@ func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool)
}
// start/restart email batching job if necessary
- InitEmailBatching()
+ a.InitEmailBatching()
return nil
}
diff --git a/app/analytics.go b/app/analytics.go
index 70c049350..65a9e4129 100644
--- a/app/analytics.go
+++ b/app/analytics.go
@@ -103,7 +103,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo
return nil, err
}
- totalSockets := TotalWebsocketConnections()
+ totalSockets := a.TotalWebsocketConnections()
totalMasterDb := a.Srv.Store.TotalMasterDbConnections()
totalReadDb := a.Srv.Store.TotalReadDbConnections()
@@ -118,7 +118,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo
rows[7].Value = float64(totalReadDb)
} else {
- rows[5].Value = float64(TotalWebsocketConnections())
+ rows[5].Value = float64(a.TotalWebsocketConnections())
rows[6].Value = float64(a.Srv.Store.TotalMasterDbConnections())
rows[7].Value = float64(a.Srv.Store.TotalReadDbConnections())
}
diff --git a/app/app.go b/app/app.go
index eaaf9acee..e85fa6342 100644
--- a/app/app.go
+++ b/app/app.go
@@ -20,6 +20,11 @@ type App struct {
PluginEnv *pluginenv.Environment
PluginConfigListenerId string
+ EmailBatching *EmailBatchingJob
+
+ Hubs []*Hub
+ HubsStopCheckingForDeadlock chan bool
+
AccountMigration einterfaces.AccountMigrationInterface
Brand einterfaces.BrandInterface
Cluster einterfaces.ClusterInterface
diff --git a/app/auto_environment.go b/app/auto_environment.go
index 7bafc6948..660316e4b 100644
--- a/app/auto_environment.go
+++ b/app/auto_environment.go
@@ -16,7 +16,7 @@ type TestEnvironment struct {
Environments []TeamEnvironment
}
-func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) {
+func CreateTestEnvironmentWithTeams(a *App, client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) {
rand.Seed(time.Now().UTC().UnixNano())
teamCreator := NewAutoTeamCreator(client)
@@ -29,7 +29,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
environment := TestEnvironment{teams, make([]TeamEnvironment, len(teams))}
for i, team := range teams {
- userCreator := NewAutoUserCreator(client, team)
+ userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = fuzzy
randomUser, err := userCreator.createRandomUser()
if err != true {
@@ -37,7 +37,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
}
client.LoginById(randomUser.Id, USER_PASSWORD)
client.SetTeamId(team.Id)
- teamEnvironment, err := CreateTestEnvironmentInTeam(client, team, rangeChannels, rangeUsers, rangePosts, fuzzy)
+ teamEnvironment, err := CreateTestEnvironmentInTeam(a, client, team, rangeChannels, rangeUsers, rangePosts, fuzzy)
if err != true {
return TestEnvironment{}, false
}
@@ -47,7 +47,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
return environment, true
}
-func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) {
+func CreateTestEnvironmentInTeam(a *App, client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) {
rand.Seed(time.Now().UTC().UnixNano())
// We need to create at least one user
@@ -55,7 +55,7 @@ func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeCh
rangeUsers.Begin = 1
}
- userCreator := NewAutoUserCreator(client, team)
+ userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = fuzzy
users, err := userCreator.CreateTestUsers(rangeUsers)
if err != true {
diff --git a/app/auto_users.go b/app/auto_users.go
index 50e2084d1..78126211e 100644
--- a/app/auto_users.go
+++ b/app/auto_users.go
@@ -12,6 +12,7 @@ import (
)
type AutoUserCreator struct {
+ app *App
client *model.Client
team *model.Team
EmailLength utils.Range
@@ -21,8 +22,9 @@ type AutoUserCreator struct {
Fuzzy bool
}
-func NewAutoUserCreator(client *model.Client, team *model.Team) *AutoUserCreator {
+func NewAutoUserCreator(a *App, client *model.Client, team *model.Team) *AutoUserCreator {
return &AutoUserCreator{
+ app: a,
client: client,
team: team,
EmailLength: USER_EMAIL_LEN,
@@ -81,14 +83,14 @@ func (cfg *AutoUserCreator) createRandomUser() (*model.User, bool) {
ruser := result.Data.(*model.User)
status := &model.Status{UserId: ruser.Id, Status: model.STATUS_ONLINE, Manual: false, LastActivityAt: model.GetMillis(), ActiveChannel: ""}
- if result := <-Global().Srv.Store.Status().SaveOrUpdate(status); result.Err != nil {
+ if result := <-cfg.app.Srv.Store.Status().SaveOrUpdate(status); result.Err != nil {
result.Err.Translate(utils.T)
l4g.Error(result.Err.Error())
return nil, false
}
// We need to cheat to verify the user's email
- store.Must(Global().Srv.Store.User().VerifyEmail(ruser.Id))
+ store.Must(cfg.app.Srv.Store.User().VerifyEmail(ruser.Id))
return result.Data.(*model.User), true
}
diff --git a/app/channel.go b/app/channel.go
index 436d429c9..2d3709a0c 100644
--- a/app/channel.go
+++ b/app/channel.go
@@ -136,7 +136,7 @@ func (a *App) CreateChannelWithUser(channel *model.Channel, userId string) (*mod
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_CREATED, "", "", userId, nil)
message.Add("channel_id", channel.Id)
message.Add("team_id", channel.TeamId)
- Publish(message)
+ a.Publish(message)
return rchannel, nil
}
@@ -181,7 +181,7 @@ func (a *App) CreateDirectChannel(userId string, otherUserId string) (*model.Cha
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_DIRECT_ADDED, "", channel.Id, "", nil)
message.Add("teammate_id", otherUserId)
- Publish(message)
+ a.Publish(message)
return channel, nil
}
@@ -254,7 +254,7 @@ func (a *App) CreateGroupChannel(userIds []string, creatorId string) (*model.Cha
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_GROUP_ADDED, "", channel.Id, "", nil)
message.Add("teammate_ids", model.ArrayToJson(userIds))
- Publish(message)
+ a.Publish(message)
return channel, nil
}
@@ -316,7 +316,7 @@ func (a *App) UpdateChannel(channel *model.Channel) (*model.Channel, *model.AppE
messageWs := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_UPDATED, "", channel.Id, "", nil)
messageWs.Add("channel", channel.ToJson())
- Publish(messageWs)
+ a.Publish(messageWs)
return channel, nil
}
@@ -484,7 +484,7 @@ func (a *App) DeleteChannel(channel *model.Channel, userId string) *model.AppErr
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_DELETED, channel.TeamId, "", "", nil)
message.Add("channel_id", channel.Id)
- Publish(message)
+ a.Publish(message)
}
return nil
@@ -550,7 +550,7 @@ func (a *App) AddUserToChannel(user *model.User, channel *model.Channel) (*model
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_ADDED, "", channel.Id, "", nil)
message.Add("user_id", user.Id)
message.Add("team_id", channel.TeamId)
- Publish(message)
+ a.Publish(message)
return newMember, nil
}
@@ -1039,13 +1039,13 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string,
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", channel.Id, "", nil)
message.Add("user_id", userIdToRemove)
message.Add("remover_id", removerUserId)
- go Publish(message)
+ go a.Publish(message)
// because the removed user no longer belongs to the channel we need to send a separate websocket event
userMsg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", "", userIdToRemove, nil)
userMsg.Add("channel_id", channel.Id)
userMsg.Add("remover_id", removerUserId)
- go Publish(userMsg)
+ go a.Publish(userMsg)
return nil
}
@@ -1098,7 +1098,7 @@ func (a *App) SetActiveChannel(userId string, channelId string) *model.AppError
a.AddStatusCache(status)
if status.Status != oldStatus {
- BroadcastStatus(status)
+ a.BroadcastStatus(status)
}
return nil
@@ -1113,7 +1113,7 @@ func (a *App) UpdateChannelLastViewedAt(channelIds []string, userId string) *mod
for _, channelId := range channelIds {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", channelId)
- go Publish(message)
+ go a.Publish(message)
}
}
@@ -1179,7 +1179,7 @@ func (a *App) ViewChannel(view *model.ChannelView, userId string, clearPushNotif
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages && model.IsValidId(view.ChannelId) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", view.ChannelId)
- go Publish(message)
+ go a.Publish(message)
}
return nil
diff --git a/app/cluster_handlers.go b/app/cluster_handlers.go
index 5c4bd7026..997de6dcd 100644
--- a/app/cluster_handlers.go
+++ b/app/cluster_handlers.go
@@ -20,12 +20,12 @@ func (a *App) RegisterAllClusterMessageHandlers() {
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, a.ClusterInvalidateCacheForChannelByNameHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, a.ClusterInvalidateCacheForChannelHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, a.ClusterInvalidateCacheForUserHandler)
- a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, ClusterClearSessionCacheForUserHandler)
+ a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, a.ClusterClearSessionCacheForUserHandler)
}
func (a *App) ClusterPublishHandler(msg *model.ClusterMessage) {
event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
- PublishSkipClusterSend(event)
+ a.PublishSkipClusterSend(event)
}
func (a *App) ClusterUpdateStatusHandler(msg *model.ClusterMessage) {
@@ -65,6 +65,6 @@ func (a *App) ClusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) {
a.InvalidateCacheForUserSkipClusterSend(msg.Data)
}
-func ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
- ClearSessionCacheForUserSkipClusterSend(msg.Data)
+func (a *App) ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
+ a.ClearSessionCacheForUserSkipClusterSend(msg.Data)
}
diff --git a/app/command.go b/app/command.go
index f8885219a..0763e24c7 100644
--- a/app/command.go
+++ b/app/command.go
@@ -49,7 +49,7 @@ func (a *App) CreateCommandPost(post *model.Post, teamId string, response *model
return a.CreatePostMissingChannel(post, true)
} else if response.ResponseType == "" || response.ResponseType == model.COMMAND_RESPONSE_TYPE_EPHEMERAL {
post.ParentId = ""
- SendEphemeralPost(post.UserId, post)
+ a.SendEphemeralPost(post.UserId, post)
}
return post, nil
diff --git a/app/command_expand_collapse.go b/app/command_expand_collapse.go
index 116e29f17..9c94a4325 100644
--- a/app/command_expand_collapse.go
+++ b/app/command_expand_collapse.go
@@ -74,7 +74,7 @@ func (a *App) setCollapsePreference(args *model.CommandArgs, isCollapse bool) *m
socketMessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCE_CHANGED, "", "", args.UserId, nil)
socketMessage.Add("preference", pref.ToJson())
- go Publish(socketMessage)
+ go a.Publish(socketMessage)
var rmsg string
diff --git a/app/command_loadtest.go b/app/command_loadtest.go
index 629b9c9f5..4bc371bdc 100644
--- a/app/command_loadtest.go
+++ b/app/command_loadtest.go
@@ -166,6 +166,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag
}
client.Login(BTEST_USER_EMAIL, BTEST_USER_PASSWORD)
environment, err := CreateTestEnvironmentWithTeams(
+ a,
client,
utils.Range{Begin: numTeams, End: numTeams},
utils.Range{Begin: numChannels, End: numChannels},
@@ -193,6 +194,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag
client.MockSession(args.Session.Token)
client.SetTeamId(args.TeamId)
CreateTestEnvironmentInTeam(
+ a,
client,
team,
utils.Range{Begin: numChannels, End: numChannels},
@@ -227,7 +229,7 @@ func (me *LoadTestProvider) UsersCommand(a *App, args *model.CommandArgs, messag
client := model.NewClient(args.SiteURL)
client.SetTeamId(team.Id)
- userCreator := NewAutoUserCreator(client, team)
+ userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = doFuzz
userCreator.CreateTestUsers(usersr)
diff --git a/app/email_batching.go b/app/email_batching.go
index c8ea8c98a..b9f89d646 100644
--- a/app/email_batching.go
+++ b/app/email_batching.go
@@ -22,26 +22,24 @@ const (
EMAIL_BATCHING_TASK_NAME = "Email Batching"
)
-var emailBatchingJob *EmailBatchingJob
-
-func InitEmailBatching() {
+func (a *App) InitEmailBatching() {
if *utils.Cfg.EmailSettings.EnableEmailBatching {
- if emailBatchingJob == nil {
- emailBatchingJob = MakeEmailBatchingJob(*utils.Cfg.EmailSettings.EmailBatchingBufferSize)
+ if a.EmailBatching == nil {
+ a.EmailBatching = NewEmailBatchingJob(a, *utils.Cfg.EmailSettings.EmailBatchingBufferSize)
}
// note that we don't support changing EmailBatchingBufferSize without restarting the server
- emailBatchingJob.Start()
+ a.EmailBatching.Start()
}
}
-func AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError {
+func (a *App) AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError {
if !*utils.Cfg.EmailSettings.EnableEmailBatching {
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.disabled.app_error", nil, "", http.StatusNotImplemented)
}
- if !emailBatchingJob.Add(user, post, team) {
+ if !a.EmailBatching.Add(user, post, team) {
l4g.Error(utils.T("api.email_batching.add_notification_email_to_batch.channel_full.app_error"))
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.channel_full.app_error", nil, "", http.StatusInternalServerError)
}
@@ -56,12 +54,14 @@ type batchedNotification struct {
}
type EmailBatchingJob struct {
+ app *App
newNotifications chan *batchedNotification
pendingNotifications map[string][]*batchedNotification
}
-func MakeEmailBatchingJob(bufferSize int) *EmailBatchingJob {
+func NewEmailBatchingJob(a *App, bufferSize int) *EmailBatchingJob {
return &EmailBatchingJob{
+ app: a,
newNotifications: make(chan *batchedNotification, bufferSize),
pendingNotifications: make(map[string][]*batchedNotification),
}
@@ -97,7 +97,7 @@ func (job *EmailBatchingJob) CheckPendingEmails() {
// it's a bit weird to pass the send email function through here, but it makes it so that we can test
// without actually sending emails
- job.checkPendingNotifications(time.Now(), Global().sendBatchedEmailNotification)
+ job.checkPendingNotifications(time.Now(), job.app.sendBatchedEmailNotification)
l4g.Debug(utils.T("api.email_batching.check_pending_emails.finished_running"), len(job.pendingNotifications))
}
@@ -131,7 +131,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
if inspectedTeamNames[notification.teamName] != "" {
continue
}
- tchan := Global().Srv.Store.Team().GetByName(notifications[0].teamName)
+ tchan := job.app.Srv.Store.Team().GetByName(notifications[0].teamName)
if result := <-tchan; result.Err != nil {
l4g.Error("Unable to find Team id for notification", result.Err)
continue
@@ -141,7 +141,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// if the user has viewed any channels in this team since the notification was queued, delete
// all queued notifications
- mchan := Global().Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId)
+ mchan := job.app.Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId)
if result := <-mchan; result.Err != nil {
l4g.Error("Unable to find ChannelMembers for user", result.Err)
continue
@@ -158,7 +158,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// get how long we need to wait to send notifications to the user
var interval int64
- pchan := Global().Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL)
+ pchan := job.app.Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL)
if result := <-pchan; result.Err != nil {
// use the default batching interval if an error ocurrs while fetching user preferences
interval, _ = strconv.ParseInt(model.PREFERENCE_EMAIL_INTERVAL_BATCHING_SECONDS, 10, 64)
diff --git a/app/email_batching_test.go b/app/email_batching_test.go
index 2c58d43f8..b69eeec2d 100644
--- a/app/email_batching_test.go
+++ b/app/email_batching_test.go
@@ -13,14 +13,14 @@ import (
)
func TestHandleNewNotifications(t *testing.T) {
- Setup()
+ th := Setup()
id1 := model.NewId()
id2 := model.NewId()
id3 := model.NewId()
// test queueing of received posts by user
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
job.handleNewNotifications()
@@ -74,7 +74,7 @@ func TestHandleNewNotifications(t *testing.T) {
}
// test ordering of received posts
- job = MakeEmailBatchingJob(128)
+ job = NewEmailBatchingJob(th.App, 128)
job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test1"}, &model.Team{Name: "team"})
job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test2"}, &model.Team{Name: "team"})
@@ -95,7 +95,7 @@ func TestHandleNewNotifications(t *testing.T) {
func TestCheckPendingNotifications(t *testing.T) {
th := Setup().InitBasic()
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
job.pendingNotifications[th.BasicUser.Id] = []*batchedNotification{
{
post: &model.Post{
@@ -201,7 +201,7 @@ func TestCheckPendingNotifications(t *testing.T) {
*/
func TestCheckPendingNotificationsDefaultInterval(t *testing.T) {
th := Setup().InitBasic()
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
// bypasses recent user activity check
channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember)
@@ -237,7 +237,7 @@ func TestCheckPendingNotificationsDefaultInterval(t *testing.T) {
*/
func TestCheckPendingNotificationsCantParseInterval(t *testing.T) {
th := Setup().InitBasic()
- job := MakeEmailBatchingJob(128)
+ job := NewEmailBatchingJob(th.App, 128)
// bypasses recent user activity check
channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember)
diff --git a/app/emoji.go b/app/emoji.go
index d5b81c89a..308791aed 100644
--- a/app/emoji.go
+++ b/app/emoji.go
@@ -61,7 +61,7 @@ func (a *App) CreateEmoji(sessionUserId string, emoji *model.Emoji, multiPartIma
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EMOJI_ADDED, "", "", "", nil)
message.Add("emoji", emoji.ToJson())
- Publish(message)
+ a.Publish(message)
return result.Data.(*model.Emoji), nil
}
}
diff --git a/app/notification.go b/app/notification.go
index cc3db8b55..0859dfd20 100644
--- a/app/notification.go
+++ b/app/notification.go
@@ -94,7 +94,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
if result := <-a.Srv.Store.User().GetProfilesByUsernames(potentialOtherMentions, team.Id); result.Err == nil {
outOfChannelMentions := result.Data.([]*model.User)
if channel.Type != model.CHANNEL_GROUP {
- go sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
+ go a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
}
}
}
@@ -186,7 +186,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @here is disabled
if hereNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
hereNotification = false
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -198,7 +198,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @channel is disabled
if channelNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -210,7 +210,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @all is disabled
if allNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -298,7 +298,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
message.Add("mentions", model.ArrayToJson(mentionedUsersList))
}
- Publish(message)
+ a.Publish(message)
return mentionedUsersList, nil
}
@@ -337,7 +337,7 @@ func (a *App) sendNotificationEmail(post *model.Post, user *model.User, channel
}
if sendBatched {
- if err := AddNotificationEmailToBatch(user, post, team); err == nil {
+ if err := a.AddNotificationEmailToBatch(user, post, team); err == nil {
return nil
}
}
@@ -717,7 +717,7 @@ func (a *App) getMobileAppSessions(userId string) ([]*model.Session, *model.AppE
}
}
-func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError {
+func (a *App) sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError {
if len(users) == 0 {
return nil
}
@@ -742,7 +742,7 @@ func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId strin
})
}
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
diff --git a/app/plugins.go b/app/plugins.go
index 450cfebeb..94a1bfd6f 100644
--- a/app/plugins.go
+++ b/app/plugins.go
@@ -335,7 +335,7 @@ func (a *App) UnpackAndActivatePlugin(pluginFile io.Reader) (*model.Manifest, *m
if manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_ACTIVATED, "", "", "", nil)
message.Add("manifest", manifest.ClientManifest())
- Publish(message)
+ a.Publish(message)
}
return manifest, nil
@@ -383,7 +383,7 @@ func (a *App) RemovePlugin(id string) *model.AppError {
if manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DEACTIVATED, "", "", "", nil)
message.Add("manifest", manifest.ClientManifest())
- Publish(message)
+ a.Publish(message)
}
return nil
diff --git a/app/post.go b/app/post.go
index e81af4673..ccdc015bb 100644
--- a/app/post.go
+++ b/app/post.go
@@ -51,7 +51,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError)
}
T := utils.GetUserTranslations(user.Locale)
- SendEphemeralPost(
+ a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: channel.Id,
@@ -75,7 +75,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError)
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", post.UserId, nil)
message.Add("channel_id", post.ChannelId)
- go Publish(message)
+ go a.Publish(message)
}
}
@@ -239,7 +239,7 @@ func parseSlackLinksToMarkdown(text string) string {
return linkWithTextRegex.ReplaceAllString(text, "[${2}](${1})")
}
-func SendEphemeralPost(userId string, post *model.Post) *model.Post {
+func (a *App) SendEphemeralPost(userId string, post *model.Post) *model.Post {
post.Type = model.POST_EPHEMERAL
// fill in fields which haven't been specified which have sensible defaults
@@ -256,7 +256,7 @@ func SendEphemeralPost(userId string, post *model.Post) *model.Post {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE, "", post.ChannelId, userId, nil)
message.Add("post", post.ToJson())
- go Publish(message)
+ go a.Publish(message)
return post
}
@@ -330,7 +330,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model
}()
}
- sendUpdatedPostEvent(rpost)
+ a.sendUpdatedPostEvent(rpost)
a.InvalidateCacheForChannelPosts(rpost.ChannelId)
@@ -351,17 +351,17 @@ func (a *App) PatchPost(postId string, patch *model.PostPatch) (*model.Post, *mo
return nil, err
}
- sendUpdatedPostEvent(updatedPost)
+ a.sendUpdatedPostEvent(updatedPost)
a.InvalidateCacheForChannelPosts(updatedPost.ChannelId)
return updatedPost, nil
}
-func sendUpdatedPostEvent(post *model.Post) {
+func (a *App) sendUpdatedPostEvent(post *model.Post) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) GetPostsPage(channelId string, page int, perPage int) (*model.PostList, *model.AppError) {
@@ -502,7 +502,7 @@ func (a *App) DeletePost(postId string) (*model.Post, *model.AppError) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_DELETED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
- go Publish(message)
+ go a.Publish(message)
go a.DeletePostFiles(post)
go a.DeleteFlaggedPosts(post.Id)
@@ -724,7 +724,7 @@ func (a *App) DoPostAction(postId string, actionId string, userId string) *model
}
ephemeralPost.UserId = userId
ephemeralPost.AddProp("from_webhook", "true")
- SendEphemeralPost(userId, ephemeralPost)
+ a.SendEphemeralPost(userId, ephemeralPost)
}
return nil
diff --git a/app/preference.go b/app/preference.go
index 8ae33b728..bee3236bf 100644
--- a/app/preference.go
+++ b/app/preference.go
@@ -55,7 +55,7 @@ func (a *App) UpdatePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_CHANGED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
- go Publish(message)
+ go a.Publish(message)
return nil
}
@@ -78,7 +78,7 @@ func (a *App) DeletePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_DELETED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
- go Publish(message)
+ go a.Publish(message)
return nil
}
diff --git a/app/reaction.go b/app/reaction.go
index 6513fa8b0..debf75f7a 100644
--- a/app/reaction.go
+++ b/app/reaction.go
@@ -51,7 +51,7 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo
// send out that a reaction has been added/removed
message := model.NewWebSocketEvent(event, "", post.ChannelId, "", nil)
message.Add("reaction", reaction.ToJson())
- Publish(message)
+ a.Publish(message)
// The post is always modified since the UpdateAt always changes
a.InvalidateCacheForChannelPosts(post.ChannelId)
@@ -59,5 +59,5 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo
post.UpdateAt = model.GetMillis()
umessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil)
umessage.Add("post", post.ToJson())
- Publish(umessage)
+ a.Publish(umessage)
}
diff --git a/app/server.go b/app/server.go
index 938773ad9..6915369c4 100644
--- a/app/server.go
+++ b/app/server.go
@@ -218,7 +218,7 @@ func (a *App) StopServer() {
a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN)
a.Srv.Store.Close()
- HubStop()
+ a.HubStop()
a.ShutDownPlugins()
diff --git a/app/session.go b/app/session.go
index e5e5c939d..f0245acba 100644
--- a/app/session.go
+++ b/app/session.go
@@ -107,8 +107,7 @@ func (a *App) RevokeAllSessions(userId string) *model.AppError {
}
func (a *App) ClearSessionCacheForUser(userId string) {
-
- ClearSessionCacheForUserSkipClusterSend(userId)
+ a.ClearSessionCacheForUserSkipClusterSend(userId)
if a.Cluster != nil {
msg := &model.ClusterMessage{
@@ -120,7 +119,7 @@ func (a *App) ClearSessionCacheForUser(userId string) {
}
}
-func ClearSessionCacheForUserSkipClusterSend(userId string) {
+func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) {
keys := sessionCache.Keys()
for _, key := range keys {
@@ -132,8 +131,7 @@ func ClearSessionCacheForUserSkipClusterSend(userId string) {
}
}
- InvalidateWebConnSessionCacheForUser(userId)
-
+ a.InvalidateWebConnSessionCacheForUser(userId)
}
func AddSessionToCache(session *model.Session) {
diff --git a/app/status.go b/app/status.go
index fb93a9e39..edfda561b 100644
--- a/app/status.go
+++ b/app/status.go
@@ -213,15 +213,15 @@ func (a *App) SetStatusOnline(userId string, sessionId string, manual bool) {
}
if broadcast {
- BroadcastStatus(status)
+ a.BroadcastStatus(status)
}
}
-func BroadcastStatus(status *model.Status) {
+func (a *App) BroadcastStatus(status *model.Status) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", status.Status)
event.Add("user_id", status.UserId)
- go Publish(event)
+ go a.Publish(event)
}
func (a *App) SetStatusOffline(userId string, manual bool) {
@@ -245,7 +245,7 @@ func (a *App) SetStatusOffline(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_OFFLINE)
event.Add("user_id", status.UserId)
- go Publish(event)
+ go a.Publish(event)
}
func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
@@ -286,7 +286,7 @@ func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_AWAY)
event.Add("user_id", status.UserId)
- go Publish(event)
+ go a.Publish(event)
}
func GetStatusFromCache(userId string) *model.Status {
diff --git a/app/team.go b/app/team.go
index fdf44a783..7a5ccc5d6 100644
--- a/app/team.go
+++ b/app/team.go
@@ -106,7 +106,7 @@ func (a *App) UpdateTeam(team *model.Team) (*model.Team, *model.AppError) {
oldTeam.Sanitize()
- sendUpdatedTeamEvent(oldTeam)
+ a.sendUpdatedTeamEvent(oldTeam)
return oldTeam, nil
}
@@ -126,15 +126,15 @@ func (a *App) PatchTeam(teamId string, patch *model.TeamPatch) (*model.Team, *mo
updatedTeam.Sanitize()
- sendUpdatedTeamEvent(updatedTeam)
+ a.sendUpdatedTeamEvent(updatedTeam)
return updatedTeam, nil
}
-func sendUpdatedTeamEvent(team *model.Team) {
+func (a *App) sendUpdatedTeamEvent(team *model.Team) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_UPDATE_TEAM, "", "", "", nil)
message.Add("team", team.ToJson())
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles string) (*model.TeamMember, *model.AppError) {
@@ -163,16 +163,16 @@ func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles strin
a.ClearSessionCacheForUser(userId)
- sendUpdatedMemberRoleEvent(userId, member)
+ a.sendUpdatedMemberRoleEvent(userId, member)
return member, nil
}
-func sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) {
+func (a *App) sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_MEMBERROLE_UPDATED, "", "", userId, nil)
message.Add("member", member.ToJson())
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) AddUserToTeam(teamId string, userId string, userRequestorId string) (*model.Team, *model.AppError) {
@@ -330,7 +330,7 @@ func (a *App) JoinUserToTeam(team *model.Team, user *model.User, userRequestorId
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", user.Id, nil)
message.Add("team_id", team.Id)
message.Add("user_id", user.Id)
- Publish(message)
+ a.Publish(message)
return nil
}
@@ -462,7 +462,7 @@ func (a *App) AddTeamMember(teamId, userId string) (*model.TeamMember, *model.Ap
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil)
message.Add("team_id", teamId)
message.Add("user_id", userId)
- Publish(message)
+ a.Publish(message)
return teamMember, nil
}
@@ -484,7 +484,7 @@ func (a *App) AddTeamMembers(teamId string, userIds []string, userRequestorId st
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil)
message.Add("team_id", teamId)
message.Add("user_id", userId)
- Publish(message)
+ a.Publish(message)
}
return members, nil
@@ -603,7 +603,7 @@ func (a *App) LeaveTeam(team *model.Team, user *model.User) *model.AppError {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_LEAVE_TEAM, team.Id, "", "", nil)
message.Add("user_id", user.Id)
message.Add("team_id", team.Id)
- Publish(message)
+ a.Publish(message)
teamMember.Roles = ""
teamMember.DeleteAt = model.GetMillis()
diff --git a/app/user.go b/app/user.go
index c91b4cfb7..27f1c5d85 100644
--- a/app/user.go
+++ b/app/user.go
@@ -202,7 +202,7 @@ func (a *App) CreateUser(user *model.User) (*model.User, *model.AppError) {
// This message goes to everyone, so the teamId, channelId and userId are irrelevant
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_NEW_USER, "", "", "", nil)
message.Add("user_id", ruser.Id)
- go Publish(message)
+ go a.Publish(message)
return ruser, nil
}
@@ -829,7 +829,7 @@ func (a *App) SetProfileImage(userId string, imageData *multipart.FileHeader) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers)
message.Add("user", user)
- Publish(message)
+ a.Publish(message)
}
return nil
@@ -950,7 +950,7 @@ func (a *App) UpdateUserAsUser(user *model.User, asAdmin bool) (*model.User, *mo
return nil, err
}
- sendUpdatedUserEvent(*updatedUser, asAdmin)
+ a.sendUpdatedUserEvent(*updatedUser, asAdmin)
return updatedUser, nil
}
@@ -968,19 +968,19 @@ func (a *App) PatchUser(userId string, patch *model.UserPatch, asAdmin bool) (*m
return nil, err
}
- sendUpdatedUserEvent(*updatedUser, asAdmin)
+ a.sendUpdatedUserEvent(*updatedUser, asAdmin)
return updatedUser, nil
}
-func sendUpdatedUserEvent(user model.User, asAdmin bool) {
+func (a *App) sendUpdatedUserEvent(user model.User, asAdmin bool) {
SanitizeProfile(&user, asAdmin)
omitUsers := make(map[string]bool, 1)
omitUsers[user.Id] = true
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers)
message.Add("user", user)
- go Publish(message)
+ go a.Publish(message)
}
func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, *model.AppError) {
diff --git a/app/web_conn.go b/app/web_conn.go
index 069d2c8f4..f5644ce17 100644
--- a/app/web_conn.go
+++ b/app/web_conn.go
@@ -96,7 +96,7 @@ func (c *WebConn) SetSession(v *model.Session) {
func (c *WebConn) ReadPump() {
defer func() {
- HubUnregister(c)
+ c.App.HubUnregister(c)
c.WebSocket.Close()
}()
c.WebSocket.SetReadLimit(model.SOCKET_MAX_MESSAGE_SIZE_KB)
diff --git a/app/web_hub.go b/app/web_hub.go
index b351de39e..50ccb100e 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -29,6 +29,7 @@ type Hub struct {
// connectionCount should be kept first.
// See https://github.com/mattermost/mattermost-server/pull/7281
connectionCount int64
+ app *App
connections []*WebConn
connectionIndex int
register chan *WebConn
@@ -40,11 +41,9 @@ type Hub struct {
goroutineId int
}
-var hubs []*Hub = make([]*Hub, 0)
-var stopCheckingForDeadlock chan bool
-
-func NewWebHub() *Hub {
+func (a *App) NewWebHub() *Hub {
return &Hub{
+ app: a,
register: make(chan *WebConn),
unregister: make(chan *WebConn),
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
@@ -55,26 +54,27 @@ func NewWebHub() *Hub {
}
}
-func TotalWebsocketConnections() int {
+func (a *App) TotalWebsocketConnections() int {
count := int64(0)
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
count = count + atomic.LoadInt64(&hub.connectionCount)
}
return int(count)
}
-func HubStart() {
+func (a *App) HubStart() {
// Total number of hubs is twice the number of CPUs.
numberOfHubs := runtime.NumCPU() * 2
l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs)
- hubs = make([]*Hub, numberOfHubs)
+ a.Hubs = make([]*Hub, numberOfHubs)
+ a.HubsStopCheckingForDeadlock = make(chan bool, 1)
- for i := 0; i < len(hubs); i++ {
- hubs[i] = NewWebHub()
- hubs[i].connectionIndex = i
- hubs[i].Start()
+ for i := 0; i < len(a.Hubs); i++ {
+ a.Hubs[i] = a.NewWebHub()
+ a.Hubs[i].connectionIndex = i
+ a.Hubs[i].Start()
}
go func() {
@@ -84,12 +84,10 @@ func HubStart() {
ticker.Stop()
}()
- stopCheckingForDeadlock = make(chan bool, 1)
-
for {
select {
case <-ticker.C:
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
if len(hub.broadcast) >= DEADLOCK_WARN {
l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))
buf := make([]byte, 1<<16)
@@ -105,46 +103,42 @@ func HubStart() {
}
}
- case <-stopCheckingForDeadlock:
+ case <-a.HubsStopCheckingForDeadlock:
return
}
}
}()
}
-func HubStop() {
+func (a *App) HubStop() {
l4g.Info(utils.T("api.web_hub.start.stopping.debug"))
select {
- case stopCheckingForDeadlock <- true:
+ case a.HubsStopCheckingForDeadlock <- true:
default:
l4g.Warn("We appear to have already sent the stop checking for deadlocks command")
}
- for _, hub := range hubs {
+ for _, hub := range a.Hubs {
hub.Stop()
}
- hubs = make([]*Hub, 0)
+ a.Hubs = []*Hub{}
}
-func GetHubForUserId(userId string) *Hub {
+func (a *App) GetHubForUserId(userId string) *Hub {
hash := fnv.New32a()
hash.Write([]byte(userId))
- index := hash.Sum32() % uint32(len(hubs))
- return hubs[index]
-}
-
-func HubRegister(webConn *WebConn) {
- GetHubForUserId(webConn.UserId).Register(webConn)
+ index := hash.Sum32() % uint32(len(a.Hubs))
+ return a.Hubs[index]
}
-func HubUnregister(webConn *WebConn) {
- GetHubForUserId(webConn.UserId).Unregister(webConn)
+func (a *App) HubRegister(webConn *WebConn) {
+ a.GetHubForUserId(webConn.UserId).Register(webConn)
}
-func Publish(message *model.WebSocketEvent) {
- Global().Publish(message)
+func (a *App) HubUnregister(webConn *WebConn) {
+ a.GetHubForUserId(webConn.UserId).Unregister(webConn)
}
func (a *App) Publish(message *model.WebSocketEvent) {
@@ -152,7 +146,7 @@ func (a *App) Publish(message *model.WebSocketEvent) {
metrics.IncrementWebsocketEvent(message.Event)
}
- PublishSkipClusterSend(message)
+ a.PublishSkipClusterSend(message)
if a.Cluster != nil {
cm := &model.ClusterMessage{
@@ -173,8 +167,8 @@ func (a *App) Publish(message *model.WebSocketEvent) {
}
}
-func PublishSkipClusterSend(message *model.WebSocketEvent) {
- for _, hub := range hubs {
+func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) {
+ for _, hub := range a.Hubs {
hub.Broadcast(message)
}
}
@@ -291,8 +285,8 @@ func (a *App) InvalidateCacheForUserSkipClusterSend(userId string) {
a.Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId)
a.Srv.Store.User().InvalidatProfileCacheForUser(userId)
- if len(hubs) != 0 {
- GetHubForUserId(userId).InvalidateUser(userId)
+ if len(a.Hubs) != 0 {
+ a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -313,9 +307,9 @@ func (a *App) InvalidateCacheForWebhookSkipClusterSend(webhookId string) {
a.Srv.Store.Webhook().InvalidateWebhookCache(webhookId)
}
-func InvalidateWebConnSessionCacheForUser(userId string) {
- if len(hubs) != 0 {
- GetHubForUserId(userId).InvalidateUser(userId)
+func (a *App) InvalidateWebConnSessionCacheForUser(userId string) {
+ if len(a.Hubs) != 0 {
+ a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -401,7 +395,7 @@ func (h *Hub) Start() {
}
if !found {
- go Global().SetStatusOffline(userId, false)
+ go h.app.SetStatusOffline(userId, false)
}
case userId := <-h.invalidateUser:
diff --git a/app/websocket_router.go b/app/websocket_router.go
index bfb649d6c..c8220f1f1 100644
--- a/app/websocket_router.go
+++ b/app/websocket_router.go
@@ -17,13 +17,15 @@ type webSocketHandler interface {
}
type WebSocketRouter struct {
+ app *App
handlers map[string]webSocketHandler
}
-func NewWebSocketRouter() *WebSocketRouter {
- router := &WebSocketRouter{}
- router.handlers = make(map[string]webSocketHandler)
- return router
+func (a *App) NewWebSocketRouter() *WebSocketRouter {
+ return &WebSocketRouter{
+ app: a,
+ handlers: make(map[string]webSocketHandler),
+ }
}
func (wr *WebSocketRouter) Handle(action string, handler webSocketHandler) {
@@ -54,21 +56,21 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque
return
}
- session, err := Global().GetSession(token)
+ session, err := wr.app.GetSession(token)
if err != nil {
conn.WebSocket.Close()
} else {
go func() {
- Global().SetStatusOnline(session.UserId, session.Id, false)
- Global().UpdateLastActivityAtIfNeeded(*session)
+ wr.app.SetStatusOnline(session.UserId, session.Id, false)
+ wr.app.UpdateLastActivityAtIfNeeded(*session)
}()
conn.SetSession(session)
conn.SetSessionToken(session.Token)
conn.UserId = session.UserId
- HubRegister(conn)
+ wr.app.HubRegister(conn)
resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil)
conn.Send <- resp