From 55f6a0b21c46f77fb33e1af29c5960298bdb6907 Mon Sep 17 00:00:00 2001 From: Thomas Balthazar Date: Wed, 18 May 2016 22:34:31 +0200 Subject: Move away from the "andForget" style of function (#3046) This is the second and last part of the refactoring. First part is documented here: https://github.com/mattermost/platform/pull/3043 --- api/channel.go | 93 +++++++-------- api/file.go | 216 +++++++++++++++++----------------- api/post.go | 359 +++++++++++++++++++++++++++----------------------------- api/team.go | 2 +- api/user.go | 2 +- api/web_conn.go | 5 +- api/web_hub.go | 6 +- 7 files changed, 332 insertions(+), 351 deletions(-) (limited to 'api') diff --git a/api/channel.go b/api/channel.go index 9d36dd2eb..7b12849e3 100644 --- a/api/channel.go +++ b/api/channel.go @@ -5,14 +5,15 @@ package api import ( "fmt" + "net/http" + "strconv" + "strings" + l4g "github.com/alecthomas/log4go" "github.com/gorilla/mux" "github.com/mattermost/platform/model" "github.com/mattermost/platform/store" "github.com/mattermost/platform/utils" - "net/http" - "strconv" - "strings" ) const ( @@ -154,7 +155,7 @@ func CreateDirectChannel(userId string, otherUserId string) (*model.Channel, *mo } else { message := model.NewMessage("", channel.Id, userId, model.ACTION_DIRECT_ADDED) message.Add("teammate_id", otherUserId) - PublishAndForget(message) + go Publish(message) return result.Data.(*model.Channel), nil } @@ -291,42 +292,40 @@ func updateChannelHeader(c *Context, w http.ResponseWriter, r *http.Request) { c.Err = ucresult.Err return } else { - PostUpdateChannelHeaderMessageAndForget(c, channel.Id, oldChannelHeader, channelHeader) + go PostUpdateChannelHeaderMessage(c, channel.Id, oldChannelHeader, channelHeader) c.LogAudit("name=" + channel.Name) w.Write([]byte(channel.ToJson())) } } } -func PostUpdateChannelHeaderMessageAndForget(c *Context, channelId string, oldChannelHeader, newChannelHeader string) { - go func() { - uc := Srv.Store.User().Get(c.Session.UserId) +func PostUpdateChannelHeaderMessage(c *Context, channelId string, oldChannelHeader, newChannelHeader string) { + uc := Srv.Store.User().Get(c.Session.UserId) - if uresult := <-uc; uresult.Err != nil { - l4g.Error(utils.T("api.channel.post_update_channel_header_message_and_forget.retrieve_user.error"), uresult.Err) - return + if uresult := <-uc; uresult.Err != nil { + l4g.Error(utils.T("api.channel.post_update_channel_header_message_and_forget.retrieve_user.error"), uresult.Err) + return + } else { + user := uresult.Data.(*model.User) + + var message string + if oldChannelHeader == "" { + message = fmt.Sprintf(utils.T("api.channel.post_update_channel_header_message_and_forget.updated_to"), user.Username, newChannelHeader) + } else if newChannelHeader == "" { + message = fmt.Sprintf(utils.T("api.channel.post_update_channel_header_message_and_forget.removed"), user.Username, oldChannelHeader) } else { - user := uresult.Data.(*model.User) - - var message string - if oldChannelHeader == "" { - message = fmt.Sprintf(utils.T("api.channel.post_update_channel_header_message_and_forget.updated_to"), user.Username, newChannelHeader) - } else if newChannelHeader == "" { - message = fmt.Sprintf(utils.T("api.channel.post_update_channel_header_message_and_forget.removed"), user.Username, oldChannelHeader) - } else { - message = fmt.Sprintf(utils.T("api.channel.post_update_channel_header_message_and_forget.updated_from"), user.Username, oldChannelHeader, newChannelHeader) - } + message = fmt.Sprintf(utils.T("api.channel.post_update_channel_header_message_and_forget.updated_from"), user.Username, oldChannelHeader, newChannelHeader) + } - post := &model.Post{ - ChannelId: channelId, - Message: message, - Type: model.POST_HEADER_CHANGE, - } - if _, err := CreatePost(c, post, false); err != nil { - l4g.Error(utils.T("api.channel.post_update_channel_header_message_and_forget.join_leave.error"), err) - } + post := &model.Post{ + ChannelId: channelId, + Message: message, + Type: model.POST_HEADER_CHANGE, } - }() + if _, err := CreatePost(c, post, false); err != nil { + l4g.Error(utils.T("api.channel.post_update_channel_header_message_and_forget.join_leave.error"), err) + } + } } func updateChannelPurpose(c *Context, w http.ResponseWriter, r *http.Request) { @@ -490,7 +489,7 @@ func joinChannel(c *Context, channelChannel store.StoreChannel, userChannel stor if _, err := AddUserToChannel(user, channel); err != nil { return err, nil } - PostUserAddRemoveMessageAndForget(c, channel.Id, fmt.Sprintf(utils.T("api.channel.join_channel.post_and_forget"), user.Username)) + go PostUserAddRemoveMessage(c, channel.Id, fmt.Sprintf(utils.T("api.channel.join_channel.post_and_forget"), user.Username)) } else { return model.NewLocAppError("join", "api.channel.join_channel.permissions.app_error", nil, ""), nil } @@ -498,17 +497,15 @@ func joinChannel(c *Context, channelChannel store.StoreChannel, userChannel stor } } -func PostUserAddRemoveMessageAndForget(c *Context, channelId string, message string) { - go func() { - post := &model.Post{ - ChannelId: channelId, - Message: message, - Type: model.POST_JOIN_LEAVE, - } - if _, err := CreatePost(c, post, false); err != nil { - l4g.Error(utils.T("api.channel.post_user_add_remove_message_and_forget.error"), err) - } - }() +func PostUserAddRemoveMessage(c *Context, channelId string, message string) { + post := &model.Post{ + ChannelId: channelId, + Message: message, + Type: model.POST_JOIN_LEAVE, + } + if _, err := CreatePost(c, post, false); err != nil { + l4g.Error(utils.T("api.channel.post_user_add_remove_message_and_forget.error"), err) + } } func AddUserToChannel(user *model.User, channel *model.Channel) (*model.ChannelMember, *model.AppError) { @@ -539,7 +536,7 @@ func AddUserToChannel(user *model.User, channel *model.Channel) (*model.ChannelM InvalidateCacheForUser(user.Id) message := model.NewMessage(channel.TeamId, channel.Id, user.Id, model.ACTION_USER_ADDED) - PublishAndForget(message) + go Publish(message) }() return newMember, nil @@ -627,7 +624,7 @@ func leave(c *Context, w http.ResponseWriter, r *http.Request) { RemoveUserFromChannel(c.Session.UserId, c.Session.UserId, channel) - PostUserAddRemoveMessageAndForget(c, channel.Id, fmt.Sprintf(utils.T("api.channel.leave.left"), user.Username)) + go PostUserAddRemoveMessage(c, channel.Id, fmt.Sprintf(utils.T("api.channel.leave.left"), user.Username)) result := make(map[string]string) result["id"] = channel.Id @@ -722,7 +719,7 @@ func deleteChannel(c *Context, w http.ResponseWriter, r *http.Request) { go func() { InvalidateCacheForChannel(channel.Id) message := model.NewMessage(c.TeamId, channel.Id, c.Session.UserId, model.ACTION_CHANNEL_DELETED) - PublishAndForget(message) + go Publish(message) post := &model.Post{ ChannelId: channel.Id, @@ -758,7 +755,7 @@ func updateLastViewedAt(c *Context, w http.ResponseWriter, r *http.Request) { message := model.NewMessage(c.TeamId, id, c.Session.UserId, model.ACTION_CHANNEL_VIEWED) message.Add("channel_id", id) - PublishAndForget(message) + go Publish(message) result := make(map[string]string) result["id"] = id @@ -907,7 +904,7 @@ func addMember(c *Context, w http.ResponseWriter, r *http.Request) { c.LogAudit("name=" + channel.Name + " user_id=" + userId) - PostUserAddRemoveMessageAndForget(c, channel.Id, fmt.Sprintf(utils.T("api.channel.add_member.added"), nUser.Username, oUser.Username)) + go PostUserAddRemoveMessage(c, channel.Id, fmt.Sprintf(utils.T("api.channel.add_member.added"), nUser.Username, oUser.Username)) <-Srv.Store.Channel().UpdateLastViewedAt(id, oUser.Id) w.Write([]byte(cm.ToJson())) @@ -978,7 +975,7 @@ func RemoveUserFromChannel(userIdToRemove string, removerUserId string, channel message := model.NewMessage(channel.TeamId, channel.Id, userIdToRemove, model.ACTION_USER_REMOVED) message.Add("remover_id", removerUserId) - PublishAndForget(message) + go Publish(message) return nil } diff --git a/api/file.go b/api/file.go index 6cda48866..f4d1e0005 100644 --- a/api/file.go +++ b/api/file.go @@ -6,16 +6,6 @@ package api import ( "bytes" "fmt" - l4g "github.com/alecthomas/log4go" - "github.com/disintegration/imaging" - "github.com/goamz/goamz/aws" - "github.com/goamz/goamz/s3" - "github.com/gorilla/mux" - "github.com/mattermost/platform/model" - "github.com/mattermost/platform/utils" - "github.com/mssola/user_agent" - "github.com/rwcarlsen/goexif/exif" - _ "golang.org/x/image/bmp" "image" "image/color" "image/draw" @@ -30,6 +20,17 @@ import ( "strconv" "strings" "time" + + l4g "github.com/alecthomas/log4go" + "github.com/disintegration/imaging" + "github.com/goamz/goamz/aws" + "github.com/goamz/goamz/s3" + "github.com/gorilla/mux" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" + "github.com/mssola/user_agent" + "github.com/rwcarlsen/goexif/exif" + _ "golang.org/x/image/bmp" ) const ( @@ -165,110 +166,107 @@ func uploadFile(c *Context, w http.ResponseWriter, r *http.Request) { resStruct.ClientIds = append(resStruct.ClientIds, clientId) } - handleImagesAndForget(imageNameList, imageDataList, c.TeamId, channelId, c.Session.UserId) + go handleImages(imageNameList, imageDataList, c.TeamId, channelId, c.Session.UserId) w.Write([]byte(resStruct.ToJson())) } -func handleImagesAndForget(filenames []string, fileData [][]byte, teamId, channelId, userId string) { +func handleImages(filenames []string, fileData [][]byte, teamId, channelId, userId string) { + dest := "teams/" + teamId + "/channels/" + channelId + "/users/" + userId + "/" + + for i, filename := range filenames { + name := filename[:strings.LastIndex(filename, ".")] + go func() { + // Decode image bytes into Image object + img, imgType, err := image.Decode(bytes.NewReader(fileData[i])) + if err != nil { + l4g.Error(utils.T("api.file.handle_images_forget.decode.error"), channelId, userId, filename, err) + return + } + + width := img.Bounds().Dx() + height := img.Bounds().Dy() + + // Get the image's orientation and ignore any errors since not all images will have orientation data + orientation, _ := getImageOrientation(fileData[i]) - go func() { - dest := "teams/" + teamId + "/channels/" + channelId + "/users/" + userId + "/" + if imgType == "png" { + dst := image.NewRGBA(img.Bounds()) + draw.Draw(dst, dst.Bounds(), image.NewUniform(color.White), image.Point{}, draw.Src) + draw.Draw(dst, dst.Bounds(), img, img.Bounds().Min, draw.Over) + img = dst + } + + switch orientation { + case UprightMirrored: + img = imaging.FlipH(img) + case UpsideDown: + img = imaging.Rotate180(img) + case UpsideDownMirrored: + img = imaging.FlipV(img) + case RotatedCWMirrored: + img = imaging.Transpose(img) + case RotatedCCW: + img = imaging.Rotate270(img) + case RotatedCCWMirrored: + img = imaging.Transverse(img) + case RotatedCW: + img = imaging.Rotate90(img) + } - for i, filename := range filenames { - name := filename[:strings.LastIndex(filename, ".")] + // Create thumbnail go func() { - // Decode image bytes into Image object - img, imgType, err := image.Decode(bytes.NewReader(fileData[i])) + thumbWidth := float64(utils.Cfg.FileSettings.ThumbnailWidth) + thumbHeight := float64(utils.Cfg.FileSettings.ThumbnailHeight) + imgWidth := float64(width) + imgHeight := float64(height) + + var thumbnail image.Image + if imgHeight < thumbHeight && imgWidth < thumbWidth { + thumbnail = img + } else if imgHeight/imgWidth < thumbHeight/thumbWidth { + thumbnail = imaging.Resize(img, 0, utils.Cfg.FileSettings.ThumbnailHeight, imaging.Lanczos) + } else { + thumbnail = imaging.Resize(img, utils.Cfg.FileSettings.ThumbnailWidth, 0, imaging.Lanczos) + } + + buf := new(bytes.Buffer) + err = jpeg.Encode(buf, thumbnail, &jpeg.Options{Quality: 90}) if err != nil { - l4g.Error(utils.T("api.file.handle_images_forget.decode.error"), channelId, userId, filename, err) + l4g.Error(utils.T("api.file.handle_images_forget.encode_jpeg.error"), channelId, userId, filename, err) return } - width := img.Bounds().Dx() - height := img.Bounds().Dy() - - // Get the image's orientation and ignore any errors since not all images will have orientation data - orientation, _ := getImageOrientation(fileData[i]) + if err := WriteFile(buf.Bytes(), dest+name+"_thumb.jpg"); err != nil { + l4g.Error(utils.T("api.file.handle_images_forget.upload_thumb.error"), channelId, userId, filename, err) + return + } + }() - if imgType == "png" { - dst := image.NewRGBA(img.Bounds()) - draw.Draw(dst, dst.Bounds(), image.NewUniform(color.White), image.Point{}, draw.Src) - draw.Draw(dst, dst.Bounds(), img, img.Bounds().Min, draw.Over) - img = dst + // Create preview + go func() { + var preview image.Image + if width > int(utils.Cfg.FileSettings.PreviewWidth) { + preview = imaging.Resize(img, utils.Cfg.FileSettings.PreviewWidth, utils.Cfg.FileSettings.PreviewHeight, imaging.Lanczos) + } else { + preview = img } - switch orientation { - case UprightMirrored: - img = imaging.FlipH(img) - case UpsideDown: - img = imaging.Rotate180(img) - case UpsideDownMirrored: - img = imaging.FlipV(img) - case RotatedCWMirrored: - img = imaging.Transpose(img) - case RotatedCCW: - img = imaging.Rotate270(img) - case RotatedCCWMirrored: - img = imaging.Transverse(img) - case RotatedCW: - img = imaging.Rotate90(img) + buf := new(bytes.Buffer) + + err = jpeg.Encode(buf, preview, &jpeg.Options{Quality: 90}) + if err != nil { + l4g.Error(utils.T("api.file.handle_images_forget.encode_preview.error"), channelId, userId, filename, err) + return } - // Create thumbnail - go func() { - thumbWidth := float64(utils.Cfg.FileSettings.ThumbnailWidth) - thumbHeight := float64(utils.Cfg.FileSettings.ThumbnailHeight) - imgWidth := float64(width) - imgHeight := float64(height) - - var thumbnail image.Image - if imgHeight < thumbHeight && imgWidth < thumbWidth { - thumbnail = img - } else if imgHeight/imgWidth < thumbHeight/thumbWidth { - thumbnail = imaging.Resize(img, 0, utils.Cfg.FileSettings.ThumbnailHeight, imaging.Lanczos) - } else { - thumbnail = imaging.Resize(img, utils.Cfg.FileSettings.ThumbnailWidth, 0, imaging.Lanczos) - } - - buf := new(bytes.Buffer) - err = jpeg.Encode(buf, thumbnail, &jpeg.Options{Quality: 90}) - if err != nil { - l4g.Error(utils.T("api.file.handle_images_forget.encode_jpeg.error"), channelId, userId, filename, err) - return - } - - if err := WriteFile(buf.Bytes(), dest+name+"_thumb.jpg"); err != nil { - l4g.Error(utils.T("api.file.handle_images_forget.upload_thumb.error"), channelId, userId, filename, err) - return - } - }() - - // Create preview - go func() { - var preview image.Image - if width > int(utils.Cfg.FileSettings.PreviewWidth) { - preview = imaging.Resize(img, utils.Cfg.FileSettings.PreviewWidth, utils.Cfg.FileSettings.PreviewHeight, imaging.Lanczos) - } else { - preview = img - } - - buf := new(bytes.Buffer) - - err = jpeg.Encode(buf, preview, &jpeg.Options{Quality: 90}) - if err != nil { - l4g.Error(utils.T("api.file.handle_images_forget.encode_preview.error"), channelId, userId, filename, err) - return - } - - if err := WriteFile(buf.Bytes(), dest+name+"_preview.jpg"); err != nil { - l4g.Error(utils.T("api.file.handle_images_forget.upload_preview.error"), channelId, userId, filename, err) - return - } - }() + if err := WriteFile(buf.Bytes(), dest+name+"_preview.jpg"); err != nil { + l4g.Error(utils.T("api.file.handle_images_forget.upload_preview.error"), channelId, userId, filename, err) + return + } }() - } - }() + }() + } } func getImageOrientation(imageData []byte) (int, error) { @@ -329,7 +327,7 @@ func getFileInfo(c *Context, w http.ResponseWriter, r *http.Request) { info = cached.(*model.FileInfo) } else { fileData := make(chan []byte) - getFileAndForget(path, fileData) + go readFile(path, fileData) newInfo, err := model.GetInfoForBytes(filename, <-fileData) if err != nil { @@ -435,7 +433,7 @@ func getFileData(teamId string, channelId string, userId string, filename string path := "teams/" + teamId + "/channels/" + channelId + "/users/" + userId + "/" + filename fileChan := make(chan []byte) - getFileAndForget(path, fileChan) + go readFile(path, fileChan) if bytes := <-fileChan; bytes == nil { err := model.NewLocAppError("writeFileResponse", "api.file.get_file.not_found.app_error", nil, "path="+path) @@ -472,16 +470,14 @@ func writeFileResponse(filename string, bytes []byte, w http.ResponseWriter, r * return nil } -func getFileAndForget(path string, fileData chan []byte) { - go func() { - data, getErr := ReadFile(path) - if getErr != nil { - l4g.Error(getErr) - fileData <- nil - } else { - fileData <- data - } - }() +func readFile(path string, fileData chan []byte) { + data, getErr := ReadFile(path) + if getErr != nil { + l4g.Error(getErr) + fileData <- nil + } else { + fileData <- data + } } func getPublicLink(c *Context, w http.ResponseWriter, r *http.Request) { @@ -588,7 +584,7 @@ func WriteFile(f []byte, path string) *model.AppError { func MoveFile(oldPath, newPath string) *model.AppError { if utils.Cfg.FileSettings.DriverName == model.IMAGE_DRIVER_S3 { fileData := make(chan []byte) - getFileAndForget(oldPath, fileData) + go readFile(oldPath, fileData) fileBytes := <-fileData if fileBytes == nil { diff --git a/api/post.go b/api/post.go index 6be3ec7eb..2676bcd20 100644 --- a/api/post.go +++ b/api/post.go @@ -6,11 +6,6 @@ package api import ( "crypto/tls" "fmt" - l4g "github.com/alecthomas/log4go" - "github.com/gorilla/mux" - "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" - "github.com/mattermost/platform/utils" "html/template" "net/http" "net/url" @@ -20,6 +15,12 @@ import ( "strconv" "strings" "time" + + l4g "github.com/alecthomas/log4go" + "github.com/gorilla/mux" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" + "github.com/mattermost/platform/utils" ) func InitPost() { @@ -147,8 +148,7 @@ func CreatePost(c *Context, post *model.Post, triggerWebhooks bool) (*model.Post } else { rpost = result.Data.(*model.Post) - handlePostEventsAndForget(c, rpost, triggerWebhooks) - + go handlePostEvents(c, rpost, triggerWebhooks) } return rpost, nil @@ -227,76 +227,74 @@ func CreateWebhookPost(c *Context, channelId, text, overrideUsername, overrideIc return post, nil } -func handlePostEventsAndForget(c *Context, post *model.Post, triggerWebhooks bool) { - go func() { - tchan := Srv.Store.Team().Get(c.TeamId) - cchan := Srv.Store.Channel().Get(post.ChannelId) - uchan := Srv.Store.User().Get(post.UserId) - pchan := Srv.Store.User().GetProfiles(c.TeamId) - dpchan := Srv.Store.User().GetDirectProfiles(c.Session.UserId) - mchan := Srv.Store.Channel().GetMembers(post.ChannelId) - - var team *model.Team - if result := <-tchan; result.Err != nil { - l4g.Error(utils.T("api.post.handle_post_events_and_forget.team.error"), c.TeamId, result.Err) - return - } else { - team = result.Data.(*model.Team) - } +func handlePostEvents(c *Context, post *model.Post, triggerWebhooks bool) { + tchan := Srv.Store.Team().Get(c.TeamId) + cchan := Srv.Store.Channel().Get(post.ChannelId) + uchan := Srv.Store.User().Get(post.UserId) + pchan := Srv.Store.User().GetProfiles(c.TeamId) + dpchan := Srv.Store.User().GetDirectProfiles(c.Session.UserId) + mchan := Srv.Store.Channel().GetMembers(post.ChannelId) + + var team *model.Team + if result := <-tchan; result.Err != nil { + l4g.Error(utils.T("api.post.handle_post_events_and_forget.team.error"), c.TeamId, result.Err) + return + } else { + team = result.Data.(*model.Team) + } - var channel *model.Channel - if result := <-cchan; result.Err != nil { - l4g.Error(utils.T("api.post.handle_post_events_and_forget.channel.error"), post.ChannelId, result.Err) - return - } else { - channel = result.Data.(*model.Channel) - } + var channel *model.Channel + if result := <-cchan; result.Err != nil { + l4g.Error(utils.T("api.post.handle_post_events_and_forget.channel.error"), post.ChannelId, result.Err) + return + } else { + channel = result.Data.(*model.Channel) + } - var profiles map[string]*model.User - if result := <-pchan; result.Err != nil { - l4g.Error(utils.T("api.post.handle_post_events_and_forget.profiles.error"), c.TeamId, result.Err) - return - } else { - profiles = result.Data.(map[string]*model.User) - } + var profiles map[string]*model.User + if result := <-pchan; result.Err != nil { + l4g.Error(utils.T("api.post.handle_post_events_and_forget.profiles.error"), c.TeamId, result.Err) + return + } else { + profiles = result.Data.(map[string]*model.User) + } - if result := <-dpchan; result.Err != nil { - l4g.Error(utils.T("api.post.handle_post_events_and_forget.profiles.error"), c.TeamId, result.Err) - return - } else { - dps := result.Data.(map[string]*model.User) - for k, v := range dps { - profiles[k] = v - } + if result := <-dpchan; result.Err != nil { + l4g.Error(utils.T("api.post.handle_post_events_and_forget.profiles.error"), c.TeamId, result.Err) + return + } else { + dps := result.Data.(map[string]*model.User) + for k, v := range dps { + profiles[k] = v } + } - var members []model.ChannelMember - if result := <-mchan; result.Err != nil { - l4g.Error(utils.T("api.post.handle_post_events_and_forget.members.error"), post.ChannelId, result.Err) - return - } else { - members = result.Data.([]model.ChannelMember) - } + var members []model.ChannelMember + if result := <-mchan; result.Err != nil { + l4g.Error(utils.T("api.post.handle_post_events_and_forget.members.error"), post.ChannelId, result.Err) + return + } else { + members = result.Data.([]model.ChannelMember) + } - go sendNotifications(c, post, team, channel, profiles, members) - go checkForOutOfChannelMentions(c, post, channel, profiles, members) + go sendNotifications(c, post, team, channel, profiles, members) + go checkForOutOfChannelMentions(c, post, channel, profiles, members) - var user *model.User - if result := <-uchan; result.Err != nil { - l4g.Error(utils.T("api.post.handle_post_events_and_forget.user.error"), post.UserId, result.Err) - return - } else { - user = result.Data.(*model.User) - } + var user *model.User + if result := <-uchan; result.Err != nil { + l4g.Error(utils.T("api.post.handle_post_events_and_forget.user.error"), post.UserId, result.Err) + return + } else { + user = result.Data.(*model.User) + } - if triggerWebhooks { - handleWebhookEventsAndForget(c, post, team, channel, user) - } + if triggerWebhooks { + go handleWebhookEvents(c, post, team, channel, user) + } - if channel.Type == model.CHANNEL_DIRECT { - go makeDirectChannelVisible(c.TeamId, post.ChannelId) - } - }() + if channel.Type == model.CHANNEL_DIRECT { + go makeDirectChannelVisible(c.TeamId, post.ChannelId) + } } func makeDirectChannelVisible(teamId string, channelId string) { @@ -332,7 +330,7 @@ func makeDirectChannelVisible(teamId string, channelId string) { message := model.NewMessage(teamId, channelId, member.UserId, model.ACTION_PREFERENCE_CHANGED) message.Add("preference", preference.ToJson()) - PublishAndForget(message) + go Publish(message) } } else { preference := result.Data.(model.Preference) @@ -347,127 +345,123 @@ func makeDirectChannelVisible(teamId string, channelId string) { message := model.NewMessage(teamId, channelId, member.UserId, model.ACTION_PREFERENCE_CHANGED) message.Add("preference", preference.ToJson()) - PublishAndForget(message) + go Publish(message) } } } } } -func handleWebhookEventsAndForget(c *Context, post *model.Post, team *model.Team, channel *model.Channel, user *model.User) { - go func() { - if !utils.Cfg.ServiceSettings.EnableOutgoingWebhooks { - return - } +func handleWebhookEvents(c *Context, post *model.Post, team *model.Team, channel *model.Channel, user *model.User) { + if !utils.Cfg.ServiceSettings.EnableOutgoingWebhooks { + return + } - if channel.Type != model.CHANNEL_OPEN { - return - } + if channel.Type != model.CHANNEL_OPEN { + return + } - hchan := Srv.Store.Webhook().GetOutgoingByTeam(c.TeamId) + hchan := Srv.Store.Webhook().GetOutgoingByTeam(c.TeamId) - hooks := []*model.OutgoingWebhook{} + hooks := []*model.OutgoingWebhook{} - if result := <-hchan; result.Err != nil { - l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.getting.error"), result.Err) - return - } else { - hooks = result.Data.([]*model.OutgoingWebhook) - } + if result := <-hchan; result.Err != nil { + l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.getting.error"), result.Err) + return + } else { + hooks = result.Data.([]*model.OutgoingWebhook) + } - if len(hooks) == 0 { - return - } + if len(hooks) == 0 { + return + } - splitWords := strings.Fields(post.Message) + splitWords := strings.Fields(post.Message) - if len(splitWords) == 0 { - return - } + if len(splitWords) == 0 { + return + } - firstWord := splitWords[0] + firstWord := splitWords[0] - relevantHooks := []*model.OutgoingWebhook{} + relevantHooks := []*model.OutgoingWebhook{} - for _, hook := range hooks { - if hook.ChannelId == post.ChannelId { - if len(hook.TriggerWords) == 0 || hook.HasTriggerWord(firstWord) { - relevantHooks = append(relevantHooks, hook) - } - } else if len(hook.ChannelId) == 0 && hook.HasTriggerWord(firstWord) { + for _, hook := range hooks { + if hook.ChannelId == post.ChannelId { + if len(hook.TriggerWords) == 0 || hook.HasTriggerWord(firstWord) { relevantHooks = append(relevantHooks, hook) } + } else if len(hook.ChannelId) == 0 && hook.HasTriggerWord(firstWord) { + relevantHooks = append(relevantHooks, hook) } + } - for _, hook := range relevantHooks { - go func(hook *model.OutgoingWebhook) { - p := url.Values{} - p.Set("token", hook.Token) - - p.Set("team_id", hook.TeamId) - p.Set("team_domain", team.Name) + for _, hook := range relevantHooks { + go func(hook *model.OutgoingWebhook) { + p := url.Values{} + p.Set("token", hook.Token) - p.Set("channel_id", post.ChannelId) - p.Set("channel_name", channel.Name) + p.Set("team_id", hook.TeamId) + p.Set("team_domain", team.Name) - p.Set("timestamp", strconv.FormatInt(post.CreateAt/1000, 10)) + p.Set("channel_id", post.ChannelId) + p.Set("channel_name", channel.Name) - p.Set("user_id", post.UserId) - p.Set("user_name", user.Username) + p.Set("timestamp", strconv.FormatInt(post.CreateAt/1000, 10)) - p.Set("text", post.Message) - p.Set("trigger_word", firstWord) + p.Set("user_id", post.UserId) + p.Set("user_name", user.Username) - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: *utils.Cfg.ServiceSettings.EnableInsecureOutgoingConnections}, - } - client := &http.Client{Transport: tr} - - for _, url := range hook.CallbackURLs { - go func(url string) { - req, _ := http.NewRequest("POST", url, strings.NewReader(p.Encode())) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - req.Header.Set("Accept", "application/json") - if resp, err := client.Do(req); err != nil { - l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error()) - } else { - respProps := model.MapFromJson(resp.Body) - - // copy the context and create a mock session for posting the message - mockSession := model.Session{ - UserId: hook.CreatorId, - TeamMembers: []*model.TeamMember{{TeamId: hook.TeamId, UserId: hook.CreatorId}}, - IsOAuth: false, - } + p.Set("text", post.Message) + p.Set("trigger_word", firstWord) - newContext := &Context{ - Session: mockSession, - RequestId: model.NewId(), - IpAddress: "", - Path: c.Path, - Err: nil, - teamURLValid: c.teamURLValid, - teamURL: c.teamURL, - siteURL: c.siteURL, - T: c.T, - Locale: c.Locale, - TeamId: hook.TeamId, - } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: *utils.Cfg.ServiceSettings.EnableInsecureOutgoingConnections}, + } + client := &http.Client{Transport: tr} + + for _, url := range hook.CallbackURLs { + go func(url string) { + req, _ := http.NewRequest("POST", url, strings.NewReader(p.Encode())) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/json") + if resp, err := client.Do(req); err != nil { + l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error()) + } else { + respProps := model.MapFromJson(resp.Body) - if text, ok := respProps["text"]; ok { - if _, err := CreateWebhookPost(newContext, post.ChannelId, text, respProps["username"], respProps["icon_url"], post.Props, post.Type); err != nil { - l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.create_post.error"), err) - } - } + // copy the context and create a mock session for posting the message + mockSession := model.Session{ + UserId: hook.CreatorId, + TeamMembers: []*model.TeamMember{{TeamId: hook.TeamId, UserId: hook.CreatorId}}, + IsOAuth: false, } - }(url) - } - }(hook) - } + newContext := &Context{ + Session: mockSession, + RequestId: model.NewId(), + IpAddress: "", + Path: c.Path, + Err: nil, + teamURLValid: c.teamURLValid, + teamURL: c.teamURL, + siteURL: c.siteURL, + T: c.T, + Locale: c.Locale, + TeamId: hook.TeamId, + } - }() + if text, ok := respProps["text"]; ok { + if _, err := CreateWebhookPost(newContext, post.ChannelId, text, respProps["username"], respProps["icon_url"], post.Props, post.Type); err != nil { + l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.create_post.error"), err) + } + } + } + }(url) + } + }(hook) + } } func sendNotifications(c *Context, post *model.Post, team *model.Team, channel *model.Channel, profileMap map[string]*model.User, members []model.ChannelMember) { @@ -595,7 +589,7 @@ func sendNotifications(c *Context, post *model.Post, team *model.Team, channel * } for id := range toEmailMap { - updateMentionCountAndForget(post.ChannelId, id) + go updateMentionCount(post.ChannelId, id) } } @@ -779,15 +773,13 @@ func sendNotifications(c *Context, post *model.Post, team *model.Team, channel * message.Add("mentions", model.ArrayToJson(mentionedUsers)) } - PublishAndForget(message) + go Publish(message) } -func updateMentionCountAndForget(channelId, userId string) { - go func() { - if result := <-Srv.Store.Channel().IncrementMentionCount(channelId, userId); result.Err != nil { - l4g.Error(utils.T("api.post.update_mention_count_and_forget.update_error"), userId, channelId, result.Err) - } - }() +func updateMentionCount(channelId, userId string) { + if result := <-Srv.Store.Channel().IncrementMentionCount(channelId, userId); result.Err != nil { + l4g.Error(utils.T("api.post.update_mention_count_and_forget.update_error"), userId, channelId, result.Err) + } } func checkForOutOfChannelMentions(c *Context, post *model.Post, channel *model.Channel, allProfiles map[string]*model.User, members []model.ChannelMember) { @@ -876,7 +868,7 @@ func SendEphemeralPost(teamId, userId string, post *model.Post) { message := model.NewMessage(teamId, post.ChannelId, userId, model.ACTION_EPHEMERAL_MESSAGE) message.Add("post", post.ToJson()) - PublishAndForget(message) + go Publish(message) } func updatePost(c *Context, w http.ResponseWriter, r *http.Request) { @@ -938,7 +930,7 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) { message := model.NewMessage(c.TeamId, rpost.ChannelId, c.Session.UserId, model.ACTION_POST_EDITED) message.Add("post", rpost.ToJson()) - PublishAndForget(message) + go Publish(message) w.Write([]byte(rpost.ToJson())) } @@ -1202,8 +1194,8 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) { message := model.NewMessage(c.TeamId, post.ChannelId, c.Session.UserId, model.ACTION_POST_DELETED) message.Add("post", post.ToJson()) - PublishAndForget(message) - DeletePostFilesAndForget(c.TeamId, post) + go Publish(message) + go DeletePostFiles(c.TeamId, post) result := make(map[string]string) result["id"] = postId @@ -1211,21 +1203,18 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) { } } -func DeletePostFilesAndForget(teamId string, post *model.Post) { - go func() { - if len(post.Filenames) == 0 { - return - } - - prefix := "teams/" + teamId + "/channels/" + post.ChannelId + "/users/" + post.UserId + "/" - for _, filename := range post.Filenames { - splitUrl := strings.Split(filename, "/") - oldPath := prefix + splitUrl[len(splitUrl)-2] + "/" + splitUrl[len(splitUrl)-1] - newPath := prefix + splitUrl[len(splitUrl)-2] + "/deleted_" + splitUrl[len(splitUrl)-1] - MoveFile(oldPath, newPath) - } +func DeletePostFiles(teamId string, post *model.Post) { + if len(post.Filenames) == 0 { + return + } - }() + prefix := "teams/" + teamId + "/channels/" + post.ChannelId + "/users/" + post.UserId + "/" + for _, filename := range post.Filenames { + splitUrl := strings.Split(filename, "/") + oldPath := prefix + splitUrl[len(splitUrl)-2] + "/" + splitUrl[len(splitUrl)-1] + newPath := prefix + splitUrl[len(splitUrl)-2] + "/deleted_" + splitUrl[len(splitUrl)-1] + MoveFile(oldPath, newPath) + } } func getPostsBefore(c *Context, w http.ResponseWriter, r *http.Request) { diff --git a/api/team.go b/api/team.go index 3ed9b70af..f65feb5dc 100644 --- a/api/team.go +++ b/api/team.go @@ -274,7 +274,7 @@ func JoinUserToTeam(team *model.Team, user *model.User) *model.AppError { InvalidateCacheForUser(user.Id) // This message goes to every channel, so the channelId is irrelevant - PublishAndForget(model.NewMessage("", "", user.Id, model.ACTION_NEW_USER)) + go Publish(model.NewMessage("", "", user.Id, model.ACTION_NEW_USER)) return nil } diff --git a/api/user.go b/api/user.go index 34e6c01af..89ac1c837 100644 --- a/api/user.go +++ b/api/user.go @@ -260,7 +260,7 @@ func CreateUser(user *model.User) (*model.User, *model.AppError) { ruser.Sanitize(map[string]bool{}) // This message goes to every channel, so the channelId is irrelevant - PublishAndForget(model.NewMessage("", "", ruser.Id, model.ACTION_NEW_USER)) + go Publish(model.NewMessage("", "", ruser.Id, model.ACTION_NEW_USER)) return ruser, nil } diff --git a/api/web_conn.go b/api/web_conn.go index 9a6fc29df..971cc8cb8 100644 --- a/api/web_conn.go +++ b/api/web_conn.go @@ -4,11 +4,12 @@ package api import ( + "time" + l4g "github.com/alecthomas/log4go" "github.com/gorilla/websocket" "github.com/mattermost/platform/model" "github.com/mattermost/platform/utils" - "time" ) const ( @@ -77,7 +78,7 @@ func (c *WebConn) readPump() { return } else { msg.UserId = c.UserId - PublishAndForget(&msg) + go Publish(&msg) } } } diff --git a/api/web_hub.go b/api/web_hub.go index 066ae3474..133bb162a 100644 --- a/api/web_hub.go +++ b/api/web_hub.go @@ -29,10 +29,8 @@ var hub = &Hub{ invalidateChannel: make(chan string), } -func PublishAndForget(message *model.Message) { - go func() { - hub.Broadcast(message) - }() +func Publish(message *model.Message) { + hub.Broadcast(message) } func InvalidateCacheForUser(userId string) { -- cgit v1.2.3-1-g7c22