summaryrefslogtreecommitdiffstats
path: root/store
diff options
context:
space:
mode:
authorCorey Hulen <corey@hulen.com>2017-06-19 08:44:04 -0700
committerGitHub <noreply@github.com>2017-06-19 08:44:04 -0700
commit36f216cb7cb16958d98b3d77e121198596fd2213 (patch)
treeac2a5b79494749b3dffc2f5778092f2529c98d1a /store
parentfe48987a32fbd600458edd4e81318071ae558ba4 (diff)
downloadchat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.gz
chat-36f216cb7cb16958d98b3d77e121198596fd2213.tar.bz2
chat-36f216cb7cb16958d98b3d77e121198596fd2213.zip
PLT-6080 moving clustering to memberlist (#6499)
* PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel
Diffstat (limited to 'store')
-rw-r--r--store/sql_cluster_discovery_store.go226
-rw-r--r--store/sql_cluster_discovery_store_test.go201
-rw-r--r--store/sql_store.go82
-rw-r--r--store/store.go10
4 files changed, 488 insertions, 31 deletions
diff --git a/store/sql_cluster_discovery_store.go b/store/sql_cluster_discovery_store.go
new file mode 100644
index 000000000..81d3d6e11
--- /dev/null
+++ b/store/sql_cluster_discovery_store.go
@@ -0,0 +1,226 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "github.com/mattermost/platform/model"
+)
+
+type sqlClusterDiscoveryStore struct {
+ *SqlStore
+}
+
+func NewSqlClusterDiscoveryStore(sqlStore *SqlStore) ClusterDiscoveryStore {
+ s := &sqlClusterDiscoveryStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.ClusterDiscovery{}, "ClusterDiscovery").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(64)
+ table.ColMap("ClusterName").SetMaxSize(64)
+ table.ColMap("Hostname").SetMaxSize(512)
+ }
+
+ return s
+}
+
+func (s sqlClusterDiscoveryStore) Save(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ ClusterDiscovery.PreSave()
+ if result.Err = ClusterDiscovery.IsValid(); result.Err != nil {
+ storeChannel <- result
+ close(storeChannel)
+ return
+ }
+
+ if err := s.GetMaster().Insert(ClusterDiscovery); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Save", "Failed to save ClusterDiscovery row", nil, err.Error())
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) Delete(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ result.Data = false
+
+ if count, err := s.GetMaster().SelectInt(
+ `
+ DELETE
+ FROM
+ ClusterDiscovery
+ WHERE
+ Type = :Type
+ AND ClusterName = :ClusterName
+ AND Hostname = :Hostname
+ `,
+ map[string]interface{}{
+ "Type": ClusterDiscovery.Type,
+ "ClusterName": ClusterDiscovery.ClusterName,
+ "Hostname": ClusterDiscovery.Hostname,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Delete", "Failed to delete", nil, err.Error())
+ } else {
+ if count > 0 {
+ result.Data = true
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) Exists(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ result.Data = false
+
+ if count, err := s.GetMaster().SelectInt(
+ `
+ SELECT
+ COUNT(*)
+ FROM
+ ClusterDiscovery
+ WHERE
+ Type = :Type
+ AND ClusterName = :ClusterName
+ AND Hostname = :Hostname
+ `,
+ map[string]interface{}{
+ "Type": ClusterDiscovery.Type,
+ "ClusterName": ClusterDiscovery.ClusterName,
+ "Hostname": ClusterDiscovery.Hostname,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Exists", "Failed to check if it exists", nil, err.Error())
+ } else {
+ if count > 0 {
+ result.Data = true
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) GetAll(ClusterDiscoveryType, clusterName string) StoreChannel {
+
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ lastPingAt := model.GetMillis() - model.CDS_OFFLINE_AFTER_MILLIS
+
+ var list []*model.ClusterDiscovery
+ if _, err := s.GetMaster().Select(
+ &list,
+ `
+ SELECT
+ *
+ FROM
+ ClusterDiscovery
+ WHERE
+ Type = :ClusterDiscoveryType
+ AND ClusterName = :ClusterName
+ AND LastPingAt > :LastPingAt
+ `,
+ map[string]interface{}{
+ "ClusterDiscoveryType": ClusterDiscoveryType,
+ "ClusterName": clusterName,
+ "LastPingAt": lastPingAt,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.GetAllForType", "Failed to get all disoery rows", nil, err.Error())
+ } else {
+ result.Data = list
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) SetLastPingAt(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := s.GetMaster().Exec(
+ `
+ UPDATE ClusterDiscovery
+ SET
+ LastPingAt = :LastPingAt
+ WHERE
+ Type = :Type
+ AND ClusterName = :ClusterName
+ AND Hostname = :Hostname
+ `,
+ map[string]interface{}{
+ "LastPingAt": model.GetMillis(),
+ "Type": ClusterDiscovery.Type,
+ "ClusterName": ClusterDiscovery.ClusterName,
+ "Hostname": ClusterDiscovery.Hostname,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.GetAllForType", "Failed to update last ping at", nil, err.Error())
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) Cleanup() StoreChannel {
+
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := s.GetMaster().Exec(
+ `
+ DELETE FROM ClusterDiscovery
+ WHERE
+ LastPingAt < :LastPingAt
+ `,
+ map[string]interface{}{
+ "LastPingAt": model.GetMillis() - model.CDS_OFFLINE_AFTER_MILLIS,
+ },
+ ); err != nil {
+ result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Save", "Failed to save ClusterDiscovery row", nil, err.Error())
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_cluster_discovery_store_test.go b/store/sql_cluster_discovery_store_test.go
new file mode 100644
index 000000000..159d3b4db
--- /dev/null
+++ b/store/sql_cluster_discovery_store_test.go
@@ -0,0 +1,201 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "time"
+
+ "github.com/mattermost/platform/model"
+)
+
+func TestSqlClusterDiscoveryStore(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test",
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().Cleanup(); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
+
+func TestSqlClusterDiscoveryStoreDelete(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test",
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().Delete(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
+
+func TestSqlClusterDiscoveryStoreLastPing(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name_lastPing",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test_lastPing" + model.NewId(),
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().SetLastPingAt(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ ttime := model.GetMillis()
+
+ time.Sleep(1 * time.Second)
+
+ if result := <-store.ClusterDiscovery().SetLastPingAt(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().GetAll(discovery.Type, "cluster_name_lastPing"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 1 {
+ t.Fatal("should only be 1 items")
+ return
+ }
+
+ if list[0].LastPingAt-ttime < 500 {
+ t.Fatal("failed to set time")
+ }
+ }
+
+ discovery2 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name_missing",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test_missing",
+ }
+
+ if result := <-store.ClusterDiscovery().SetLastPingAt(discovery2); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
+
+func TestSqlClusterDiscoveryStoreExists(t *testing.T) {
+ Setup()
+
+ discovery := &model.ClusterDiscovery{
+ ClusterName: "cluster_name_Exists",
+ Hostname: "hostname" + model.NewId(),
+ Type: "test_test_Exists" + model.NewId(),
+ }
+
+ if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.ClusterDiscovery().Exists(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ val := result.Data.(bool)
+ if !val {
+ t.Fatal("should be true")
+ }
+ }
+
+ discovery.ClusterName = "cluster_name_Exists2"
+
+ if result := <-store.ClusterDiscovery().Exists(discovery); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ val := result.Data.(bool)
+ if val {
+ t.Fatal("should be true")
+ }
+ }
+}
+
+func TestSqlClusterDiscoveryGetStore(t *testing.T) {
+ Setup()
+
+ testType1 := model.NewId()
+
+ discovery1 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname1",
+ Type: testType1,
+ }
+ Must(store.ClusterDiscovery().Save(discovery1))
+
+ discovery2 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname2",
+ Type: testType1,
+ }
+ Must(store.ClusterDiscovery().Save(discovery2))
+
+ discovery3 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname3",
+ Type: testType1,
+ CreateAt: 1,
+ LastPingAt: 1,
+ }
+ Must(store.ClusterDiscovery().Save(discovery3))
+
+ testType2 := model.NewId()
+
+ discovery4 := &model.ClusterDiscovery{
+ ClusterName: "cluster_name",
+ Hostname: "hostname1",
+ Type: testType2,
+ }
+ Must(store.ClusterDiscovery().Save(discovery4))
+
+ if result := <-store.ClusterDiscovery().GetAll(testType1, "cluster_name"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 2 {
+ t.Fatal("Should only have returned 2")
+ }
+ }
+
+ if result := <-store.ClusterDiscovery().GetAll(testType2, "cluster_name"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 1 {
+ t.Fatal("Should only have returned 1")
+ }
+ }
+
+ if result := <-store.ClusterDiscovery().GetAll(model.NewId(), "cluster_name"); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ list := result.Data.([]*model.ClusterDiscovery)
+
+ if len(list) != 0 {
+ t.Fatal("shouldn't be any")
+ }
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index 1a681fe81..ee2c678f6 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -4,6 +4,7 @@
package store
import (
+ "context"
"crypto/aes"
"crypto/cipher"
"crypto/hmac"
@@ -35,6 +36,8 @@ const (
INDEX_TYPE_FULL_TEXT = "full_text"
INDEX_TYPE_DEFAULT = "default"
MAX_DB_CONN_LIFETIME = 60
+ DB_PING_ATTEMPTS = 18
+ DB_PING_TIMEOUT_SECS = 10
)
const (
@@ -66,31 +69,32 @@ const (
)
type SqlStore struct {
- master *gorp.DbMap
- replicas []*gorp.DbMap
- searchReplicas []*gorp.DbMap
- team TeamStore
- channel ChannelStore
- post PostStore
- user UserStore
- audit AuditStore
- compliance ComplianceStore
- session SessionStore
- oauth OAuthStore
- system SystemStore
- webhook WebhookStore
- command CommandStore
- preference PreferenceStore
- license LicenseStore
- token TokenStore
- emoji EmojiStore
- status StatusStore
- fileInfo FileInfoStore
- reaction ReactionStore
- jobStatus JobStatusStore
- SchemaVersion string
- rrCounter int64
- srCounter int64
+ master *gorp.DbMap
+ replicas []*gorp.DbMap
+ searchReplicas []*gorp.DbMap
+ team TeamStore
+ channel ChannelStore
+ post PostStore
+ user UserStore
+ audit AuditStore
+ clusterDiscovery ClusterDiscoveryStore
+ compliance ComplianceStore
+ session SessionStore
+ oauth OAuthStore
+ system SystemStore
+ webhook WebhookStore
+ command CommandStore
+ preference PreferenceStore
+ license LicenseStore
+ token TokenStore
+ emoji EmojiStore
+ status StatusStore
+ fileInfo FileInfoStore
+ reaction ReactionStore
+ jobStatus JobStatusStore
+ SchemaVersion string
+ rrCounter int64
+ srCounter int64
}
func initConnection() *SqlStore {
@@ -139,6 +143,7 @@ func NewSqlStore() Store {
sqlStore.post = NewSqlPostStore(sqlStore)
sqlStore.user = NewSqlUserStore(sqlStore)
sqlStore.audit = NewSqlAuditStore(sqlStore)
+ sqlStore.clusterDiscovery = NewSqlClusterDiscoveryStore(sqlStore)
sqlStore.compliance = NewSqlComplianceStore(sqlStore)
sqlStore.session = NewSqlSessionStore(sqlStore)
sqlStore.oauth = NewSqlOAuthStore(sqlStore)
@@ -197,12 +202,23 @@ func setupConnection(con_type string, driver string, dataSource string, maxIdle
os.Exit(EXIT_DB_OPEN)
}
- l4g.Info(utils.T("store.sql.pinging.info"), con_type)
- err = db.Ping()
- if err != nil {
- l4g.Critical(utils.T("store.sql.ping.critical"), err)
- time.Sleep(time.Second)
- os.Exit(EXIT_PING)
+ for i := 0; i < DB_PING_ATTEMPTS; i++ {
+ l4g.Info("Pinging SQL %v database", con_type)
+ ctx, cancel := context.WithTimeout(context.Background(), DB_PING_TIMEOUT_SECS*time.Second)
+ defer cancel()
+ err = db.PingContext(ctx)
+ if err == nil {
+ break
+ } else {
+ if i == DB_PING_ATTEMPTS-1 {
+ l4g.Critical("Failed to ping DB, server will exit err=%v", err)
+ time.Sleep(time.Second)
+ os.Exit(EXIT_PING)
+ } else {
+ l4g.Error("Failed to ping DB retrying in %v seconds err=%v", DB_PING_TIMEOUT_SECS, err)
+ time.Sleep(DB_PING_TIMEOUT_SECS * time.Second)
+ }
+ }
}
db.SetMaxIdleConns(maxIdle)
@@ -692,6 +708,10 @@ func (ss *SqlStore) Audit() AuditStore {
return ss.audit
}
+func (ss *SqlStore) ClusterDiscovery() ClusterDiscoveryStore {
+ return ss.clusterDiscovery
+}
+
func (ss *SqlStore) Compliance() ComplianceStore {
return ss.compliance
}
diff --git a/store/store.go b/store/store.go
index cd7792ce1..23c6acd37 100644
--- a/store/store.go
+++ b/store/store.go
@@ -34,6 +34,7 @@ type Store interface {
Post() PostStore
User() UserStore
Audit() AuditStore
+ ClusterDiscovery() ClusterDiscoveryStore
Compliance() ComplianceStore
Session() SessionStore
OAuth() OAuthStore
@@ -239,6 +240,15 @@ type AuditStore interface {
PermanentDeleteByUser(userId string) StoreChannel
}
+type ClusterDiscoveryStore interface {
+ Save(discovery *model.ClusterDiscovery) StoreChannel
+ Delete(discovery *model.ClusterDiscovery) StoreChannel
+ Exists(discovery *model.ClusterDiscovery) StoreChannel
+ GetAll(discoveryType, clusterName string) StoreChannel
+ SetLastPingAt(discovery *model.ClusterDiscovery) StoreChannel
+ Cleanup() StoreChannel
+}
+
type ComplianceStore interface {
Save(compliance *model.Compliance) StoreChannel
Update(compliance *model.Compliance) StoreChannel