summaryrefslogtreecommitdiffstats
path: root/app/web_hub.go
diff options
context:
space:
mode:
authorCorey Hulen <corey@hulen.com>2017-06-19 08:44:04 -0700
committerGitHub <noreply@github.com>2017-06-19 08:44:04 -0700
commit36f216cb7cb16958d98b3d77e121198596fd2213 (patch)
treeac2a5b79494749b3dffc2f5778092f2529c98d1a /app/web_hub.go
parentfe48987a32fbd600458edd4e81318071ae558ba4 (diff)
downloadchat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.gz
chat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.bz2
chat-36f216cb7cb16958d98b3d77e121198596fd2213.zip
PLT-6080 moving clustering to memberlist (#6499)
* PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel
Diffstat (limited to 'app/web_hub.go')
-rw-r--r--app/web_hub.go116
1 files changed, 91 insertions, 25 deletions
diff --git a/app/web_hub.go b/app/web_hub.go
index 6b61430dc..cadad0de4 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -64,10 +64,11 @@ func TotalWebsocketConnections() int {
}
func HubStart() {
- l4g.Info(utils.T("api.web_hub.start.starting.debug"), runtime.NumCPU()*2)
-
// Total number of hubs is twice the number of CPUs.
- hubs = make([]*Hub, runtime.NumCPU()*2)
+ numberOfHubs := runtime.NumCPU() * 2
+ l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs)
+
+ hubs = make([]*Hub, numberOfHubs)
for i := 0; i < len(hubs); i++ {
hubs[i] = NewWebHub()
@@ -142,17 +143,28 @@ func HubUnregister(webConn *WebConn) {
}
func Publish(message *model.WebSocketEvent) {
-
if metrics := einterfaces.GetMetricsInterface(); metrics != nil {
metrics.IncrementWebsocketEvent(message.Event)
}
- for _, hub := range hubs {
- hub.Broadcast(message)
- }
+ PublishSkipClusterSend(message)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().Publish(message)
+ cm := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_PUBLISH,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: message.ToJson(),
+ }
+
+ if message.Event == model.WEBSOCKET_EVENT_POSTED ||
+ message.Event == model.WEBSOCKET_EVENT_POST_EDITED ||
+ message.Event == model.WEBSOCKET_EVENT_DIRECT_ADDED ||
+ message.Event == model.WEBSOCKET_EVENT_GROUP_ADDED ||
+ message.Event == model.WEBSOCKET_EVENT_ADDED_TO_TEAM {
+ cm.SendType = model.CLUSTER_SEND_RELIABLE
+ }
+
+ einterfaces.GetClusterInterface().SendClusterMessage(cm)
}
}
@@ -167,16 +179,28 @@ func InvalidateCacheForChannel(channel *model.Channel) {
InvalidateCacheForChannelByNameSkipClusterSend(channel.TeamId, channel.Name)
if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannel(channel.Id)
- cluster.InvalidateCacheForChannelByName(channel.TeamId, channel.Name)
- }
-}
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channel.Id,
+ }
-func InvalidateCacheForChannelMembers(channelId string) {
- InvalidateCacheForChannelMembersSkipClusterSend(channelId)
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelMembers(channelId)
+ nameMsg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Props: make(map[string]string),
+ }
+
+ nameMsg.Props["name"] = channel.Name
+ if channel.TeamId == "" {
+ nameMsg.Props["id"] = "dm"
+ } else {
+ nameMsg.Props["id"] = channel.TeamId
+ }
+
+ einterfaces.GetClusterInterface().SendClusterMessage(nameMsg)
}
}
@@ -184,6 +208,19 @@ func InvalidateCacheForChannelSkipClusterSend(channelId string) {
Srv.Store.Channel().InvalidateChannel(channelId)
}
+func InvalidateCacheForChannelMembers(channelId string) {
+ InvalidateCacheForChannelMembersSkipClusterSend(channelId)
+
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
+ }
+}
+
func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) {
Srv.Store.User().InvalidateProfilesInChannelCache(channelId)
Srv.Store.Channel().InvalidateMemberCount(channelId)
@@ -192,8 +229,13 @@ func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) {
func InvalidateCacheForChannelMembersNotifyProps(channelId string) {
InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelMembersNotifyProps(channelId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -202,14 +244,23 @@ func InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId string
}
func InvalidateCacheForChannelByNameSkipClusterSend(teamId, name string) {
+ if teamId == "" {
+ teamId = "dm"
+ }
+
Srv.Store.Channel().InvalidateChannelByName(teamId, name)
}
func InvalidateCacheForChannelPosts(channelId string) {
InvalidateCacheForChannelPostsSkipClusterSend(channelId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelPosts(channelId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -221,7 +272,12 @@ func InvalidateCacheForUser(userId string) {
InvalidateCacheForUserSkipClusterSend(userId)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().InvalidateCacheForUser(userId)
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: userId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -238,8 +294,13 @@ func InvalidateCacheForUserSkipClusterSend(userId string) {
func InvalidateCacheForWebhook(webhookId string) {
InvalidateCacheForWebhookSkipClusterSend(webhookId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForWebhook(webhookId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: webhookId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -256,8 +317,13 @@ func InvalidateWebConnSessionCacheForUser(userId string) {
func InvalidateCacheForReactions(postId string) {
InvalidateCacheForReactionsSkipClusterSend(postId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForReactions(postId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: postId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}