From 5e69ce099f521aa49fc267c62235c003eae530ff Mon Sep 17 00:00:00 2001 From: Chris Date: Tue, 3 Oct 2017 10:53:53 -0500 Subject: Goroutine wranglin (#7556) * goroutine wranglin * synchronize WebConn.WritePump --- api/channel.go | 4 +++- api/post.go | 4 +++- api/user.go | 12 ++++++---- api/websocket.go | 3 +-- api4/websocket.go | 3 +-- app/app.go | 53 +++++++++++++++++++++++++++++++++++------- app/channel.go | 28 ++++++++++++++++------ app/command_echo.go | 4 ++-- app/command_expand_collapse.go | 4 +++- app/compliance.go | 4 +++- app/email_batching.go | 6 ++++- app/ldap.go | 12 +++++----- app/notification.go | 22 ++++++++++++------ app/oauth.go | 8 +++---- app/post.go | 40 +++++++++++++++++++++---------- app/preference.go | 8 +++++-- app/reaction.go | 8 +++++-- app/server.go | 15 ------------ app/status.go | 12 +++++++--- app/team.go | 8 +++++-- app/user.go | 24 +++++++++++-------- app/web_conn.go | 22 ++++++++++++++---- app/web_hub.go | 4 +++- app/webhook.go | 34 ++++++++++++++++----------- app/websocket_router.go | 4 ++-- cmd/platform/test.go | 5 ++-- 26 files changed, 233 insertions(+), 118 deletions(-) diff --git a/api/channel.go b/api/channel.go index 7222ba0b1..87fbf5d39 100644 --- a/api/channel.go +++ b/api/channel.go @@ -649,7 +649,9 @@ func addMember(c *Context, w http.ResponseWriter, r *http.Request) { return } - go c.App.PostAddToChannelMessage(oUser, nUser, channel) + c.App.Go(func() { + c.App.PostAddToChannelMessage(oUser, nUser, channel) + }) c.App.UpdateChannelLastViewedAt([]string{id}, oUser.Id) w.Write([]byte(cm.ToJson())) diff --git a/api/post.go b/api/post.go index db9412e7b..b2b8e3d0e 100644 --- a/api/post.go +++ b/api/post.go @@ -141,7 +141,9 @@ func saveIsPinnedPost(c *Context, w http.ResponseWriter, r *http.Request, isPinn message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", rpost.ChannelId, "", nil) message.Add("post", rpost.ToJson()) - go c.App.Publish(message) + c.App.Go(func() { + c.App.Publish(message) + }) c.App.InvalidateCacheForChannelPosts(rpost.ChannelId) diff --git a/api/user.go b/api/user.go index 5aeb2762b..1e77b8e2e 100644 --- a/api/user.go +++ b/api/user.go @@ -1081,7 +1081,7 @@ func updateMfa(c *Context, w http.ResponseWriter, r *http.Request) { c.LogAudit("success - deactivated") } - go func() { + c.App.Go(func() { var user *model.User var err *model.AppError if user, err = c.App.GetUser(c.Session.UserId); err != nil { @@ -1092,7 +1092,7 @@ func updateMfa(c *Context, w http.ResponseWriter, r *http.Request) { if err := app.SendMfaChangeEmail(user.Email, activate, user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) rdata := map[string]string{} rdata["status"] = "ok" @@ -1212,7 +1212,9 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) { case model.OAUTH_ACTION_SIGNUP: teamId := relayProps["team_id"] if len(teamId) > 0 { - go c.App.AddDirectChannels(teamId, user) + c.App.Go(func() { + c.App.AddDirectChannels(teamId, user) + }) } break case model.OAUTH_ACTION_EMAIL_TO_SSO: @@ -1221,11 +1223,11 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) { return } c.LogAuditWithUserId(user.Id, "Revoked all sessions for user") - go func() { + c.App.Go(func() { if err := app.SendSignInChangeEmail(user.Email, strings.Title(model.USER_AUTH_SERVICE_SAML)+" SSO", user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) break } doLogin(c, w, r, user, "") diff --git a/api/websocket.go b/api/websocket.go index c90968e7c..e5e2390c7 100644 --- a/api/websocket.go +++ b/api/websocket.go @@ -39,6 +39,5 @@ func connect(c *Context, w http.ResponseWriter, r *http.Request) { c.App.HubRegister(wc) } - go wc.WritePump() - wc.ReadPump() + wc.Pump() } diff --git a/api4/websocket.go b/api4/websocket.go index 2793c0bd0..c148ec3bf 100644 --- a/api4/websocket.go +++ b/api4/websocket.go @@ -40,6 +40,5 @@ func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) { c.App.HubRegister(wc) } - go wc.WritePump() - wc.ReadPump() + wc.Pump() } diff --git a/app/app.go b/app/app.go index 7974ab44f..d0d5bb4e0 100644 --- a/app/app.go +++ b/app/app.go @@ -7,7 +7,9 @@ import ( "io/ioutil" "net/http" "sync" - "time" + "sync/atomic" + + l4g "github.com/alecthomas/log4go" "github.com/mattermost/mattermost-server/einterfaces" ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" @@ -18,6 +20,9 @@ import ( ) type App struct { + goroutineCount int32 + goroutineExitSignal chan struct{} + Srv *Server PluginEnv *pluginenv.Environment @@ -43,7 +48,8 @@ type App struct { } var globalApp App = App{ - Jobs: &jobs.JobServer{}, + goroutineExitSignal: make(chan struct{}, 1), + Jobs: &jobs.JobServer{}, } var appCount = 0 @@ -61,7 +67,8 @@ func New() *App { panic("Only one App should exist at a time. Did you forget to call Shutdown()?") } app := &App{ - Jobs: &jobs.JobServer{}, + goroutineExitSignal: make(chan struct{}, 1), + Jobs: &jobs.JobServer{}, } app.initEnterprise() return app @@ -76,12 +83,19 @@ func New() *App { func (a *App) Shutdown() { appCount-- if appCount == 0 { - // XXX: This is to give all of our runaway goroutines time to complete. - // We should wrangle them up and remove this. - time.Sleep(time.Second) - if a.Srv != nil { - a.StopServer() + l4g.Info(utils.T("api.server.stop_server.stopping.info")) + + a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN) + a.Srv.Store.Close() + a.HubStop() + + a.ShutDownPlugins() + a.WaitForGoroutines() + + a.Srv = nil + + l4g.Info(utils.T("api.server.stop_server.stopped.info")) } } } @@ -211,6 +225,29 @@ func (a *App) Config() *model.Config { return utils.Cfg } +// Go creates a goroutine, but maintains a record of it to ensure that execution completes before +// the app is destroyed. +func (a *App) Go(f func()) { + atomic.AddInt32(&a.goroutineCount, 1) + + go func() { + f() + + atomic.AddInt32(&a.goroutineCount, -1) + select { + case a.goroutineExitSignal <- struct{}{}: + default: + } + }() +} + +// WaitForGoroutines blocks until all goroutines created by App.Go exit. +func (a *App) WaitForGoroutines() { + for atomic.LoadInt32(&a.goroutineCount) != 0 { + <-a.goroutineExitSignal + } +} + func CloseBody(r *http.Response) { if r.Body != nil { ioutil.ReadAll(r.Body) diff --git a/app/channel.go b/app/channel.go index 88f9cc7d7..2ab591c42 100644 --- a/app/channel.go +++ b/app/channel.go @@ -584,7 +584,9 @@ func (a *App) AddChannelMember(userId string, channel *model.Channel, userReques if userId == userRequestorId { a.postJoinChannelMessage(user, channel) } else { - go a.PostAddToChannelMessage(userRequestor, user, channel) + a.Go(func() { + a.PostAddToChannelMessage(userRequestor, user, channel) + }) } a.UpdateChannelLastViewedAt([]string{channel.Id}, userRequestor.Id) @@ -958,7 +960,9 @@ func (a *App) LeaveChannel(channelId string, userId string) *model.AppError { return err } - go a.postLeaveChannelMessage(user, channel) + a.Go(func() { + a.postLeaveChannelMessage(user, channel) + }) } return nil @@ -1039,13 +1043,17 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string, message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", channel.Id, "", nil) message.Add("user_id", userIdToRemove) message.Add("remover_id", removerUserId) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) // because the removed user no longer belongs to the channel we need to send a separate websocket event userMsg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", "", userIdToRemove, nil) userMsg.Add("channel_id", channel.Id) userMsg.Add("remover_id", removerUserId) - go a.Publish(userMsg) + a.Go(func() { + a.Publish(userMsg) + }) return nil } @@ -1064,7 +1072,9 @@ func (a *App) RemoveUserFromChannel(userIdToRemove string, removerUserId string, if userIdToRemove == removerUserId { a.postLeaveChannelMessage(user, channel) } else { - go a.PostRemoveFromChannelMessage(removerUserId, user, channel) + a.Go(func() { + a.PostRemoveFromChannelMessage(removerUserId, user, channel) + }) } return nil @@ -1113,7 +1123,9 @@ func (a *App) UpdateChannelLastViewedAt(channelIds []string, userId string) *mod for _, channelId := range channelIds { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil) message.Add("channel_id", channelId) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) } } @@ -1182,7 +1194,9 @@ func (a *App) ViewChannel(view *model.ChannelView, userId string, clearPushNotif if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages && model.IsValidId(view.ChannelId) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil) message.Add("channel_id", view.ChannelId) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) } return times, nil diff --git a/app/command_echo.go b/app/command_echo.go index 61a24b59a..404736aa4 100644 --- a/app/command_echo.go +++ b/app/command_echo.go @@ -77,7 +77,7 @@ func (me *EchoProvider) DoCommand(a *App, args *model.CommandArgs, message strin } echoSem <- true - go func() { + a.Go(func() { defer func() { <-echoSem }() post := &model.Post{} post.ChannelId = args.ChannelId @@ -91,7 +91,7 @@ func (me *EchoProvider) DoCommand(a *App, args *model.CommandArgs, message strin if _, err := a.CreatePostMissingChannel(post, true); err != nil { l4g.Error(args.T("api.command_echo.create.app_error"), err) } - }() + }) return &model.CommandResponse{} } diff --git a/app/command_expand_collapse.go b/app/command_expand_collapse.go index 9c94a4325..093e53a38 100644 --- a/app/command_expand_collapse.go +++ b/app/command_expand_collapse.go @@ -74,7 +74,9 @@ func (a *App) setCollapsePreference(args *model.CommandArgs, isCollapse bool) *m socketMessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCE_CHANGED, "", "", args.UserId, nil) socketMessage.Add("preference", pref.ToJson()) - go a.Publish(socketMessage) + a.Go(func() { + a.Publish(socketMessage) + }) var rmsg string diff --git a/app/compliance.go b/app/compliance.go index cf942ca00..77590af6c 100644 --- a/app/compliance.go +++ b/app/compliance.go @@ -35,7 +35,9 @@ func (a *App) SaveComplianceReport(job *model.Compliance) (*model.Compliance, *m return nil, result.Err } else { job = result.Data.(*model.Compliance) - go a.Compliance.RunComplianceJob(job) + a.Go(func() { + a.Compliance.RunComplianceJob(job) + }) } return job, nil diff --git a/app/email_batching.go b/app/email_batching.go index b9f89d646..4c546531f 100644 --- a/app/email_batching.go +++ b/app/email_batching.go @@ -175,7 +175,11 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu // send the email notification if it's been long enough if now.Sub(time.Unix(batchStartTime/1000, 0)) > time.Duration(interval)*time.Second { - go handler(userId, notifications) + job.app.Go(func(userId string, notifications []*batchedNotification) func() { + return func() { + handler(userId, notifications) + } + }(userId, notifications)) delete(job.pendingNotifications, userId) } } diff --git a/app/ldap.go b/app/ldap.go index 96791168e..119587866 100644 --- a/app/ldap.go +++ b/app/ldap.go @@ -12,7 +12,7 @@ import ( ) func (a *App) SyncLdap() { - go func() { + a.Go(func() { if utils.IsLicensed() && *utils.License().Features.LDAP && *utils.Cfg.LdapSettings.Enable { if ldapI := a.Ldap; ldapI != nil { ldapI.StartSynchronizeJob(false) @@ -20,7 +20,7 @@ func (a *App) SyncLdap() { l4g.Error("%v", model.NewAppError("SyncLdap", "ent.ldap.disabled.app_error", nil, "", http.StatusNotImplemented).Error()) } } - }() + }) } func (a *App) TestLdap() *model.AppError { @@ -60,11 +60,11 @@ func (a *App) SwitchEmailToLdap(email, password, code, ldapId, ldapPassword stri return "", err } - go func() { + a.Go(func() { if err := SendSignInChangeEmail(user.Email, "AD/LDAP", user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) return "/login?extra=signin_change", nil } @@ -102,11 +102,11 @@ func (a *App) SwitchLdapToEmail(ldapPassword, code, email, newPassword string) ( T := utils.GetUserTranslations(user.Locale) - go func() { + a.Go(func() { if err := SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) return "/login?extra=signin_change", nil } diff --git a/app/notification.go b/app/notification.go index 0859dfd20..3df4a789f 100644 --- a/app/notification.go +++ b/app/notification.go @@ -94,7 +94,9 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod if result := <-a.Srv.Store.User().GetProfilesByUsernames(potentialOtherMentions, team.Id); result.Err == nil { outOfChannelMentions := result.Data.([]*model.User) if channel.Type != model.CHANNEL_GROUP { - go a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions) + a.Go(func() { + a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions) + }) } } } @@ -362,11 +364,11 @@ func (a *App) sendNotificationEmail(post *model.Post, user *model.User, channel teamURL := utils.GetSiteURL() + "/" + team.Name var bodyText = a.getNotificationEmailBody(user, post, channel, senderName, team.Name, teamURL, emailNotificationContentsType, translateFunc) - go func() { + a.Go(func() { if err := utils.SendMail(user.Email, html.UnescapeString(subjectText), bodyText); err != nil { l4g.Error(utils.T("api.post.send_notifications_and_forget.send.error"), user.Email, err) } - }() + }) if a.Metrics != nil { a.Metrics.IncrementPostSentEmail() @@ -638,7 +640,11 @@ func (a *App) sendPushNotification(post *model.Post, user *model.User, channel * l4g.Debug("Sending push notification to device %v for user %v with msg of '%v'", tmpMessage.DeviceId, user.Id, msg.Message) - go a.sendToPushProxy(tmpMessage, session) + a.Go(func(session *model.Session) func() { + return func() { + a.sendToPushProxy(tmpMessage, session) + } + }(session)) if a.Metrics != nil { a.Metrics.IncrementPostSentPush() @@ -649,7 +655,7 @@ func (a *App) sendPushNotification(post *model.Post, user *model.User, channel * } func (a *App) ClearPushNotification(userId string, channelId string) { - go func() { + a.Go(func() { // Sleep is to allow the read replicas a chance to fully sync // the unread count for sending an accurate count. // Delaying a little doesn't hurt anything and is cheaper than @@ -678,9 +684,11 @@ func (a *App) ClearPushNotification(userId string, channelId string) { for _, session := range sessions { tmpMessage := *model.PushNotificationFromJson(strings.NewReader(msg.ToJson())) tmpMessage.SetDeviceIdAndPlatform(session.DeviceId) - go a.sendToPushProxy(tmpMessage, session) + a.Go(func() { + a.sendToPushProxy(tmpMessage, session) + }) } - }() + }) } func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session) { diff --git a/app/oauth.go b/app/oauth.go index cd45b0c10..be0535f35 100644 --- a/app/oauth.go +++ b/app/oauth.go @@ -535,11 +535,11 @@ func (a *App) CompleteSwitchWithOAuth(service string, userData io.ReadCloser, em return nil, result.Err } - go func() { + a.Go(func() { if err := SendSignInChangeEmail(user.Email, strings.Title(service)+" SSO", user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) return user, nil } @@ -770,11 +770,11 @@ func (a *App) SwitchOAuthToEmail(email, password, requesterId string) (string, * T := utils.GetUserTranslations(user.Locale) - go func() { + a.Go(func() { if err := SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) if err := a.RevokeAllSessions(requesterId); err != nil { return "", err diff --git a/app/post.go b/app/post.go index ccdc015bb..497cab5a6 100644 --- a/app/post.go +++ b/app/post.go @@ -75,7 +75,9 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError) if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", post.UserId, nil) message.Add("channel_id", post.ChannelId) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) } } @@ -152,7 +154,9 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo esInterface := a.Elasticsearch if esInterface != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - go esInterface.IndexPost(rpost, channel.TeamId) + a.Go(func() { + esInterface.IndexPost(rpost, channel.TeamId) + }) } if a.Metrics != nil { @@ -207,11 +211,11 @@ func (a *App) handlePostEvents(post *model.Post, user *model.User, channel *mode } if triggerWebhooks { - go func() { + a.Go(func() { if err := a.handleWebhookEvents(post, team, channel, user); err != nil { l4g.Error(err.Error()) } - }() + }) } return nil @@ -256,7 +260,9 @@ func (a *App) SendEphemeralPost(userId string, post *model.Post) *model.Post { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE, "", post.ChannelId, userId, nil) message.Add("post", post.ToJson()) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) return post } @@ -321,13 +327,13 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model esInterface := a.Elasticsearch if esInterface != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - go func() { + a.Go(func() { if rchannel := <-a.Srv.Store.Channel().GetForPost(rpost.Id); rchannel.Err != nil { l4g.Error("Couldn't get channel %v for post %v for Elasticsearch indexing.", rpost.ChannelId, rpost.Id) } else { esInterface.IndexPost(rpost, rchannel.Data.(*model.Channel).TeamId) } - }() + }) } a.sendUpdatedPostEvent(rpost) @@ -361,7 +367,9 @@ func (a *App) sendUpdatedPostEvent(post *model.Post) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil) message.Add("post", post.ToJson()) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) } func (a *App) GetPostsPage(channelId string, page int, perPage int) (*model.PostList, *model.AppError) { @@ -502,13 +510,21 @@ func (a *App) DeletePost(postId string) (*model.Post, *model.AppError) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_DELETED, "", post.ChannelId, "", nil) message.Add("post", post.ToJson()) - go a.Publish(message) - go a.DeletePostFiles(post) - go a.DeleteFlaggedPosts(post.Id) + a.Go(func() { + a.Publish(message) + }) + a.Go(func() { + a.DeletePostFiles(post) + }) + a.Go(func() { + a.DeleteFlaggedPosts(post.Id) + }) esInterface := a.Elasticsearch if esInterface != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - go esInterface.DeletePost(post) + a.Go(func() { + esInterface.DeletePost(post) + }) } a.InvalidateCacheForChannelPosts(post.ChannelId) diff --git a/app/preference.go b/app/preference.go index bee3236bf..9ca1f474c 100644 --- a/app/preference.go +++ b/app/preference.go @@ -55,7 +55,9 @@ func (a *App) UpdatePreferences(userId string, preferences model.Preferences) *m message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_CHANGED, "", "", userId, nil) message.Add("preferences", preferences.ToJson()) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) return nil } @@ -78,7 +80,9 @@ func (a *App) DeletePreferences(userId string, preferences model.Preferences) *m message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_DELETED, "", "", userId, nil) message.Add("preferences", preferences.ToJson()) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) return nil } diff --git a/app/reaction.go b/app/reaction.go index debf75f7a..bf0d20e2b 100644 --- a/app/reaction.go +++ b/app/reaction.go @@ -18,7 +18,9 @@ func (a *App) SaveReactionForPost(reaction *model.Reaction) (*model.Reaction, *m } else { reaction = result.Data.(*model.Reaction) - go a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_ADDED, reaction, post) + a.Go(func() { + a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_ADDED, reaction, post) + }) return reaction, nil } @@ -41,7 +43,9 @@ func (a *App) DeleteReactionForPost(reaction *model.Reaction) *model.AppError { if result := <-a.Srv.Store.Reaction().Delete(reaction); result.Err != nil { return result.Err } else { - go a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_REMOVED, reaction, post) + a.Go(func() { + a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_REMOVED, reaction, post) + }) } return nil diff --git a/app/server.go b/app/server.go index 3df6a39bb..5f955dd65 100644 --- a/app/server.go +++ b/app/server.go @@ -211,18 +211,3 @@ func (a *App) StartServer() { } }() } - -func (a *App) StopServer() { - - l4g.Info(utils.T("api.server.stop_server.stopping.info")) - - a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN) - a.Srv.Store.Close() - a.HubStop() - - a.ShutDownPlugins() - - a.Srv = nil - - l4g.Info(utils.T("api.server.stop_server.stopped.info")) -} diff --git a/app/status.go b/app/status.go index edfda561b..3d4837cb0 100644 --- a/app/status.go +++ b/app/status.go @@ -221,7 +221,9 @@ func (a *App) BroadcastStatus(status *model.Status) { event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil) event.Add("status", status.Status) event.Add("user_id", status.UserId) - go a.Publish(event) + a.Go(func() { + a.Publish(event) + }) } func (a *App) SetStatusOffline(userId string, manual bool) { @@ -245,7 +247,9 @@ func (a *App) SetStatusOffline(userId string, manual bool) { event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil) event.Add("status", model.STATUS_OFFLINE) event.Add("user_id", status.UserId) - go a.Publish(event) + a.Go(func() { + a.Publish(event) + }) } func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) { @@ -286,7 +290,9 @@ func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) { event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil) event.Add("status", model.STATUS_AWAY) event.Add("user_id", status.UserId) - go a.Publish(event) + a.Go(func() { + a.Publish(event) + }) } func GetStatusFromCache(userId string) *model.Status { diff --git a/app/team.go b/app/team.go index 7a5ccc5d6..d9a857f0b 100644 --- a/app/team.go +++ b/app/team.go @@ -134,7 +134,9 @@ func (a *App) PatchTeam(teamId string, patch *model.TeamPatch) (*model.Team, *mo func (a *App) sendUpdatedTeamEvent(team *model.Team) { message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_UPDATE_TEAM, "", "", "", nil) message.Add("team", team.ToJson()) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) } func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles string) (*model.TeamMember, *model.AppError) { @@ -172,7 +174,9 @@ func (a *App) sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_MEMBERROLE_UPDATED, "", "", userId, nil) message.Add("member", member.ToJson()) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) } func (a *App) AddUserToTeam(teamId string, userId string, userRequestorId string) (*model.Team, *model.AppError) { diff --git a/app/user.go b/app/user.go index 27f1c5d85..b98583f80 100644 --- a/app/user.go +++ b/app/user.go @@ -202,7 +202,9 @@ func (a *App) CreateUser(user *model.User) (*model.User, *model.AppError) { // This message goes to everyone, so the teamId, channelId and userId are irrelevant message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_NEW_USER, "", "", "", nil) message.Add("user_id", ruser.Id) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) return ruser, nil } @@ -980,7 +982,9 @@ func (a *App) sendUpdatedUserEvent(user model.User, asAdmin bool) { omitUsers[user.Id] = true message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers) message.Add("user", user) - go a.Publish(message) + a.Go(func() { + a.Publish(message) + }) } func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, *model.AppError) { @@ -991,11 +995,11 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, if sendNotifications { if rusers[0].Email != rusers[1].Email { - go func() { + a.Go(func() { if err := SendEmailChangeEmail(rusers[1].Email, rusers[0].Email, rusers[0].Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) if utils.Cfg.EmailSettings.RequireEmailVerification { if err := a.SendEmailVerification(rusers[0]); err != nil { @@ -1005,11 +1009,11 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, } if rusers[0].Username != rusers[1].Username { - go func() { + a.Go(func() { if err := SendChangeUsernameEmail(rusers[1].Username, rusers[0].Username, rusers[0].Email, rusers[0].Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) } } @@ -1047,7 +1051,7 @@ func (a *App) UpdateMfa(activate bool, userId, token string) *model.AppError { } } - go func() { + a.Go(func() { var user *model.User var err *model.AppError @@ -1059,7 +1063,7 @@ func (a *App) UpdateMfa(activate bool, userId, token string) *model.AppError { if err := SendMfaChangeEmail(user.Email, activate, user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) return nil } @@ -1093,11 +1097,11 @@ func (a *App) UpdatePasswordSendEmail(user *model.User, newPassword, method stri return err } - go func() { + a.Go(func() { if err := SendPasswordChangeEmail(user.Email, method, user.Locale, utils.GetSiteURL()); err != nil { l4g.Error(err.Error()) } - }() + }) return nil } diff --git a/app/web_conn.go b/app/web_conn.go index f5644ce17..5f66d9a51 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -44,10 +44,10 @@ type WebConn struct { func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn { if len(session.UserId) > 0 { - go func() { + a.Go(func() { a.SetStatusOnline(session.UserId, session.Id, false) a.UpdateLastActivityAtIfNeeded(session) - }() + }) } wc := &WebConn{ @@ -94,6 +94,16 @@ func (c *WebConn) SetSession(v *model.Session) { c.session.Store(v) } +func (c *WebConn) Pump() { + ch := make(chan struct{}, 1) + go func() { + c.WritePump() + ch <- struct{}{} + }() + c.ReadPump() + <-ch +} + func (c *WebConn) ReadPump() { defer func() { c.App.HubUnregister(c) @@ -104,7 +114,9 @@ func (c *WebConn) ReadPump() { c.WebSocket.SetPongHandler(func(string) error { c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT)) if c.IsAuthenticated() { - go c.App.SetStatusAwayIfNeeded(c.UserId, false) + c.App.Go(func() { + c.App.SetStatusAwayIfNeeded(c.UserId, false) + }) } return nil }) @@ -191,7 +203,9 @@ func (c *WebConn) WritePump() { } if c.App.Metrics != nil { - go c.App.Metrics.IncrementWebSocketBroadcast(msg.EventType()) + c.App.Go(func() { + c.App.Metrics.IncrementWebSocketBroadcast(msg.EventType()) + }) } } diff --git a/app/web_hub.go b/app/web_hub.go index 50ccb100e..0a70cb6d1 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -395,7 +395,9 @@ func (h *Hub) Start() { } if !found { - go h.app.SetStatusOffline(userId, false) + h.app.Go(func() { + h.app.SetStatusOffline(userId, false) + }) } case userId := <-h.invalidateUser: diff --git a/app/webhook.go b/app/webhook.go index 9531cba10..9d9b24b10 100644 --- a/app/webhook.go +++ b/app/webhook.go @@ -79,7 +79,11 @@ func (a *App) handleWebhookEvents(post *model.Post, team *model.Team, channel *m TriggerWord: triggerWord, FileIds: strings.Join(post.FileIds, ","), } - go a.TriggerWebhook(payload, hook, post, channel) + a.Go(func(hook *model.OutgoingWebhook) func() { + return func() { + a.TriggerWebhook(payload, hook, post, channel) + } + }(hook)) } return nil @@ -97,23 +101,25 @@ func (a *App) TriggerWebhook(payload *model.OutgoingWebhookPayload, hook *model. } for _, url := range hook.CallbackURLs { - go func(url string) { - req, _ := http.NewRequest("POST", url, body) - req.Header.Set("Content-Type", contentType) - req.Header.Set("Accept", "application/json") - if resp, err := utils.HttpClient(false).Do(req); err != nil { - l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error()) - } else { - defer CloseBody(resp) - webhookResp := model.OutgoingWebhookResponseFromJson(resp.Body) + a.Go(func(url string) func() { + return func() { + req, _ := http.NewRequest("POST", url, body) + req.Header.Set("Content-Type", contentType) + req.Header.Set("Accept", "application/json") + if resp, err := utils.HttpClient(false).Do(req); err != nil { + l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error()) + } else { + defer CloseBody(resp) + webhookResp := model.OutgoingWebhookResponseFromJson(resp.Body) - if webhookResp != nil && webhookResp.Text != nil { - if _, err := a.CreateWebhookPost(hook.CreatorId, channel, *webhookResp.Text, webhookResp.Username, webhookResp.IconURL, webhookResp.Props, webhookResp.Type); err != nil { - l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.create_post.error"), err) + if webhookResp != nil && webhookResp.Text != nil { + if _, err := a.CreateWebhookPost(hook.CreatorId, channel, *webhookResp.Text, webhookResp.Username, webhookResp.IconURL, webhookResp.Props, webhookResp.Type); err != nil { + l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.create_post.error"), err) + } } } } - }(url) + }(url)) } } diff --git a/app/websocket_router.go b/app/websocket_router.go index c8220f1f1..cad53ade7 100644 --- a/app/websocket_router.go +++ b/app/websocket_router.go @@ -61,10 +61,10 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque if err != nil { conn.WebSocket.Close() } else { - go func() { + wr.app.Go(func() { wr.app.SetStatusOnline(session.UserId, session.Id, false) wr.app.UpdateLastActivityAtIfNeeded(*session) - }() + }) conn.SetSession(session) conn.SetSessionToken(session.Token) diff --git a/cmd/platform/test.go b/cmd/platform/test.go index f0c75a8d3..7e8b9cf0f 100644 --- a/cmd/platform/test.go +++ b/cmd/platform/test.go @@ -49,6 +49,7 @@ func webClientTestsCmdF(cmd *cobra.Command, args []string) error { if err != nil { return err } + defer a.Shutdown() utils.InitTranslations(utils.Cfg.LocalizationSettings) a.Srv.Router = api.NewRouter() @@ -59,7 +60,6 @@ func webClientTestsCmdF(cmd *cobra.Command, args []string) error { setupClientTests() a.StartServer() runWebClientTests() - a.StopServer() return nil } @@ -69,6 +69,7 @@ func serverForWebClientTestsCmdF(cmd *cobra.Command, args []string) error { if err != nil { return err } + defer a.Shutdown() utils.InitTranslations(utils.Cfg.LocalizationSettings) a.Srv.Router = api.NewRouter() @@ -83,8 +84,6 @@ func serverForWebClientTestsCmdF(cmd *cobra.Command, args []string) error { signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-c - a.StopServer() - return nil } -- cgit v1.2.3-1-g7c22