From 36f216cb7cb16958d98b3d77e121198596fd2213 Mon Sep 17 00:00:00 2001 From: Corey Hulen Date: Mon, 19 Jun 2017 08:44:04 -0700 Subject: PLT-6080 moving clustering to memberlist (#6499) * PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel --- app/admin.go | 44 ++++++++++------ app/cluster_discovery.go | 77 ++++++++++++++++++++++++++++ app/cluster_discovery_test.go | 27 ++++++++++ app/cluster_handlers.go | 77 ++++++++++++++++++++++++++++ app/session.go | 7 ++- app/status.go | 7 ++- app/web_hub.go | 116 +++++++++++++++++++++++++++++++++--------- 7 files changed, 313 insertions(+), 42 deletions(-) create mode 100644 app/cluster_discovery.go create mode 100644 app/cluster_discovery_test.go create mode 100644 app/cluster_handlers.go (limited to 'app') diff --git a/app/admin.go b/app/admin.go index 103c4617b..4f8125106 100644 --- a/app/admin.go +++ b/app/admin.go @@ -19,12 +19,23 @@ import ( ) func GetLogs(page, perPage int) ([]string, *model.AppError) { - lines, err := GetLogsSkipSend(page, perPage) + var lines []string + if einterfaces.GetClusterInterface() != nil && *utils.Cfg.ClusterSettings.Enable { + lines = append(lines, "-----------------------------------------------------------------------------------------------------------") + lines = append(lines, "-----------------------------------------------------------------------------------------------------------") + lines = append(lines, einterfaces.GetClusterInterface().GetClusterId()) + lines = append(lines, "-----------------------------------------------------------------------------------------------------------") + lines = append(lines, "-----------------------------------------------------------------------------------------------------------") + } + + melines, err := GetLogsSkipSend(page, perPage) if err != nil { return nil, err } - if einterfaces.GetClusterInterface() != nil { + lines = append(lines, melines...) + + if einterfaces.GetClusterInterface() != nil && *utils.Cfg.ClusterSettings.Enable { clines, err := einterfaces.GetClusterInterface().GetLogs(page, perPage) if err != nil { return nil, err @@ -84,10 +95,14 @@ func InvalidateAllCaches() *model.AppError { InvalidateAllCachesSkipSend() if einterfaces.GetClusterInterface() != nil { - err := einterfaces.GetClusterInterface().InvalidateAllCaches() - if err != nil { - return err + + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_ALL_CACHES, + SendType: model.CLUSTER_SEND_RELIABLE, + WaitForAllToSend: true, } + + einterfaces.GetClusterInterface().SendClusterMessage(msg) } return nil @@ -120,7 +135,8 @@ func ReloadConfig() { InitEmailBatching() } -func SaveConfig(cfg *model.Config) *model.AppError { +func SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) *model.AppError { + oldCfg := utils.Cfg cfg.SetDefaults() utils.Desanitize(cfg) @@ -132,7 +148,7 @@ func SaveConfig(cfg *model.Config) *model.AppError { return err } - if *utils.Cfg.ClusterSettings.Enable { + if *utils.Cfg.ClusterSettings.Enable && *utils.Cfg.ClusterSettings.ReadOnlyConfig { return model.NewLocAppError("saveConfig", "ent.cluster.save_config.error", nil, "") } @@ -149,14 +165,12 @@ func SaveConfig(cfg *model.Config) *model.AppError { } } - // oldCfg := utils.Cfg - // Future feature is to sync the configuration files - // if einterfaces.GetClusterInterface() != nil { - // err := einterfaces.GetClusterInterface().ConfigChanged(cfg, oldCfg, true) - // if err != nil { - // return err - // } - // } + if einterfaces.GetClusterInterface() != nil { + err := einterfaces.GetClusterInterface().ConfigChanged(cfg, oldCfg, sendConfigChangeClusterMessage) + if err != nil { + return err + } + } // start/restart email batching job if necessary InitEmailBatching() diff --git a/app/cluster_discovery.go b/app/cluster_discovery.go new file mode 100644 index 000000000..6584418f1 --- /dev/null +++ b/app/cluster_discovery.go @@ -0,0 +1,77 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "fmt" + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" +) + +const ( + DISCOVERY_SERVICE_WRITE_PING = 60 * time.Second +) + +type ClusterDiscoveryService struct { + model.ClusterDiscovery + stop chan bool +} + +func NewClusterDiscoveryService() *ClusterDiscoveryService { + ds := &ClusterDiscoveryService{ + ClusterDiscovery: model.ClusterDiscovery{}, + stop: make(chan bool), + } + + return ds +} + +func (me *ClusterDiscoveryService) Start() { + + <-Srv.Store.ClusterDiscovery().Cleanup() + + if cresult := <-Srv.Store.ClusterDiscovery().Exists(&me.ClusterDiscovery); cresult.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to check if row exists for %v with err=%v", me.ClusterDiscovery.ToJson(), cresult.Err)) + } else { + if cresult.Data.(bool) { + if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to start clean for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err)) + } + } + } + + if result := <-Srv.Store.ClusterDiscovery().Save(&me.ClusterDiscovery); result.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to save for %v with err=%v", me.ClusterDiscovery.ToJson(), result.Err)) + return + } + + go func() { + l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer started for %v", me.ClusterDiscovery.ToJson())) + ticker := time.NewTicker(DISCOVERY_SERVICE_WRITE_PING) + defer func() { + ticker.Stop() + if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to cleanup for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err)) + } + l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer stopped for %v", me.ClusterDiscovery.ToJson())) + }() + + for { + select { + case <-ticker.C: + if u := <-Srv.Store.ClusterDiscovery().SetLastPingAt(&me.ClusterDiscovery); u.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to write ping for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err)) + } + case <-me.stop: + return + } + } + }() +} + +func (me *ClusterDiscoveryService) Stop() { + me.stop <- true +} diff --git a/app/cluster_discovery_test.go b/app/cluster_discovery_test.go new file mode 100644 index 000000000..ca5b1bfa4 --- /dev/null +++ b/app/cluster_discovery_test.go @@ -0,0 +1,27 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "testing" + + "time" + + "github.com/mattermost/platform/model" +) + +func TestClusterDiscoveryService(t *testing.T) { + Setup() + + ds := NewClusterDiscoveryService() + ds.Type = model.CDS_TYPE_APP + ds.ClusterName = "ClusterA" + ds.AutoFillHostname() + + ds.Start() + time.Sleep(2 * time.Second) + + ds.Stop() + time.Sleep(2 * time.Second) +} diff --git a/app/cluster_handlers.go b/app/cluster_handlers.go new file mode 100644 index 000000000..d15bb851a --- /dev/null +++ b/app/cluster_handlers.go @@ -0,0 +1,77 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "strings" + + "github.com/mattermost/platform/einterfaces" + "github.com/mattermost/platform/model" +) + +func RegisterAllClusterMessageHandlers() { + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_PUBLISH, ClusterPublishHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_UPDATE_STATUS, ClusterUpdateStatusHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_ALL_CACHES, ClusterInvalidateAllCachesHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, ClusterInvalidateCacheForReactionsHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK, ClusterInvalidateCacheForWebhookHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS, ClusterInvalidateCacheForChannelPostsHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS, ClusterInvalidateCacheForChannelMembersNotifyPropHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS, ClusterInvalidateCacheForChannelMembersHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, ClusterInvalidateCacheForChannelByNameHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, ClusterInvalidateCacheForChannelHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, ClusterInvalidateCacheForUserHandler) + einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, ClusterClearSessionCacheForUserHandler) + +} + +func ClusterPublishHandler(msg *model.ClusterMessage) { + event := model.WebSocketEventFromJson(strings.NewReader(msg.Data)) + PublishSkipClusterSend(event) +} + +func ClusterUpdateStatusHandler(msg *model.ClusterMessage) { + status := model.StatusFromJson(strings.NewReader(msg.Data)) + AddStatusCacheSkipClusterSend(status) +} + +func ClusterInvalidateAllCachesHandler(msg *model.ClusterMessage) { + InvalidateAllCachesSkipSend() +} + +func ClusterInvalidateCacheForReactionsHandler(msg *model.ClusterMessage) { + InvalidateCacheForReactionsSkipClusterSend(msg.Data) +} + +func ClusterInvalidateCacheForWebhookHandler(msg *model.ClusterMessage) { + InvalidateCacheForWebhookSkipClusterSend(msg.Data) +} + +func ClusterInvalidateCacheForChannelPostsHandler(msg *model.ClusterMessage) { + InvalidateCacheForWebhookSkipClusterSend(msg.Data) +} + +func ClusterInvalidateCacheForChannelMembersNotifyPropHandler(msg *model.ClusterMessage) { + InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(msg.Data) +} + +func ClusterInvalidateCacheForChannelMembersHandler(msg *model.ClusterMessage) { + InvalidateCacheForChannelMembersSkipClusterSend(msg.Data) +} + +func ClusterInvalidateCacheForChannelByNameHandler(msg *model.ClusterMessage) { + InvalidateCacheForChannelByNameSkipClusterSend(msg.Props["id"], msg.Props["name"]) +} + +func ClusterInvalidateCacheForChannelHandler(msg *model.ClusterMessage) { + InvalidateCacheForChannelSkipClusterSend(msg.Data) +} + +func ClusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) { + InvalidateCacheForUserSkipClusterSend(msg.Data) +} + +func ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) { + ClearSessionCacheForUserSkipClusterSend(msg.Data) +} diff --git a/app/session.go b/app/session.go index 7290bfd88..4b1ea18f2 100644 --- a/app/session.go +++ b/app/session.go @@ -101,7 +101,12 @@ func ClearSessionCacheForUser(userId string) { ClearSessionCacheForUserSkipClusterSend(userId) if einterfaces.GetClusterInterface() != nil { - einterfaces.GetClusterInterface().ClearSessionCacheForUser(userId) + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: userId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } diff --git a/app/status.go b/app/status.go index 868e57563..9f6ad2e05 100644 --- a/app/status.go +++ b/app/status.go @@ -26,7 +26,12 @@ func AddStatusCache(status *model.Status) { AddStatusCacheSkipClusterSend(status) if einterfaces.GetClusterInterface() != nil { - einterfaces.GetClusterInterface().UpdateStatus(status) + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_UPDATE_STATUS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: status.ToJson(), + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } diff --git a/app/web_hub.go b/app/web_hub.go index 6b61430dc..cadad0de4 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -64,10 +64,11 @@ func TotalWebsocketConnections() int { } func HubStart() { - l4g.Info(utils.T("api.web_hub.start.starting.debug"), runtime.NumCPU()*2) - // Total number of hubs is twice the number of CPUs. - hubs = make([]*Hub, runtime.NumCPU()*2) + numberOfHubs := runtime.NumCPU() * 2 + l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs) + + hubs = make([]*Hub, numberOfHubs) for i := 0; i < len(hubs); i++ { hubs[i] = NewWebHub() @@ -142,17 +143,28 @@ func HubUnregister(webConn *WebConn) { } func Publish(message *model.WebSocketEvent) { - if metrics := einterfaces.GetMetricsInterface(); metrics != nil { metrics.IncrementWebsocketEvent(message.Event) } - for _, hub := range hubs { - hub.Broadcast(message) - } + PublishSkipClusterSend(message) if einterfaces.GetClusterInterface() != nil { - einterfaces.GetClusterInterface().Publish(message) + cm := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_PUBLISH, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: message.ToJson(), + } + + if message.Event == model.WEBSOCKET_EVENT_POSTED || + message.Event == model.WEBSOCKET_EVENT_POST_EDITED || + message.Event == model.WEBSOCKET_EVENT_DIRECT_ADDED || + message.Event == model.WEBSOCKET_EVENT_GROUP_ADDED || + message.Event == model.WEBSOCKET_EVENT_ADDED_TO_TEAM { + cm.SendType = model.CLUSTER_SEND_RELIABLE + } + + einterfaces.GetClusterInterface().SendClusterMessage(cm) } } @@ -167,16 +179,28 @@ func InvalidateCacheForChannel(channel *model.Channel) { InvalidateCacheForChannelByNameSkipClusterSend(channel.TeamId, channel.Name) if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannel(channel.Id) - cluster.InvalidateCacheForChannelByName(channel.TeamId, channel.Name) - } -} + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channel.Id, + } -func InvalidateCacheForChannelMembers(channelId string) { - InvalidateCacheForChannelMembersSkipClusterSend(channelId) + einterfaces.GetClusterInterface().SendClusterMessage(msg) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannelMembers(channelId) + nameMsg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Props: make(map[string]string), + } + + nameMsg.Props["name"] = channel.Name + if channel.TeamId == "" { + nameMsg.Props["id"] = "dm" + } else { + nameMsg.Props["id"] = channel.TeamId + } + + einterfaces.GetClusterInterface().SendClusterMessage(nameMsg) } } @@ -184,6 +208,19 @@ func InvalidateCacheForChannelSkipClusterSend(channelId string) { Srv.Store.Channel().InvalidateChannel(channelId) } +func InvalidateCacheForChannelMembers(channelId string) { + InvalidateCacheForChannelMembersSkipClusterSend(channelId) + + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channelId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) + } +} + func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) { Srv.Store.User().InvalidateProfilesInChannelCache(channelId) Srv.Store.Channel().InvalidateMemberCount(channelId) @@ -192,8 +229,13 @@ func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) { func InvalidateCacheForChannelMembersNotifyProps(channelId string) { InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannelMembersNotifyProps(channelId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channelId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -202,14 +244,23 @@ func InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId string } func InvalidateCacheForChannelByNameSkipClusterSend(teamId, name string) { + if teamId == "" { + teamId = "dm" + } + Srv.Store.Channel().InvalidateChannelByName(teamId, name) } func InvalidateCacheForChannelPosts(channelId string) { InvalidateCacheForChannelPostsSkipClusterSend(channelId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannelPosts(channelId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channelId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -221,7 +272,12 @@ func InvalidateCacheForUser(userId string) { InvalidateCacheForUserSkipClusterSend(userId) if einterfaces.GetClusterInterface() != nil { - einterfaces.GetClusterInterface().InvalidateCacheForUser(userId) + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: userId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -238,8 +294,13 @@ func InvalidateCacheForUserSkipClusterSend(userId string) { func InvalidateCacheForWebhook(webhookId string) { InvalidateCacheForWebhookSkipClusterSend(webhookId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForWebhook(webhookId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: webhookId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -256,8 +317,13 @@ func InvalidateWebConnSessionCacheForUser(userId string) { func InvalidateCacheForReactions(postId string) { InvalidateCacheForReactionsSkipClusterSend(postId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForReactions(postId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: postId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } -- cgit v1.2.3-1-g7c22