summaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorCorey Hulen <corey@hulen.com>2017-06-19 08:44:04 -0700
committerGitHub <noreply@github.com>2017-06-19 08:44:04 -0700
commit36f216cb7cb16958d98b3d77e121198596fd2213 (patch)
treeac2a5b79494749b3dffc2f5778092f2529c98d1a /app
parentfe48987a32fbd600458edd4e81318071ae558ba4 (diff)
downloadchat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.gz
chat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.bz2
chat-36f216cb7cb16958d98b3d77e121198596fd2213.zip
PLT-6080 moving clustering to memberlist (#6499)
* PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel
Diffstat (limited to 'app')
-rw-r--r--app/admin.go44
-rw-r--r--app/cluster_discovery.go77
-rw-r--r--app/cluster_discovery_test.go27
-rw-r--r--app/cluster_handlers.go77
-rw-r--r--app/session.go7
-rw-r--r--app/status.go7
-rw-r--r--app/web_hub.go116
7 files changed, 313 insertions, 42 deletions
diff --git a/app/admin.go b/app/admin.go
index 103c4617b..4f8125106 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -19,12 +19,23 @@ import (
)
func GetLogs(page, perPage int) ([]string, *model.AppError) {
- lines, err := GetLogsSkipSend(page, perPage)
+ var lines []string
+ if einterfaces.GetClusterInterface() != nil && *utils.Cfg.ClusterSettings.Enable {
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ lines = append(lines, einterfaces.GetClusterInterface().GetClusterId())
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
+ }
+
+ melines, err := GetLogsSkipSend(page, perPage)
if err != nil {
return nil, err
}
- if einterfaces.GetClusterInterface() != nil {
+ lines = append(lines, melines...)
+
+ if einterfaces.GetClusterInterface() != nil && *utils.Cfg.ClusterSettings.Enable {
clines, err := einterfaces.GetClusterInterface().GetLogs(page, perPage)
if err != nil {
return nil, err
@@ -84,10 +95,14 @@ func InvalidateAllCaches() *model.AppError {
InvalidateAllCachesSkipSend()
if einterfaces.GetClusterInterface() != nil {
- err := einterfaces.GetClusterInterface().InvalidateAllCaches()
- if err != nil {
- return err
+
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_ALL_CACHES,
+ SendType: model.CLUSTER_SEND_RELIABLE,
+ WaitForAllToSend: true,
}
+
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
return nil
@@ -120,7 +135,8 @@ func ReloadConfig() {
InitEmailBatching()
}
-func SaveConfig(cfg *model.Config) *model.AppError {
+func SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) *model.AppError {
+ oldCfg := utils.Cfg
cfg.SetDefaults()
utils.Desanitize(cfg)
@@ -132,7 +148,7 @@ func SaveConfig(cfg *model.Config) *model.AppError {
return err
}
- if *utils.Cfg.ClusterSettings.Enable {
+ if *utils.Cfg.ClusterSettings.Enable && *utils.Cfg.ClusterSettings.ReadOnlyConfig {
return model.NewLocAppError("saveConfig", "ent.cluster.save_config.error", nil, "")
}
@@ -149,14 +165,12 @@ func SaveConfig(cfg *model.Config) *model.AppError {
}
}
- // oldCfg := utils.Cfg
- // Future feature is to sync the configuration files
- // if einterfaces.GetClusterInterface() != nil {
- // err := einterfaces.GetClusterInterface().ConfigChanged(cfg, oldCfg, true)
- // if err != nil {
- // return err
- // }
- // }
+ if einterfaces.GetClusterInterface() != nil {
+ err := einterfaces.GetClusterInterface().ConfigChanged(cfg, oldCfg, sendConfigChangeClusterMessage)
+ if err != nil {
+ return err
+ }
+ }
// start/restart email batching job if necessary
InitEmailBatching()
diff --git a/app/cluster_discovery.go b/app/cluster_discovery.go
new file mode 100644
index 000000000..6584418f1
--- /dev/null
+++ b/app/cluster_discovery.go
@@ -0,0 +1,77 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "fmt"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+const (
+ DISCOVERY_SERVICE_WRITE_PING = 60 * time.Second
+)
+
+type ClusterDiscoveryService struct {
+ model.ClusterDiscovery
+ stop chan bool
+}
+
+func NewClusterDiscoveryService() *ClusterDiscoveryService {
+ ds := &ClusterDiscoveryService{
+ ClusterDiscovery: model.ClusterDiscovery{},
+ stop: make(chan bool),
+ }
+
+ return ds
+}
+
+func (me *ClusterDiscoveryService) Start() {
+
+ <-Srv.Store.ClusterDiscovery().Cleanup()
+
+ if cresult := <-Srv.Store.ClusterDiscovery().Exists(&me.ClusterDiscovery); cresult.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to check if row exists for %v with err=%v", me.ClusterDiscovery.ToJson(), cresult.Err))
+ } else {
+ if cresult.Data.(bool) {
+ if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to start clean for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err))
+ }
+ }
+ }
+
+ if result := <-Srv.Store.ClusterDiscovery().Save(&me.ClusterDiscovery); result.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to save for %v with err=%v", me.ClusterDiscovery.ToJson(), result.Err))
+ return
+ }
+
+ go func() {
+ l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer started for %v", me.ClusterDiscovery.ToJson()))
+ ticker := time.NewTicker(DISCOVERY_SERVICE_WRITE_PING)
+ defer func() {
+ ticker.Stop()
+ if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to cleanup for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err))
+ }
+ l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer stopped for %v", me.ClusterDiscovery.ToJson()))
+ }()
+
+ for {
+ select {
+ case <-ticker.C:
+ if u := <-Srv.Store.ClusterDiscovery().SetLastPingAt(&me.ClusterDiscovery); u.Err != nil {
+ l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to write ping for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err))
+ }
+ case <-me.stop:
+ return
+ }
+ }
+ }()
+}
+
+func (me *ClusterDiscoveryService) Stop() {
+ me.stop <- true
+}
diff --git a/app/cluster_discovery_test.go b/app/cluster_discovery_test.go
new file mode 100644
index 000000000..ca5b1bfa4
--- /dev/null
+++ b/app/cluster_discovery_test.go
@@ -0,0 +1,27 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "testing"
+
+ "time"
+
+ "github.com/mattermost/platform/model"
+)
+
+func TestClusterDiscoveryService(t *testing.T) {
+ Setup()
+
+ ds := NewClusterDiscoveryService()
+ ds.Type = model.CDS_TYPE_APP
+ ds.ClusterName = "ClusterA"
+ ds.AutoFillHostname()
+
+ ds.Start()
+ time.Sleep(2 * time.Second)
+
+ ds.Stop()
+ time.Sleep(2 * time.Second)
+}
diff --git a/app/cluster_handlers.go b/app/cluster_handlers.go
new file mode 100644
index 000000000..d15bb851a
--- /dev/null
+++ b/app/cluster_handlers.go
@@ -0,0 +1,77 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "strings"
+
+ "github.com/mattermost/platform/einterfaces"
+ "github.com/mattermost/platform/model"
+)
+
+func RegisterAllClusterMessageHandlers() {
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_PUBLISH, ClusterPublishHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_UPDATE_STATUS, ClusterUpdateStatusHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_ALL_CACHES, ClusterInvalidateAllCachesHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, ClusterInvalidateCacheForReactionsHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK, ClusterInvalidateCacheForWebhookHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS, ClusterInvalidateCacheForChannelPostsHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS, ClusterInvalidateCacheForChannelMembersNotifyPropHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS, ClusterInvalidateCacheForChannelMembersHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, ClusterInvalidateCacheForChannelByNameHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, ClusterInvalidateCacheForChannelHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, ClusterInvalidateCacheForUserHandler)
+ einterfaces.GetClusterInterface().RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, ClusterClearSessionCacheForUserHandler)
+
+}
+
+func ClusterPublishHandler(msg *model.ClusterMessage) {
+ event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
+ PublishSkipClusterSend(event)
+}
+
+func ClusterUpdateStatusHandler(msg *model.ClusterMessage) {
+ status := model.StatusFromJson(strings.NewReader(msg.Data))
+ AddStatusCacheSkipClusterSend(status)
+}
+
+func ClusterInvalidateAllCachesHandler(msg *model.ClusterMessage) {
+ InvalidateAllCachesSkipSend()
+}
+
+func ClusterInvalidateCacheForReactionsHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForReactionsSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForWebhookHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForWebhookSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelPostsHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForWebhookSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelMembersNotifyPropHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelMembersHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelMembersSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForChannelByNameHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelByNameSkipClusterSend(msg.Props["id"], msg.Props["name"])
+}
+
+func ClusterInvalidateCacheForChannelHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForChannelSkipClusterSend(msg.Data)
+}
+
+func ClusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) {
+ InvalidateCacheForUserSkipClusterSend(msg.Data)
+}
+
+func ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
+ ClearSessionCacheForUserSkipClusterSend(msg.Data)
+}
diff --git a/app/session.go b/app/session.go
index 7290bfd88..4b1ea18f2 100644
--- a/app/session.go
+++ b/app/session.go
@@ -101,7 +101,12 @@ func ClearSessionCacheForUser(userId string) {
ClearSessionCacheForUserSkipClusterSend(userId)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().ClearSessionCacheForUser(userId)
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: userId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
diff --git a/app/status.go b/app/status.go
index 868e57563..9f6ad2e05 100644
--- a/app/status.go
+++ b/app/status.go
@@ -26,7 +26,12 @@ func AddStatusCache(status *model.Status) {
AddStatusCacheSkipClusterSend(status)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().UpdateStatus(status)
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_UPDATE_STATUS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: status.ToJson(),
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
diff --git a/app/web_hub.go b/app/web_hub.go
index 6b61430dc..cadad0de4 100644
--- a/app/web_hub.go
+++ b/app/web_hub.go
@@ -64,10 +64,11 @@ func TotalWebsocketConnections() int {
}
func HubStart() {
- l4g.Info(utils.T("api.web_hub.start.starting.debug"), runtime.NumCPU()*2)
-
// Total number of hubs is twice the number of CPUs.
- hubs = make([]*Hub, runtime.NumCPU()*2)
+ numberOfHubs := runtime.NumCPU() * 2
+ l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs)
+
+ hubs = make([]*Hub, numberOfHubs)
for i := 0; i < len(hubs); i++ {
hubs[i] = NewWebHub()
@@ -142,17 +143,28 @@ func HubUnregister(webConn *WebConn) {
}
func Publish(message *model.WebSocketEvent) {
-
if metrics := einterfaces.GetMetricsInterface(); metrics != nil {
metrics.IncrementWebsocketEvent(message.Event)
}
- for _, hub := range hubs {
- hub.Broadcast(message)
- }
+ PublishSkipClusterSend(message)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().Publish(message)
+ cm := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_PUBLISH,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: message.ToJson(),
+ }
+
+ if message.Event == model.WEBSOCKET_EVENT_POSTED ||
+ message.Event == model.WEBSOCKET_EVENT_POST_EDITED ||
+ message.Event == model.WEBSOCKET_EVENT_DIRECT_ADDED ||
+ message.Event == model.WEBSOCKET_EVENT_GROUP_ADDED ||
+ message.Event == model.WEBSOCKET_EVENT_ADDED_TO_TEAM {
+ cm.SendType = model.CLUSTER_SEND_RELIABLE
+ }
+
+ einterfaces.GetClusterInterface().SendClusterMessage(cm)
}
}
@@ -167,16 +179,28 @@ func InvalidateCacheForChannel(channel *model.Channel) {
InvalidateCacheForChannelByNameSkipClusterSend(channel.TeamId, channel.Name)
if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannel(channel.Id)
- cluster.InvalidateCacheForChannelByName(channel.TeamId, channel.Name)
- }
-}
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channel.Id,
+ }
-func InvalidateCacheForChannelMembers(channelId string) {
- InvalidateCacheForChannelMembersSkipClusterSend(channelId)
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelMembers(channelId)
+ nameMsg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Props: make(map[string]string),
+ }
+
+ nameMsg.Props["name"] = channel.Name
+ if channel.TeamId == "" {
+ nameMsg.Props["id"] = "dm"
+ } else {
+ nameMsg.Props["id"] = channel.TeamId
+ }
+
+ einterfaces.GetClusterInterface().SendClusterMessage(nameMsg)
}
}
@@ -184,6 +208,19 @@ func InvalidateCacheForChannelSkipClusterSend(channelId string) {
Srv.Store.Channel().InvalidateChannel(channelId)
}
+func InvalidateCacheForChannelMembers(channelId string) {
+ InvalidateCacheForChannelMembersSkipClusterSend(channelId)
+
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
+ }
+}
+
func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) {
Srv.Store.User().InvalidateProfilesInChannelCache(channelId)
Srv.Store.Channel().InvalidateMemberCount(channelId)
@@ -192,8 +229,13 @@ func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) {
func InvalidateCacheForChannelMembersNotifyProps(channelId string) {
InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelMembersNotifyProps(channelId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -202,14 +244,23 @@ func InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId string
}
func InvalidateCacheForChannelByNameSkipClusterSend(teamId, name string) {
+ if teamId == "" {
+ teamId = "dm"
+ }
+
Srv.Store.Channel().InvalidateChannelByName(teamId, name)
}
func InvalidateCacheForChannelPosts(channelId string) {
InvalidateCacheForChannelPostsSkipClusterSend(channelId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForChannelPosts(channelId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: channelId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -221,7 +272,12 @@ func InvalidateCacheForUser(userId string) {
InvalidateCacheForUserSkipClusterSend(userId)
if einterfaces.GetClusterInterface() != nil {
- einterfaces.GetClusterInterface().InvalidateCacheForUser(userId)
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: userId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -238,8 +294,13 @@ func InvalidateCacheForUserSkipClusterSend(userId string) {
func InvalidateCacheForWebhook(webhookId string) {
InvalidateCacheForWebhookSkipClusterSend(webhookId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForWebhook(webhookId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: webhookId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}
@@ -256,8 +317,13 @@ func InvalidateWebConnSessionCacheForUser(userId string) {
func InvalidateCacheForReactions(postId string) {
InvalidateCacheForReactionsSkipClusterSend(postId)
- if cluster := einterfaces.GetClusterInterface(); cluster != nil {
- cluster.InvalidateCacheForReactions(postId)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS,
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: postId,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
}
}