summaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorChris <ccbrown112@gmail.com>2017-10-03 10:53:53 -0500
committerGitHub <noreply@github.com>2017-10-03 10:53:53 -0500
commit5e69ce099f521aa49fc267c62235c003eae530ff (patch)
treec7177e4cac419082753225819f62d07c8b5671e8 /app
parentbfe7955fb0c72bb6f3e0a1e0aaca70cff27d7ddc (diff)
downloadchat-5e69ce099f521aa49fc267c62235c003eae530ff.tar.gz
chat-5e69ce099f521aa49fc267c62235c003eae530ff.tar.bz2
chat-5e69ce099f521aa49fc267c62235c003eae530ff.zip
Goroutine wranglin (#7556)
* goroutine wranglin * synchronize WebConn.WritePump
Diffstat (limited to 'app')
-rw-r--r--app/app.go53
-rw-r--r--app/channel.go28
-rw-r--r--app/command_echo.go4
-rw-r--r--app/command_expand_collapse.go4
-rw-r--r--app/compliance.go4
-rw-r--r--app/email_batching.go6
-rw-r--r--app/ldap.go12
-rw-r--r--app/notification.go22
-rw-r--r--app/oauth.go8
-rw-r--r--app/post.go40
-rw-r--r--app/preference.go8
-rw-r--r--app/reaction.go8
-rw-r--r--app/server.go15
-rw-r--r--app/status.go12
-rw-r--r--app/team.go8
-rw-r--r--app/user.go24
-rw-r--r--app/web_conn.go22
-rw-r--r--app/web_hub.go4
-rw-r--r--app/webhook.go34
-rw-r--r--app/websocket_router.go4
20 files changed, 216 insertions, 104 deletions
diff --git a/app/app.go b/app/app.go
index 7974ab44f..d0d5bb4e0 100644
--- a/app/app.go
+++ b/app/app.go
@@ -7,7 +7,9 @@ import (
"io/ioutil"
"net/http"
"sync"
- "time"
+ "sync/atomic"
+
+ l4g "github.com/alecthomas/log4go"
"github.com/mattermost/mattermost-server/einterfaces"
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
@@ -18,6 +20,9 @@ import (
)
type App struct {
+ goroutineCount int32
+ goroutineExitSignal chan struct{}
+
Srv *Server
PluginEnv *pluginenv.Environment
@@ -43,7 +48,8 @@ type App struct {
}
var globalApp App = App{
- Jobs: &jobs.JobServer{},
+ goroutineExitSignal: make(chan struct{}, 1),
+ Jobs: &jobs.JobServer{},
}
var appCount = 0
@@ -61,7 +67,8 @@ func New() *App {
panic("Only one App should exist at a time. Did you forget to call Shutdown()?")
}
app := &App{
- Jobs: &jobs.JobServer{},
+ goroutineExitSignal: make(chan struct{}, 1),
+ Jobs: &jobs.JobServer{},
}
app.initEnterprise()
return app
@@ -76,12 +83,19 @@ func New() *App {
func (a *App) Shutdown() {
appCount--
if appCount == 0 {
- // XXX: This is to give all of our runaway goroutines time to complete.
- // We should wrangle them up and remove this.
- time.Sleep(time.Second)
-
if a.Srv != nil {
- a.StopServer()
+ l4g.Info(utils.T("api.server.stop_server.stopping.info"))
+
+ a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN)
+ a.Srv.Store.Close()
+ a.HubStop()
+
+ a.ShutDownPlugins()
+ a.WaitForGoroutines()
+
+ a.Srv = nil
+
+ l4g.Info(utils.T("api.server.stop_server.stopped.info"))
}
}
}
@@ -211,6 +225,29 @@ func (a *App) Config() *model.Config {
return utils.Cfg
}
+// Go creates a goroutine, but maintains a record of it to ensure that execution completes before
+// the app is destroyed.
+func (a *App) Go(f func()) {
+ atomic.AddInt32(&a.goroutineCount, 1)
+
+ go func() {
+ f()
+
+ atomic.AddInt32(&a.goroutineCount, -1)
+ select {
+ case a.goroutineExitSignal <- struct{}{}:
+ default:
+ }
+ }()
+}
+
+// WaitForGoroutines blocks until all goroutines created by App.Go exit.
+func (a *App) WaitForGoroutines() {
+ for atomic.LoadInt32(&a.goroutineCount) != 0 {
+ <-a.goroutineExitSignal
+ }
+}
+
func CloseBody(r *http.Response) {
if r.Body != nil {
ioutil.ReadAll(r.Body)
diff --git a/app/channel.go b/app/channel.go
index 88f9cc7d7..2ab591c42 100644
--- a/app/channel.go
+++ b/app/channel.go
@@ -584,7 +584,9 @@ func (a *App) AddChannelMember(userId string, channel *model.Channel, userReques
if userId == userRequestorId {
a.postJoinChannelMessage(user, channel)
} else {
- go a.PostAddToChannelMessage(userRequestor, user, channel)
+ a.Go(func() {
+ a.PostAddToChannelMessage(userRequestor, user, channel)
+ })
}
a.UpdateChannelLastViewedAt([]string{channel.Id}, userRequestor.Id)
@@ -958,7 +960,9 @@ func (a *App) LeaveChannel(channelId string, userId string) *model.AppError {
return err
}
- go a.postLeaveChannelMessage(user, channel)
+ a.Go(func() {
+ a.postLeaveChannelMessage(user, channel)
+ })
}
return nil
@@ -1039,13 +1043,17 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string,
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", channel.Id, "", nil)
message.Add("user_id", userIdToRemove)
message.Add("remover_id", removerUserId)
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
// because the removed user no longer belongs to the channel we need to send a separate websocket event
userMsg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", "", userIdToRemove, nil)
userMsg.Add("channel_id", channel.Id)
userMsg.Add("remover_id", removerUserId)
- go a.Publish(userMsg)
+ a.Go(func() {
+ a.Publish(userMsg)
+ })
return nil
}
@@ -1064,7 +1072,9 @@ func (a *App) RemoveUserFromChannel(userIdToRemove string, removerUserId string,
if userIdToRemove == removerUserId {
a.postLeaveChannelMessage(user, channel)
} else {
- go a.PostRemoveFromChannelMessage(removerUserId, user, channel)
+ a.Go(func() {
+ a.PostRemoveFromChannelMessage(removerUserId, user, channel)
+ })
}
return nil
@@ -1113,7 +1123,9 @@ func (a *App) UpdateChannelLastViewedAt(channelIds []string, userId string) *mod
for _, channelId := range channelIds {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", channelId)
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
}
}
@@ -1182,7 +1194,9 @@ func (a *App) ViewChannel(view *model.ChannelView, userId string, clearPushNotif
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages && model.IsValidId(view.ChannelId) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", view.ChannelId)
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
}
return times, nil
diff --git a/app/command_echo.go b/app/command_echo.go
index 61a24b59a..404736aa4 100644
--- a/app/command_echo.go
+++ b/app/command_echo.go
@@ -77,7 +77,7 @@ func (me *EchoProvider) DoCommand(a *App, args *model.CommandArgs, message strin
}
echoSem <- true
- go func() {
+ a.Go(func() {
defer func() { <-echoSem }()
post := &model.Post{}
post.ChannelId = args.ChannelId
@@ -91,7 +91,7 @@ func (me *EchoProvider) DoCommand(a *App, args *model.CommandArgs, message strin
if _, err := a.CreatePostMissingChannel(post, true); err != nil {
l4g.Error(args.T("api.command_echo.create.app_error"), err)
}
- }()
+ })
return &model.CommandResponse{}
}
diff --git a/app/command_expand_collapse.go b/app/command_expand_collapse.go
index 9c94a4325..093e53a38 100644
--- a/app/command_expand_collapse.go
+++ b/app/command_expand_collapse.go
@@ -74,7 +74,9 @@ func (a *App) setCollapsePreference(args *model.CommandArgs, isCollapse bool) *m
socketMessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCE_CHANGED, "", "", args.UserId, nil)
socketMessage.Add("preference", pref.ToJson())
- go a.Publish(socketMessage)
+ a.Go(func() {
+ a.Publish(socketMessage)
+ })
var rmsg string
diff --git a/app/compliance.go b/app/compliance.go
index cf942ca00..77590af6c 100644
--- a/app/compliance.go
+++ b/app/compliance.go
@@ -35,7 +35,9 @@ func (a *App) SaveComplianceReport(job *model.Compliance) (*model.Compliance, *m
return nil, result.Err
} else {
job = result.Data.(*model.Compliance)
- go a.Compliance.RunComplianceJob(job)
+ a.Go(func() {
+ a.Compliance.RunComplianceJob(job)
+ })
}
return job, nil
diff --git a/app/email_batching.go b/app/email_batching.go
index b9f89d646..4c546531f 100644
--- a/app/email_batching.go
+++ b/app/email_batching.go
@@ -175,7 +175,11 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// send the email notification if it's been long enough
if now.Sub(time.Unix(batchStartTime/1000, 0)) > time.Duration(interval)*time.Second {
- go handler(userId, notifications)
+ job.app.Go(func(userId string, notifications []*batchedNotification) func() {
+ return func() {
+ handler(userId, notifications)
+ }
+ }(userId, notifications))
delete(job.pendingNotifications, userId)
}
}
diff --git a/app/ldap.go b/app/ldap.go
index 96791168e..119587866 100644
--- a/app/ldap.go
+++ b/app/ldap.go
@@ -12,7 +12,7 @@ import (
)
func (a *App) SyncLdap() {
- go func() {
+ a.Go(func() {
if utils.IsLicensed() && *utils.License().Features.LDAP && *utils.Cfg.LdapSettings.Enable {
if ldapI := a.Ldap; ldapI != nil {
ldapI.StartSynchronizeJob(false)
@@ -20,7 +20,7 @@ func (a *App) SyncLdap() {
l4g.Error("%v", model.NewAppError("SyncLdap", "ent.ldap.disabled.app_error", nil, "", http.StatusNotImplemented).Error())
}
}
- }()
+ })
}
func (a *App) TestLdap() *model.AppError {
@@ -60,11 +60,11 @@ func (a *App) SwitchEmailToLdap(email, password, code, ldapId, ldapPassword stri
return "", err
}
- go func() {
+ a.Go(func() {
if err := SendSignInChangeEmail(user.Email, "AD/LDAP", user.Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
return "/login?extra=signin_change", nil
}
@@ -102,11 +102,11 @@ func (a *App) SwitchLdapToEmail(ldapPassword, code, email, newPassword string) (
T := utils.GetUserTranslations(user.Locale)
- go func() {
+ a.Go(func() {
if err := SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
return "/login?extra=signin_change", nil
}
diff --git a/app/notification.go b/app/notification.go
index 0859dfd20..3df4a789f 100644
--- a/app/notification.go
+++ b/app/notification.go
@@ -94,7 +94,9 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
if result := <-a.Srv.Store.User().GetProfilesByUsernames(potentialOtherMentions, team.Id); result.Err == nil {
outOfChannelMentions := result.Data.([]*model.User)
if channel.Type != model.CHANNEL_GROUP {
- go a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
+ a.Go(func() {
+ a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
+ })
}
}
}
@@ -362,11 +364,11 @@ func (a *App) sendNotificationEmail(post *model.Post, user *model.User, channel
teamURL := utils.GetSiteURL() + "/" + team.Name
var bodyText = a.getNotificationEmailBody(user, post, channel, senderName, team.Name, teamURL, emailNotificationContentsType, translateFunc)
- go func() {
+ a.Go(func() {
if err := utils.SendMail(user.Email, html.UnescapeString(subjectText), bodyText); err != nil {
l4g.Error(utils.T("api.post.send_notifications_and_forget.send.error"), user.Email, err)
}
- }()
+ })
if a.Metrics != nil {
a.Metrics.IncrementPostSentEmail()
@@ -638,7 +640,11 @@ func (a *App) sendPushNotification(post *model.Post, user *model.User, channel *
l4g.Debug("Sending push notification to device %v for user %v with msg of '%v'", tmpMessage.DeviceId, user.Id, msg.Message)
- go a.sendToPushProxy(tmpMessage, session)
+ a.Go(func(session *model.Session) func() {
+ return func() {
+ a.sendToPushProxy(tmpMessage, session)
+ }
+ }(session))
if a.Metrics != nil {
a.Metrics.IncrementPostSentPush()
@@ -649,7 +655,7 @@ func (a *App) sendPushNotification(post *model.Post, user *model.User, channel *
}
func (a *App) ClearPushNotification(userId string, channelId string) {
- go func() {
+ a.Go(func() {
// Sleep is to allow the read replicas a chance to fully sync
// the unread count for sending an accurate count.
// Delaying a little doesn't hurt anything and is cheaper than
@@ -678,9 +684,11 @@ func (a *App) ClearPushNotification(userId string, channelId string) {
for _, session := range sessions {
tmpMessage := *model.PushNotificationFromJson(strings.NewReader(msg.ToJson()))
tmpMessage.SetDeviceIdAndPlatform(session.DeviceId)
- go a.sendToPushProxy(tmpMessage, session)
+ a.Go(func() {
+ a.sendToPushProxy(tmpMessage, session)
+ })
}
- }()
+ })
}
func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session) {
diff --git a/app/oauth.go b/app/oauth.go
index cd45b0c10..be0535f35 100644
--- a/app/oauth.go
+++ b/app/oauth.go
@@ -535,11 +535,11 @@ func (a *App) CompleteSwitchWithOAuth(service string, userData io.ReadCloser, em
return nil, result.Err
}
- go func() {
+ a.Go(func() {
if err := SendSignInChangeEmail(user.Email, strings.Title(service)+" SSO", user.Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
return user, nil
}
@@ -770,11 +770,11 @@ func (a *App) SwitchOAuthToEmail(email, password, requesterId string) (string, *
T := utils.GetUserTranslations(user.Locale)
- go func() {
+ a.Go(func() {
if err := SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
if err := a.RevokeAllSessions(requesterId); err != nil {
return "", err
diff --git a/app/post.go b/app/post.go
index ccdc015bb..497cab5a6 100644
--- a/app/post.go
+++ b/app/post.go
@@ -75,7 +75,9 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError)
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", post.UserId, nil)
message.Add("channel_id", post.ChannelId)
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
}
}
@@ -152,7 +154,9 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo
esInterface := a.Elasticsearch
if esInterface != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
- go esInterface.IndexPost(rpost, channel.TeamId)
+ a.Go(func() {
+ esInterface.IndexPost(rpost, channel.TeamId)
+ })
}
if a.Metrics != nil {
@@ -207,11 +211,11 @@ func (a *App) handlePostEvents(post *model.Post, user *model.User, channel *mode
}
if triggerWebhooks {
- go func() {
+ a.Go(func() {
if err := a.handleWebhookEvents(post, team, channel, user); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
}
return nil
@@ -256,7 +260,9 @@ func (a *App) SendEphemeralPost(userId string, post *model.Post) *model.Post {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE, "", post.ChannelId, userId, nil)
message.Add("post", post.ToJson())
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
return post
}
@@ -321,13 +327,13 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model
esInterface := a.Elasticsearch
if esInterface != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
- go func() {
+ a.Go(func() {
if rchannel := <-a.Srv.Store.Channel().GetForPost(rpost.Id); rchannel.Err != nil {
l4g.Error("Couldn't get channel %v for post %v for Elasticsearch indexing.", rpost.ChannelId, rpost.Id)
} else {
esInterface.IndexPost(rpost, rchannel.Data.(*model.Channel).TeamId)
}
- }()
+ })
}
a.sendUpdatedPostEvent(rpost)
@@ -361,7 +367,9 @@ func (a *App) sendUpdatedPostEvent(post *model.Post) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
}
func (a *App) GetPostsPage(channelId string, page int, perPage int) (*model.PostList, *model.AppError) {
@@ -502,13 +510,21 @@ func (a *App) DeletePost(postId string) (*model.Post, *model.AppError) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_DELETED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
- go a.Publish(message)
- go a.DeletePostFiles(post)
- go a.DeleteFlaggedPosts(post.Id)
+ a.Go(func() {
+ a.Publish(message)
+ })
+ a.Go(func() {
+ a.DeletePostFiles(post)
+ })
+ a.Go(func() {
+ a.DeleteFlaggedPosts(post.Id)
+ })
esInterface := a.Elasticsearch
if esInterface != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
- go esInterface.DeletePost(post)
+ a.Go(func() {
+ esInterface.DeletePost(post)
+ })
}
a.InvalidateCacheForChannelPosts(post.ChannelId)
diff --git a/app/preference.go b/app/preference.go
index bee3236bf..9ca1f474c 100644
--- a/app/preference.go
+++ b/app/preference.go
@@ -55,7 +55,9 @@ func (a *App) UpdatePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_CHANGED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
return nil
}
@@ -78,7 +80,9 @@ func (a *App) DeletePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_DELETED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
return nil
}
diff --git a/app/reaction.go b/app/reaction.go
index debf75f7a..bf0d20e2b 100644
--- a/app/reaction.go
+++ b/app/reaction.go
@@ -18,7 +18,9 @@ func (a *App) SaveReactionForPost(reaction *model.Reaction) (*model.Reaction, *m
} else {
reaction = result.Data.(*model.Reaction)
- go a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_ADDED, reaction, post)
+ a.Go(func() {
+ a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_ADDED, reaction, post)
+ })
return reaction, nil
}
@@ -41,7 +43,9 @@ func (a *App) DeleteReactionForPost(reaction *model.Reaction) *model.AppError {
if result := <-a.Srv.Store.Reaction().Delete(reaction); result.Err != nil {
return result.Err
} else {
- go a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_REMOVED, reaction, post)
+ a.Go(func() {
+ a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_REMOVED, reaction, post)
+ })
}
return nil
diff --git a/app/server.go b/app/server.go
index 3df6a39bb..5f955dd65 100644
--- a/app/server.go
+++ b/app/server.go
@@ -211,18 +211,3 @@ func (a *App) StartServer() {
}
}()
}
-
-func (a *App) StopServer() {
-
- l4g.Info(utils.T("api.server.stop_server.stopping.info"))
-
- a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN)
- a.Srv.Store.Close()
- a.HubStop()
-
- a.ShutDownPlugins()
-
- a.Srv = nil
-
- l4g.Info(utils.T("api.server.stop_server.stopped.info"))
-}
diff --git a/app/status.go b/app/status.go
index edfda561b..3d4837cb0 100644
--- a/app/status.go
+++ b/app/status.go
@@ -221,7 +221,9 @@ func (a *App) BroadcastStatus(status *model.Status) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", status.Status)
event.Add("user_id", status.UserId)
- go a.Publish(event)
+ a.Go(func() {
+ a.Publish(event)
+ })
}
func (a *App) SetStatusOffline(userId string, manual bool) {
@@ -245,7 +247,9 @@ func (a *App) SetStatusOffline(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_OFFLINE)
event.Add("user_id", status.UserId)
- go a.Publish(event)
+ a.Go(func() {
+ a.Publish(event)
+ })
}
func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
@@ -286,7 +290,9 @@ func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_AWAY)
event.Add("user_id", status.UserId)
- go a.Publish(event)
+ a.Go(func() {
+ a.Publish(event)
+ })
}
func GetStatusFromCache(userId string) *model.Status {
diff --git a/app/team.go b/app/team.go
index 7a5ccc5d6..d9a857f0b 100644
--- a/app/team.go
+++ b/app/team.go
@@ -134,7 +134,9 @@ func (a *App) PatchTeam(teamId string, patch *model.TeamPatch) (*model.Team, *mo
func (a *App) sendUpdatedTeamEvent(team *model.Team) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_UPDATE_TEAM, "", "", "", nil)
message.Add("team", team.ToJson())
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
}
func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles string) (*model.TeamMember, *model.AppError) {
@@ -172,7 +174,9 @@ func (a *App) sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_MEMBERROLE_UPDATED, "", "", userId, nil)
message.Add("member", member.ToJson())
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
}
func (a *App) AddUserToTeam(teamId string, userId string, userRequestorId string) (*model.Team, *model.AppError) {
diff --git a/app/user.go b/app/user.go
index 27f1c5d85..b98583f80 100644
--- a/app/user.go
+++ b/app/user.go
@@ -202,7 +202,9 @@ func (a *App) CreateUser(user *model.User) (*model.User, *model.AppError) {
// This message goes to everyone, so the teamId, channelId and userId are irrelevant
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_NEW_USER, "", "", "", nil)
message.Add("user_id", ruser.Id)
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
return ruser, nil
}
@@ -980,7 +982,9 @@ func (a *App) sendUpdatedUserEvent(user model.User, asAdmin bool) {
omitUsers[user.Id] = true
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers)
message.Add("user", user)
- go a.Publish(message)
+ a.Go(func() {
+ a.Publish(message)
+ })
}
func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, *model.AppError) {
@@ -991,11 +995,11 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User,
if sendNotifications {
if rusers[0].Email != rusers[1].Email {
- go func() {
+ a.Go(func() {
if err := SendEmailChangeEmail(rusers[1].Email, rusers[0].Email, rusers[0].Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
if utils.Cfg.EmailSettings.RequireEmailVerification {
if err := a.SendEmailVerification(rusers[0]); err != nil {
@@ -1005,11 +1009,11 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User,
}
if rusers[0].Username != rusers[1].Username {
- go func() {
+ a.Go(func() {
if err := SendChangeUsernameEmail(rusers[1].Username, rusers[0].Username, rusers[0].Email, rusers[0].Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
}
}
@@ -1047,7 +1051,7 @@ func (a *App) UpdateMfa(activate bool, userId, token string) *model.AppError {
}
}
- go func() {
+ a.Go(func() {
var user *model.User
var err *model.AppError
@@ -1059,7 +1063,7 @@ func (a *App) UpdateMfa(activate bool, userId, token string) *model.AppError {
if err := SendMfaChangeEmail(user.Email, activate, user.Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
return nil
}
@@ -1093,11 +1097,11 @@ func (a *App) UpdatePasswordSendEmail(user *model.User, newPassword, method stri
return err
}
- go func() {
+ a.Go(func() {
if err := SendPasswordChangeEmail(user.Email, method, user.Locale, utils.GetSiteURL()); err != nil {
l4g.Error(err.Error())
}
- }()
+ })
return nil
}
diff --git a/app/web_conn.go b/app/web_conn.go
index f5644ce17..5f66d9a51 100644
--- a/app/web_conn.go
+++ b/app/web_conn.go
@@ -44,10 +44,10 @@ type WebConn struct {
func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn {
if len(session.UserId) > 0 {
- go func() {
+ a.Go(func() {
a.SetStatusOnline(session.UserId, session.Id, false)
a.UpdateLastActivityAtIfNeeded(session)
- }()
+ })
}
wc := &WebConn{
@@ -94,6 +94,16 @@ func (c *WebConn) SetSession(v *model.Session) {
c.session.Store(v)
}
+func (c *WebConn) Pump() {
+ ch := make(chan struct{}, 1)
+ go func() {
+ c.WritePump()
+ ch <- struct{}{}
+ }()
+ c.ReadPump()
+ <-ch
+}
+
func (c *WebConn) ReadPump() {
defer func() {
c.App.HubUnregister(c)
@@ -104,7 +114,9 @@ func (c *WebConn) ReadPump() {
c.WebSocket.SetPongHandler(func(string) error {
c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT))
if c.IsAuthenticated() {
- go c.App.SetStatusAwayIfNeeded(c.UserId, false)
+ c.App.Go(func() {
+ c.App.SetStatusAwayIfNeeded(c.UserId, false)
+ })
}
return nil
})
@@ -191,7 +203,9 @@ func (c *WebConn) WritePump() {
}
if c.App.Metrics != nil {
- go c.App.Metrics.IncrementWebSocketBroadcast(msg.EventType())
+ c.App.Go(func() {
+ c.App.Metrics.IncrementWebSocketBroadcast(msg.EventType())
+ })
}
}
diff --git a/app/web_hub.go b/app/web_hub.go
index 50ccb100e..0a70cb6d1 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -395,7 +395,9 @@ func (h *Hub) Start() {
}
if !found {
- go h.app.SetStatusOffline(userId, false)
+ h.app.Go(func() {
+ h.app.SetStatusOffline(userId, false)
+ })
}
case userId := <-h.invalidateUser:
diff --git a/app/webhook.go b/app/webhook.go
index 9531cba10..9d9b24b10 100644
--- a/app/webhook.go
+++ b/app/webhook.go
@@ -79,7 +79,11 @@ func (a *App) handleWebhookEvents(post *model.Post, team *model.Team, channel *m
TriggerWord: triggerWord,
FileIds: strings.Join(post.FileIds, ","),
}
- go a.TriggerWebhook(payload, hook, post, channel)
+ a.Go(func(hook *model.OutgoingWebhook) func() {
+ return func() {
+ a.TriggerWebhook(payload, hook, post, channel)
+ }
+ }(hook))
}
return nil
@@ -97,23 +101,25 @@ func (a *App) TriggerWebhook(payload *model.OutgoingWebhookPayload, hook *model.
}
for _, url := range hook.CallbackURLs {
- go func(url string) {
- req, _ := http.NewRequest("POST", url, body)
- req.Header.Set("Content-Type", contentType)
- req.Header.Set("Accept", "application/json")
- if resp, err := utils.HttpClient(false).Do(req); err != nil {
- l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error())
- } else {
- defer CloseBody(resp)
- webhookResp := model.OutgoingWebhookResponseFromJson(resp.Body)
+ a.Go(func(url string) func() {
+ return func() {
+ req, _ := http.NewRequest("POST", url, body)
+ req.Header.Set("Content-Type", contentType)
+ req.Header.Set("Accept", "application/json")
+ if resp, err := utils.HttpClient(false).Do(req); err != nil {
+ l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error())
+ } else {
+ defer CloseBody(resp)
+ webhookResp := model.OutgoingWebhookResponseFromJson(resp.Body)
- if webhookResp != nil && webhookResp.Text != nil {
- if _, err := a.CreateWebhookPost(hook.CreatorId, channel, *webhookResp.Text, webhookResp.Username, webhookResp.IconURL, webhookResp.Props, webhookResp.Type); err != nil {
- l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.create_post.error"), err)
+ if webhookResp != nil && webhookResp.Text != nil {
+ if _, err := a.CreateWebhookPost(hook.CreatorId, channel, *webhookResp.Text, webhookResp.Username, webhookResp.IconURL, webhookResp.Props, webhookResp.Type); err != nil {
+ l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.create_post.error"), err)
+ }
}
}
}
- }(url)
+ }(url))
}
}
diff --git a/app/websocket_router.go b/app/websocket_router.go
index c8220f1f1..cad53ade7 100644
--- a/app/websocket_router.go
+++ b/app/websocket_router.go
@@ -61,10 +61,10 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque
if err != nil {
conn.WebSocket.Close()
} else {
- go func() {
+ wr.app.Go(func() {
wr.app.SetStatusOnline(session.UserId, session.Id, false)
wr.app.UpdateLastActivityAtIfNeeded(*session)
- }()
+ })
conn.SetSession(session)
conn.SetSessionToken(session.Token)