From 36f216cb7cb16958d98b3d77e121198596fd2213 Mon Sep 17 00:00:00 2001 From: Corey Hulen Date: Mon, 19 Jun 2017 08:44:04 -0700 Subject: PLT-6080 moving clustering to memberlist (#6499) * PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel --- app/web_hub.go | 116 ++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 91 insertions(+), 25 deletions(-) (limited to 'app/web_hub.go') diff --git a/app/web_hub.go b/app/web_hub.go index 6b61430dc..cadad0de4 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -64,10 +64,11 @@ func TotalWebsocketConnections() int { } func HubStart() { - l4g.Info(utils.T("api.web_hub.start.starting.debug"), runtime.NumCPU()*2) - // Total number of hubs is twice the number of CPUs. - hubs = make([]*Hub, runtime.NumCPU()*2) + numberOfHubs := runtime.NumCPU() * 2 + l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs) + + hubs = make([]*Hub, numberOfHubs) for i := 0; i < len(hubs); i++ { hubs[i] = NewWebHub() @@ -142,17 +143,28 @@ func HubUnregister(webConn *WebConn) { } func Publish(message *model.WebSocketEvent) { - if metrics := einterfaces.GetMetricsInterface(); metrics != nil { metrics.IncrementWebsocketEvent(message.Event) } - for _, hub := range hubs { - hub.Broadcast(message) - } + PublishSkipClusterSend(message) if einterfaces.GetClusterInterface() != nil { - einterfaces.GetClusterInterface().Publish(message) + cm := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_PUBLISH, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: message.ToJson(), + } + + if message.Event == model.WEBSOCKET_EVENT_POSTED || + message.Event == model.WEBSOCKET_EVENT_POST_EDITED || + message.Event == model.WEBSOCKET_EVENT_DIRECT_ADDED || + message.Event == model.WEBSOCKET_EVENT_GROUP_ADDED || + message.Event == model.WEBSOCKET_EVENT_ADDED_TO_TEAM { + cm.SendType = model.CLUSTER_SEND_RELIABLE + } + + einterfaces.GetClusterInterface().SendClusterMessage(cm) } } @@ -167,16 +179,28 @@ func InvalidateCacheForChannel(channel *model.Channel) { InvalidateCacheForChannelByNameSkipClusterSend(channel.TeamId, channel.Name) if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannel(channel.Id) - cluster.InvalidateCacheForChannelByName(channel.TeamId, channel.Name) - } -} + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channel.Id, + } -func InvalidateCacheForChannelMembers(channelId string) { - InvalidateCacheForChannelMembersSkipClusterSend(channelId) + einterfaces.GetClusterInterface().SendClusterMessage(msg) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannelMembers(channelId) + nameMsg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Props: make(map[string]string), + } + + nameMsg.Props["name"] = channel.Name + if channel.TeamId == "" { + nameMsg.Props["id"] = "dm" + } else { + nameMsg.Props["id"] = channel.TeamId + } + + einterfaces.GetClusterInterface().SendClusterMessage(nameMsg) } } @@ -184,6 +208,19 @@ func InvalidateCacheForChannelSkipClusterSend(channelId string) { Srv.Store.Channel().InvalidateChannel(channelId) } +func InvalidateCacheForChannelMembers(channelId string) { + InvalidateCacheForChannelMembersSkipClusterSend(channelId) + + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channelId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) + } +} + func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) { Srv.Store.User().InvalidateProfilesInChannelCache(channelId) Srv.Store.Channel().InvalidateMemberCount(channelId) @@ -192,8 +229,13 @@ func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) { func InvalidateCacheForChannelMembersNotifyProps(channelId string) { InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannelMembersNotifyProps(channelId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channelId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -202,14 +244,23 @@ func InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId string } func InvalidateCacheForChannelByNameSkipClusterSend(teamId, name string) { + if teamId == "" { + teamId = "dm" + } + Srv.Store.Channel().InvalidateChannelByName(teamId, name) } func InvalidateCacheForChannelPosts(channelId string) { InvalidateCacheForChannelPostsSkipClusterSend(channelId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForChannelPosts(channelId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: channelId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -221,7 +272,12 @@ func InvalidateCacheForUser(userId string) { InvalidateCacheForUserSkipClusterSend(userId) if einterfaces.GetClusterInterface() != nil { - einterfaces.GetClusterInterface().InvalidateCacheForUser(userId) + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: userId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -238,8 +294,13 @@ func InvalidateCacheForUserSkipClusterSend(userId string) { func InvalidateCacheForWebhook(webhookId string) { InvalidateCacheForWebhookSkipClusterSend(webhookId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForWebhook(webhookId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: webhookId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } @@ -256,8 +317,13 @@ func InvalidateWebConnSessionCacheForUser(userId string) { func InvalidateCacheForReactions(postId string) { InvalidateCacheForReactionsSkipClusterSend(postId) - if cluster := einterfaces.GetClusterInterface(); cluster != nil { - cluster.InvalidateCacheForReactions(postId) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: postId, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) } } -- cgit v1.2.3-1-g7c22