summaryrefslogtreecommitdiffstats
path: root/model
diff options
context:
space:
mode:
authorCorey Hulen <corey@hulen.com>2017-06-19 08:44:04 -0700
committerGitHub <noreply@github.com>2017-06-19 08:44:04 -0700
commit36f216cb7cb16958d98b3d77e121198596fd2213 (patch)
treeac2a5b79494749b3dffc2f5778092f2529c98d1a /model
parentfe48987a32fbd600458edd4e81318071ae558ba4 (diff)
downloadchat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.gz
chat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.bz2
chat-36f216cb7cb16958d98b3d77e121198596fd2213.zip
PLT-6080 moving clustering to memberlist (#6499)
* PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel
Diffstat (limited to 'model')
-rw-r--r--model/cluster_discovery.go132
-rw-r--r--model/cluster_discovery_test.go59
-rw-r--r--model/cluster_info.go49
-rw-r--r--model/cluster_info_test.go18
-rw-r--r--model/cluster_message.go55
-rw-r--r--model/cluster_message_test.go28
-rw-r--r--model/config.go51
-rw-r--r--model/utils.go19
-rw-r--r--model/utils_test.go6
9 files changed, 348 insertions, 69 deletions
diff --git a/model/cluster_discovery.go b/model/cluster_discovery.go
new file mode 100644
index 000000000..4b9269656
--- /dev/null
+++ b/model/cluster_discovery.go
@@ -0,0 +1,132 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "encoding/json"
+ "io"
+ "os"
+)
+
+const (
+ CDS_OFFLINE_AFTER_MILLIS = 1000 * 60 * 30 // 30 minutes
+ CDS_TYPE_APP = "mattermost_app"
+)
+
+type ClusterDiscovery struct {
+ Id string `json:"id"`
+ Type string `json:"type"`
+ ClusterName string `json:"cluster_name"`
+ Hostname string `json:"hostname"`
+ GossipPort int32 `json:"gossip_port"`
+ Port int32 `json:"port"`
+ CreateAt int64 `json:"create_at"`
+ LastPingAt int64 `json:"last_ping_at"`
+}
+
+func (o *ClusterDiscovery) PreSave() {
+ if o.Id == "" {
+ o.Id = NewId()
+ }
+
+ if o.CreateAt == 0 {
+ o.CreateAt = GetMillis()
+ o.LastPingAt = o.CreateAt
+ }
+}
+
+func (o *ClusterDiscovery) AutoFillHostname() {
+ // attempt to set the hostname from the OS
+ if len(o.Hostname) == 0 {
+ if hn, err := os.Hostname(); err == nil {
+ o.Hostname = hn
+ }
+ }
+}
+
+func (o *ClusterDiscovery) AutoFillIpAddress() {
+ // attempt to set the hostname to the first non-local IP address
+ if len(o.Hostname) == 0 {
+ o.Hostname = GetServerIpAddress()
+ }
+}
+
+func (o *ClusterDiscovery) IsEqual(in *ClusterDiscovery) bool {
+ if in == nil {
+ return false
+ }
+
+ if o.Type != in.Type {
+ return false
+ }
+
+ if o.ClusterName != in.ClusterName {
+ return false
+ }
+
+ if o.Hostname != in.Hostname {
+ return false
+ }
+
+ return true
+}
+
+func FilterClusterDiscovery(vs []*ClusterDiscovery, f func(*ClusterDiscovery) bool) []*ClusterDiscovery {
+ copy := make([]*ClusterDiscovery, 0)
+ for _, v := range vs {
+ if f(v) {
+ copy = append(copy, v)
+ }
+ }
+
+ return copy
+}
+
+func (o *ClusterDiscovery) IsValid() *AppError {
+ if len(o.Id) != 26 {
+ return NewLocAppError("Channel.IsValid", "model.channel.is_valid.id.app_error", nil, "")
+ }
+
+ if len(o.ClusterName) == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "ClusterName must be set", nil, "")
+ }
+
+ if len(o.Type) == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "Type must be set", nil, "")
+ }
+
+ if len(o.Hostname) == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "Hostname must be set", nil, "")
+ }
+
+ if o.CreateAt == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "CreateAt must be set", nil, "")
+ }
+
+ if o.LastPingAt == 0 {
+ return NewLocAppError("ClusterDiscovery.IsValid", "LastPingAt must be set", nil, "")
+ }
+
+ return nil
+}
+
+func (o *ClusterDiscovery) ToJson() string {
+ b, err := json.Marshal(o)
+ if err != nil {
+ return ""
+ }
+
+ return string(b)
+}
+
+func ClusterDiscoveryFromJson(data io.Reader) *ClusterDiscovery {
+ decoder := json.NewDecoder(data)
+ var me ClusterDiscovery
+ err := decoder.Decode(&me)
+ if err == nil {
+ return &me
+ }
+
+ return nil
+}
diff --git a/model/cluster_discovery_test.go b/model/cluster_discovery_test.go
new file mode 100644
index 000000000..bfbdbd303
--- /dev/null
+++ b/model/cluster_discovery_test.go
@@ -0,0 +1,59 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestClusterDiscovery(t *testing.T) {
+ o := ClusterDiscovery{
+ Type: "test_type",
+ ClusterName: "cluster_name",
+ Hostname: "test_hostname",
+ }
+
+ json := o.ToJson()
+ result1 := ClusterDiscoveryFromJson(strings.NewReader(json))
+
+ if result1.ClusterName != "cluster_name" {
+ t.Fatal("should be set")
+ }
+
+ result2 := ClusterDiscoveryFromJson(strings.NewReader(json))
+ result3 := ClusterDiscoveryFromJson(strings.NewReader(json))
+
+ o.Id = "0"
+ result1.Id = "1"
+ result2.Id = "2"
+ result3.Id = "3"
+ result3.Hostname = "something_diff"
+
+ if !o.IsEqual(result1) {
+ t.Fatal("Should be equal")
+ }
+
+ list := make([]*ClusterDiscovery, 0)
+ list = append(list, &o)
+ list = append(list, result1)
+ list = append(list, result2)
+ list = append(list, result3)
+
+ rlist := FilterClusterDiscovery(list, func(in *ClusterDiscovery) bool {
+ return !o.IsEqual(in)
+ })
+
+ if len(rlist) != 1 {
+ t.Fatal("should only have 1 result")
+ }
+
+ o.AutoFillHostname()
+ o.Hostname = ""
+ o.AutoFillHostname()
+
+ o.AutoFillIpAddress()
+ o.Hostname = ""
+ o.AutoFillIpAddress()
+}
diff --git a/model/cluster_info.go b/model/cluster_info.go
index f76a03c0b..1e468044e 100644
--- a/model/cluster_info.go
+++ b/model/cluster_info.go
@@ -7,24 +7,16 @@ import (
"encoding/json"
"io"
"strings"
- "sync"
- "sync/atomic"
)
type ClusterInfo struct {
- Id string `json:"id"`
- Version string `json:"version"`
- ConfigHash string `json:"config_hash"`
- InterNodeUrl string `json:"internode_url"`
- Hostname string `json:"hostname"`
- LastSuccessfulPing int64 `json:"last_ping"`
- Alive int32 `json:"is_alive"`
- Mutex sync.RWMutex `json:"-"`
+ Version string `json:"version"`
+ ConfigHash string `json:"config_hash"`
+ IpAddress string `json:"ipaddress"`
+ Hostname string `json:"hostname"`
}
func (me *ClusterInfo) ToJson() string {
- me.Mutex.RLock()
- defer me.Mutex.RUnlock()
b, err := json.Marshal(me)
if err != nil {
return ""
@@ -41,7 +33,6 @@ func (me *ClusterInfo) Copy() *ClusterInfo {
func ClusterInfoFromJson(data io.Reader) *ClusterInfo {
decoder := json.NewDecoder(data)
var me ClusterInfo
- me.Mutex = sync.RWMutex{}
err := decoder.Decode(&me)
if err == nil {
return &me
@@ -50,38 +41,6 @@ func ClusterInfoFromJson(data io.Reader) *ClusterInfo {
}
}
-func (me *ClusterInfo) SetAlive(alive bool) {
- if alive {
- atomic.StoreInt32(&me.Alive, 1)
- } else {
- atomic.StoreInt32(&me.Alive, 0)
- }
-}
-
-func (me *ClusterInfo) IsAlive() bool {
- return atomic.LoadInt32(&me.Alive) == 1
-}
-
-func (me *ClusterInfo) HaveEstablishedInitialContact() bool {
- me.Mutex.RLock()
- defer me.Mutex.RUnlock()
- if me.Id != "" {
- return true
- }
-
- return false
-}
-
-func (me *ClusterInfo) IdEqualTo(in string) bool {
- me.Mutex.RLock()
- defer me.Mutex.RUnlock()
- if me.Id == in {
- return true
- }
-
- return false
-}
-
func ClusterInfosToJson(objmap []*ClusterInfo) string {
if b, err := json.Marshal(objmap); err != nil {
return ""
diff --git a/model/cluster_info_test.go b/model/cluster_info_test.go
index 038927120..c019df40a 100644
--- a/model/cluster_info_test.go
+++ b/model/cluster_info_test.go
@@ -9,33 +9,23 @@ import (
)
func TestClusterInfoJson(t *testing.T) {
- cluster := ClusterInfo{Id: NewId(), InterNodeUrl: NewId(), Hostname: NewId()}
+ cluster := ClusterInfo{IpAddress: NewId(), Hostname: NewId()}
json := cluster.ToJson()
result := ClusterInfoFromJson(strings.NewReader(json))
- if cluster.Id != result.Id {
+ if cluster.IpAddress != result.IpAddress {
t.Fatal("Ids do not match")
}
-
- cluster.SetAlive(true)
- if !cluster.IsAlive() {
- t.Fatal("should be live")
- }
-
- cluster.SetAlive(false)
- if cluster.IsAlive() {
- t.Fatal("should be not live")
- }
}
func TestClusterInfosJson(t *testing.T) {
- cluster := ClusterInfo{Id: NewId(), InterNodeUrl: NewId(), Hostname: NewId()}
+ cluster := ClusterInfo{IpAddress: NewId(), Hostname: NewId()}
clusterInfos := make([]*ClusterInfo, 1)
clusterInfos[0] = &cluster
json := ClusterInfosToJson(clusterInfos)
result := ClusterInfosFromJson(strings.NewReader(json))
- if clusterInfos[0].Id != result[0].Id {
+ if clusterInfos[0].IpAddress != result[0].IpAddress {
t.Fatal("Ids do not match")
}
diff --git a/model/cluster_message.go b/model/cluster_message.go
new file mode 100644
index 000000000..a6dec2e7f
--- /dev/null
+++ b/model/cluster_message.go
@@ -0,0 +1,55 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "encoding/json"
+ "io"
+)
+
+const (
+ CLUSTER_EVENT_PUBLISH = "publish"
+ CLUSTER_EVENT_UPDATE_STATUS = "update_status"
+ CLUSTER_EVENT_INVALIDATE_ALL_CACHES = "inv_all_caches"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS = "inv_reactions"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOK = "inv_webhook"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_POSTS = "inv_channel_posts"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS_NOTIFY_PROPS = "inv_channel_members_notify_props"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBERS = "inv_channel_members"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME = "inv_channel_name"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL = "inv_channel"
+ CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER = "inv_user"
+ CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER = "clear_session_user"
+
+ CLUSTER_SEND_BEST_EFFORT = "best_effort"
+ CLUSTER_SEND_RELIABLE = "reliable"
+)
+
+type ClusterMessage struct {
+ Event string `json:"event"`
+ SendType string `json:"-"`
+ WaitForAllToSend bool `json:"-"`
+ Data string `json:"data,omitempty"`
+ Props map[string]string `json:"props,omitempty"`
+}
+
+func (o *ClusterMessage) ToJson() string {
+ b, err := json.Marshal(o)
+ if err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func ClusterMessageFromJson(data io.Reader) *ClusterMessage {
+ decoder := json.NewDecoder(data)
+ var o ClusterMessage
+ err := decoder.Decode(&o)
+ if err == nil {
+ return &o
+ } else {
+ return nil
+ }
+}
diff --git a/model/cluster_message_test.go b/model/cluster_message_test.go
new file mode 100644
index 000000000..38603e577
--- /dev/null
+++ b/model/cluster_message_test.go
@@ -0,0 +1,28 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestClusterMessage(t *testing.T) {
+ m := ClusterMessage{
+ Event: CLUSTER_EVENT_PUBLISH,
+ SendType: CLUSTER_SEND_BEST_EFFORT,
+ Data: "hello",
+ }
+ json := m.ToJson()
+ result := ClusterMessageFromJson(strings.NewReader(json))
+
+ if result.Data != "hello" {
+ t.Fatal()
+ }
+
+ badresult := ClusterMessageFromJson(strings.NewReader("junk"))
+ if badresult != nil {
+ t.Fatal("should not have parsed")
+ }
+}
diff --git a/model/config.go b/model/config.go
index 4e3a3f7cc..f2b17bced 100644
--- a/model/config.go
+++ b/model/config.go
@@ -163,9 +163,14 @@ type ServiceSettings struct {
}
type ClusterSettings struct {
- Enable *bool
- InterNodeListenAddress *string
- InterNodeUrls []string
+ Enable *bool
+ ClusterName *string
+ OverrideHostname *string
+ UseIpAddress *bool
+ UseExperimentalGossip *bool
+ ReadOnlyConfig *bool
+ GossipPort *int
+ StreamingPort *int
}
type MetricsSettings struct {
@@ -1036,18 +1041,44 @@ func (o *Config) SetDefaults() {
*o.ServiceSettings.PostEditTimeLimit = 300
}
- if o.ClusterSettings.InterNodeListenAddress == nil {
- o.ClusterSettings.InterNodeListenAddress = new(string)
- *o.ClusterSettings.InterNodeListenAddress = ":8075"
- }
-
if o.ClusterSettings.Enable == nil {
o.ClusterSettings.Enable = new(bool)
*o.ClusterSettings.Enable = false
}
- if o.ClusterSettings.InterNodeUrls == nil {
- o.ClusterSettings.InterNodeUrls = []string{}
+ if o.ClusterSettings.ClusterName == nil {
+ o.ClusterSettings.ClusterName = new(string)
+ *o.ClusterSettings.ClusterName = ""
+ }
+
+ if o.ClusterSettings.OverrideHostname == nil {
+ o.ClusterSettings.OverrideHostname = new(string)
+ *o.ClusterSettings.OverrideHostname = ""
+ }
+
+ if o.ClusterSettings.UseIpAddress == nil {
+ o.ClusterSettings.UseIpAddress = new(bool)
+ *o.ClusterSettings.UseIpAddress = true
+ }
+
+ if o.ClusterSettings.UseExperimentalGossip == nil {
+ o.ClusterSettings.UseExperimentalGossip = new(bool)
+ *o.ClusterSettings.UseExperimentalGossip = false
+ }
+
+ if o.ClusterSettings.ReadOnlyConfig == nil {
+ o.ClusterSettings.ReadOnlyConfig = new(bool)
+ *o.ClusterSettings.ReadOnlyConfig = true
+ }
+
+ if o.ClusterSettings.GossipPort == nil {
+ o.ClusterSettings.GossipPort = new(int)
+ *o.ClusterSettings.GossipPort = 8074
+ }
+
+ if o.ClusterSettings.StreamingPort == nil {
+ o.ClusterSettings.StreamingPort = new(int)
+ *o.ClusterSettings.StreamingPort = 8075
}
if o.MetricsSettings.ListenAddress == nil {
diff --git a/model/utils.go b/model/utils.go
index d24540683..e7d8bfdac 100644
--- a/model/utils.go
+++ b/model/utils.go
@@ -17,6 +17,8 @@ import (
"strings"
"time"
+ "net"
+
goi18n "github.com/nicksnyder/go-i18n/i18n"
"github.com/pborman/uuid"
)
@@ -264,6 +266,23 @@ func StringFromJson(data io.Reader) string {
}
}
+func GetServerIpAddress() string {
+ if addrs, err := net.InterfaceAddrs(); err != nil {
+ return ""
+ } else {
+ for _, addr := range addrs {
+
+ if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
+ if ip.IP.To4() != nil {
+ return ip.IP.String()
+ }
+ }
+ }
+ }
+
+ return ""
+}
+
func IsLower(s string) bool {
if strings.ToLower(s) == s {
return true
diff --git a/model/utils_test.go b/model/utils_test.go
index 94ee55aa9..bc2aa6ce7 100644
--- a/model/utils_test.go
+++ b/model/utils_test.go
@@ -193,6 +193,12 @@ func TestIsValidAlphaNum(t *testing.T) {
}
}
+func TestGetServerIpAddress(t *testing.T) {
+ if len(GetServerIpAddress()) == 0 {
+ t.Fatal("Should find local ip address")
+ }
+}
+
func TestIsValidAlphaNumHyphenUnderscore(t *testing.T) {
casesWithFormat := []struct {
Input string