From 15d64fb201848002a25facc3bbffc9535a704df6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Espino?= Date: Wed, 26 Sep 2018 16:34:12 +0200 Subject: MM-7188: Cleaning push notification on every read, not only on channel switch (#9348) * MM-7188: Cleaning push notification on every read, not only on channel switch * Removed unnecesary goroutine * Fixing tests * Applying suggestion from PR --- app/notification_push.go | 186 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 138 insertions(+), 48 deletions(-) (limited to 'app/notification_push.go') diff --git a/app/notification_push.go b/app/notification_push.go index 5c7df0da7..0a24ba1e0 100644 --- a/app/notification_push.go +++ b/app/notification_push.go @@ -5,9 +5,9 @@ package app import ( "fmt" + "hash/fnv" "net/http" "strings" - "time" "github.com/mattermost/mattermost-server/mlog" "github.com/mattermost/mattermost-server/model" @@ -15,21 +15,42 @@ import ( "github.com/nicksnyder/go-i18n/i18n" ) -func (a *App) sendPushNotification(notification *postNotification, user *model.User, explicitMention, channelWideMention bool, replyToThreadType string) *model.AppError { - channel := notification.channel - post := notification.post +type NotificationType string - cfg := a.Config() +const NOTIFICATION_TYPE_CLEAR NotificationType = "clear" +const NOTIFICATION_TYPE_MESSAGE NotificationType = "message" - var nameFormat string - if result := <-a.Srv.Store.Preference().Get(user.Id, model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS, model.PREFERENCE_NAME_NAME_FORMAT); result.Err != nil { - nameFormat = *a.Config().TeamSettings.TeammateNameDisplay - } else { - nameFormat = result.Data.(model.Preference).Value - } +const PUSH_NOTIFICATION_HUB_WORKERS = 1000 +const PUSH_NOTIFICATIONS_HUB_BUFFER_PER_WORKER = 50 - channelName := notification.GetChannelName(nameFormat, user.Id) - senderName := notification.GetSenderName(nameFormat, cfg.ServiceSettings.EnablePostUsernameOverride) +type PushNotificationsHub struct { + Channels []chan PushNotification +} + +type PushNotification struct { + notificationType NotificationType + userId string + channelId string + post *model.Post + user *model.User + channel *model.Channel + senderName string + channelName string + explicitMention bool + channelWideMention bool + replyToThreadType string +} + +func (hub *PushNotificationsHub) GetGoChannelFromUserId(userId string) chan PushNotification { + h := fnv.New32a() + h.Write([]byte(userId)) + chanIdx := h.Sum32() % PUSH_NOTIFICATION_HUB_WORKERS + return hub.Channels[chanIdx] +} + +func (a *App) sendPushNotificationSync(post *model.Post, user *model.User, channel *model.Channel, channelName string, senderName string, + explicitMention, channelWideMention bool, replyToThreadType string) *model.AppError { + cfg := a.Config() sessions, err := a.getMobileAppSessions(user.Id) if err != nil { @@ -86,11 +107,7 @@ func (a *App) sendPushNotification(notification *postNotification, user *model.U mlog.Debug(fmt.Sprintf("Sending push notification to device %v for user %v with msg of '%v'", tmpMessage.DeviceId, user.Id, msg.Message), mlog.String("user_id", user.Id)) - a.Go(func(session *model.Session) func() { - return func() { - a.sendToPushProxy(tmpMessage, session) - } - }(session)) + a.sendToPushProxy(tmpMessage, session) if a.Metrics != nil { a.Metrics.IncrementPostSentPush() @@ -100,6 +117,35 @@ func (a *App) sendPushNotification(notification *postNotification, user *model.U return nil } +func (a *App) sendPushNotification(notification *postNotification, user *model.User, explicitMention, channelWideMention bool, replyToThreadType string) { + cfg := a.Config() + channel := notification.channel + post := notification.post + + var nameFormat string + if result := <-a.Srv.Store.Preference().Get(user.Id, model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS, model.PREFERENCE_NAME_NAME_FORMAT); result.Err != nil { + nameFormat = *a.Config().TeamSettings.TeammateNameDisplay + } else { + nameFormat = result.Data.(model.Preference).Value + } + + channelName := notification.GetChannelName(nameFormat, user.Id) + senderName := notification.GetSenderName(nameFormat, cfg.ServiceSettings.EnablePostUsernameOverride) + + c := a.PushNotificationsHub.GetGoChannelFromUserId(user.Id) + c <- PushNotification{ + notificationType: NOTIFICATION_TYPE_MESSAGE, + post: post, + user: user, + channel: channel, + senderName: senderName, + channelName: channelName, + explicitMention: explicitMention, + channelWideMention: channelWideMention, + replyToThreadType: replyToThreadType, + } +} + func (a *App) getPushNotificationMessage(postMessage string, explicitMention, channelWideMention, hasFiles bool, senderName, channelName, channelType, replyToThreadType string, userLocale i18n.TranslateFunc) string { message := "" @@ -140,41 +186,85 @@ func (a *App) getPushNotificationMessage(postMessage string, explicitMention, ch return message } +func (a *App) ClearPushNotificationSync(userId string, channelId string) { + sessions, err := a.getMobileAppSessions(userId) + if err != nil { + mlog.Error(err.Error()) + return + } + + msg := model.PushNotification{} + msg.Type = model.PUSH_TYPE_CLEAR + msg.ChannelId = channelId + msg.ContentAvailable = 0 + if badge := <-a.Srv.Store.User().GetUnreadCount(userId); badge.Err != nil { + msg.Badge = 0 + mlog.Error(fmt.Sprint("We could not get the unread message count for the user", userId, badge.Err), mlog.String("user_id", userId)) + } else { + msg.Badge = int(badge.Data.(int64)) + } + + mlog.Debug(fmt.Sprintf("Clearing push notification to %v with channel_id %v", msg.DeviceId, msg.ChannelId)) + + for _, session := range sessions { + tmpMessage := *model.PushNotificationFromJson(strings.NewReader(msg.ToJson())) + tmpMessage.SetDeviceIdAndPlatform(session.DeviceId) + a.sendToPushProxy(tmpMessage, session) + } +} + func (a *App) ClearPushNotification(userId string, channelId string) { - a.Go(func() { - // Sleep is to allow the read replicas a chance to fully sync - // the unread count for sending an accurate count. - // Delaying a little doesn't hurt anything and is cheaper than - // attempting to read from master. - time.Sleep(time.Second * 5) - - sessions, err := a.getMobileAppSessions(userId) - if err != nil { - mlog.Error(err.Error()) - return - } + channel := a.PushNotificationsHub.GetGoChannelFromUserId(userId) + channel <- PushNotification{ + notificationType: NOTIFICATION_TYPE_CLEAR, + userId: userId, + channelId: channelId, + } +} - msg := model.PushNotification{} - msg.Type = model.PUSH_TYPE_CLEAR - msg.ChannelId = channelId - msg.ContentAvailable = 0 - if badge := <-a.Srv.Store.User().GetUnreadCount(userId); badge.Err != nil { - msg.Badge = 0 - mlog.Error(fmt.Sprint("We could not get the unread message count for the user", userId, badge.Err), mlog.String("user_id", userId)) - } else { - msg.Badge = int(badge.Data.(int64)) +func (a *App) CreatePushNotificationsHub() { + hub := PushNotificationsHub{ + Channels: []chan PushNotification{}, + } + for x := 0; x < PUSH_NOTIFICATION_HUB_WORKERS; x++ { + hub.Channels = append(hub.Channels, make(chan PushNotification, PUSH_NOTIFICATIONS_HUB_BUFFER_PER_WORKER)) + } + a.PushNotificationsHub = hub +} + +func (a *App) pushNotificationWorker(notifications chan PushNotification) { + for notification := range notifications { + switch notification.notificationType { + case NOTIFICATION_TYPE_CLEAR: + a.ClearPushNotificationSync(notification.userId, notification.channelId) + case NOTIFICATION_TYPE_MESSAGE: + a.sendPushNotificationSync( + notification.post, + notification.user, + notification.channel, + notification.channelName, + notification.senderName, + notification.explicitMention, + notification.channelWideMention, + notification.replyToThreadType, + ) + default: + mlog.Error(fmt.Sprintf("Invalid notification type %v", notification.notificationType)) } + } +} - mlog.Debug(fmt.Sprintf("Clearing push notification to %v with channel_id %v", msg.DeviceId, msg.ChannelId)) +func (a *App) StartPushNotificationsHubWorkers() { + for x := 0; x < PUSH_NOTIFICATION_HUB_WORKERS; x++ { + channel := a.PushNotificationsHub.Channels[x] + a.Go(func() { a.pushNotificationWorker(channel) }) + } +} - for _, session := range sessions { - tmpMessage := *model.PushNotificationFromJson(strings.NewReader(msg.ToJson())) - tmpMessage.SetDeviceIdAndPlatform(session.DeviceId) - a.Go(func() { - a.sendToPushProxy(tmpMessage, session) - }) - } - }) +func (a *App) StopPushNotificationsHubWorkers() { + for _, channel := range a.PushNotificationsHub.Channels { + close(channel) + } } func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session) { -- cgit v1.2.3-1-g7c22