From 36f216cb7cb16958d98b3d77e121198596fd2213 Mon Sep 17 00:00:00 2001 From: Corey Hulen Date: Mon, 19 Jun 2017 08:44:04 -0700 Subject: PLT-6080 moving clustering to memberlist (#6499) * PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel --- store/sql_cluster_discovery_store.go | 226 ++++++++++++++++++++++++++++++ store/sql_cluster_discovery_store_test.go | 201 ++++++++++++++++++++++++++ store/sql_store.go | 82 +++++++---- store/store.go | 10 ++ 4 files changed, 488 insertions(+), 31 deletions(-) create mode 100644 store/sql_cluster_discovery_store.go create mode 100644 store/sql_cluster_discovery_store_test.go (limited to 'store') diff --git a/store/sql_cluster_discovery_store.go b/store/sql_cluster_discovery_store.go new file mode 100644 index 000000000..81d3d6e11 --- /dev/null +++ b/store/sql_cluster_discovery_store.go @@ -0,0 +1,226 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. 
+ +package store + +import ( + "github.com/mattermost/platform/model" +) + +type sqlClusterDiscoveryStore struct { + *SqlStore +} + +func NewSqlClusterDiscoveryStore(sqlStore *SqlStore) ClusterDiscoveryStore { + s := &sqlClusterDiscoveryStore{sqlStore} + + for _, db := range sqlStore.GetAllConns() { + table := db.AddTableWithName(model.ClusterDiscovery{}, "ClusterDiscovery").SetKeys(false, "Id") + table.ColMap("Id").SetMaxSize(26) + table.ColMap("Type").SetMaxSize(64) + table.ColMap("ClusterName").SetMaxSize(64) + table.ColMap("Hostname").SetMaxSize(512) + } + + return s +} + +func (s sqlClusterDiscoveryStore) Save(ClusterDiscovery *model.ClusterDiscovery) StoreChannel { + + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + ClusterDiscovery.PreSave() + if result.Err = ClusterDiscovery.IsValid(); result.Err != nil { + storeChannel <- result + close(storeChannel) + return + } + + if err := s.GetMaster().Insert(ClusterDiscovery); err != nil { + result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Save", "Failed to save ClusterDiscovery row", nil, err.Error()) + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (s sqlClusterDiscoveryStore) Delete(ClusterDiscovery *model.ClusterDiscovery) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + result.Data = false + + if count, err := s.GetMaster().SelectInt( + ` + DELETE + FROM + ClusterDiscovery + WHERE + Type = :Type + AND ClusterName = :ClusterName + AND Hostname = :Hostname + `, + map[string]interface{}{ + "Type": ClusterDiscovery.Type, + "ClusterName": ClusterDiscovery.ClusterName, + "Hostname": ClusterDiscovery.Hostname, + }, + ); err != nil { + result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Delete", "Failed to delete", nil, err.Error()) + } else { + if count > 0 { + result.Data = true + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + 
+func (s sqlClusterDiscoveryStore) Exists(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+	storeChannel := make(StoreChannel, 1)
+
+	go func() {
+		result := StoreResult{}
+		result.Data = false
+
+		if count, err := s.GetMaster().SelectInt(
+			`
+			SELECT
+				COUNT(*)
+			FROM
+				ClusterDiscovery
+			WHERE
+				Type = :Type
+					AND ClusterName = :ClusterName
+					AND Hostname = :Hostname
+			`,
+			map[string]interface{}{
+				"Type":        ClusterDiscovery.Type,
+				"ClusterName": ClusterDiscovery.ClusterName,
+				"Hostname":    ClusterDiscovery.Hostname,
+			},
+		); err != nil {
+			result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Exists", "Failed to check if it exists", nil, err.Error())
+		} else {
+			if count > 0 {
+				result.Data = true
+			}
+		}
+
+		storeChannel <- result
+		close(storeChannel)
+	}()
+
+	return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) GetAll(ClusterDiscoveryType, clusterName string) StoreChannel {
+
+	storeChannel := make(StoreChannel, 1)
+
+	go func() {
+		result := StoreResult{}
+
+		lastPingAt := model.GetMillis() - model.CDS_OFFLINE_AFTER_MILLIS
+
+		var list []*model.ClusterDiscovery
+		if _, err := s.GetMaster().Select(
+			&list,
+			`
+			SELECT
+				*
+			FROM
+				ClusterDiscovery
+			WHERE
+				Type = :ClusterDiscoveryType
+					AND ClusterName = :ClusterName
+					AND LastPingAt > :LastPingAt
+			`,
+			map[string]interface{}{
+				"ClusterDiscoveryType": ClusterDiscoveryType,
+				"ClusterName":          clusterName,
+				"LastPingAt":           lastPingAt,
+			},
+		); err != nil {
+			result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.GetAllForType", "Failed to get all discovery rows", nil, err.Error())
+		} else {
+			result.Data = list
+		}
+
+		storeChannel <- result
+		close(storeChannel)
+	}()
+
+	return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) SetLastPingAt(ClusterDiscovery *model.ClusterDiscovery) StoreChannel {
+	storeChannel := make(StoreChannel, 1)
+
+	go func() {
+		result := StoreResult{}
+
+		if _, err := s.GetMaster().Exec(
+			`
+			UPDATE ClusterDiscovery
+			SET
+				LastPingAt = :LastPingAt
+			WHERE
+				Type = :Type
+					AND ClusterName = :ClusterName
+					AND Hostname = :Hostname
+			`,
+			map[string]interface{}{
+				"LastPingAt":  model.GetMillis(),
+				"Type":        ClusterDiscovery.Type,
+				"ClusterName": ClusterDiscovery.ClusterName,
+				"Hostname":    ClusterDiscovery.Hostname,
+			},
+		); err != nil {
+			result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.SetLastPingAt", "Failed to update last ping at", nil, err.Error())
+		}
+
+		storeChannel <- result
+		close(storeChannel)
+	}()
+
+	return storeChannel
+}
+
+func (s sqlClusterDiscoveryStore) Cleanup() StoreChannel {
+
+	storeChannel := make(StoreChannel, 1)
+
+	go func() {
+		result := StoreResult{}
+
+		if _, err := s.GetMaster().Exec(
+			`
+			DELETE FROM ClusterDiscovery
+				WHERE
+					LastPingAt < :LastPingAt
+			`,
+			map[string]interface{}{
+				"LastPingAt": model.GetMillis() - model.CDS_OFFLINE_AFTER_MILLIS,
+			},
+		); err != nil {
+			result.Err = model.NewLocAppError("SqlClusterDiscoveryStore.Cleanup", "Failed to delete stale ClusterDiscovery rows", nil, err.Error())
+		}
+
+		storeChannel <- result
+		close(storeChannel)
+	}()
+
+	return storeChannel
+}
diff --git a/store/sql_cluster_discovery_store_test.go b/store/sql_cluster_discovery_store_test.go
new file mode 100644
index 000000000..159d3b4db
--- /dev/null
+++ b/store/sql_cluster_discovery_store_test.go
@@ -0,0 +1,201 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+ +package store + +import ( + "testing" + + "time" + + "github.com/mattermost/platform/model" +) + +func TestSqlClusterDiscoveryStore(t *testing.T) { + Setup() + + discovery := &model.ClusterDiscovery{ + ClusterName: "cluster_name", + Hostname: "hostname" + model.NewId(), + Type: "test_test", + } + + if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-store.ClusterDiscovery().Cleanup(); result.Err != nil { + t.Fatal(result.Err) + } +} + +func TestSqlClusterDiscoveryStoreDelete(t *testing.T) { + Setup() + + discovery := &model.ClusterDiscovery{ + ClusterName: "cluster_name", + Hostname: "hostname" + model.NewId(), + Type: "test_test", + } + + if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-store.ClusterDiscovery().Delete(discovery); result.Err != nil { + t.Fatal(result.Err) + } +} + +func TestSqlClusterDiscoveryStoreLastPing(t *testing.T) { + Setup() + + discovery := &model.ClusterDiscovery{ + ClusterName: "cluster_name_lastPing", + Hostname: "hostname" + model.NewId(), + Type: "test_test_lastPing" + model.NewId(), + } + + if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-store.ClusterDiscovery().SetLastPingAt(discovery); result.Err != nil { + t.Fatal(result.Err) + } + + ttime := model.GetMillis() + + time.Sleep(1 * time.Second) + + if result := <-store.ClusterDiscovery().SetLastPingAt(discovery); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-store.ClusterDiscovery().GetAll(discovery.Type, "cluster_name_lastPing"); result.Err != nil { + t.Fatal(result.Err) + } else { + list := result.Data.([]*model.ClusterDiscovery) + + if len(list) != 1 { + t.Fatal("should only be 1 items") + return + } + + if list[0].LastPingAt-ttime < 500 { + t.Fatal("failed to set time") + } + } + + discovery2 := &model.ClusterDiscovery{ + ClusterName: 
"cluster_name_missing",
+		Hostname:    "hostname" + model.NewId(),
+		Type:        "test_test_missing",
+	}
+
+	if result := <-store.ClusterDiscovery().SetLastPingAt(discovery2); result.Err != nil {
+		t.Fatal(result.Err)
+	}
+}
+
+func TestSqlClusterDiscoveryStoreExists(t *testing.T) {
+	Setup()
+
+	discovery := &model.ClusterDiscovery{
+		ClusterName: "cluster_name_Exists",
+		Hostname:    "hostname" + model.NewId(),
+		Type:        "test_test_Exists" + model.NewId(),
+	}
+
+	if result := <-store.ClusterDiscovery().Save(discovery); result.Err != nil {
+		t.Fatal(result.Err)
+	}
+
+	if result := <-store.ClusterDiscovery().Exists(discovery); result.Err != nil {
+		t.Fatal(result.Err)
+	} else {
+		val := result.Data.(bool)
+		if !val {
+			t.Fatal("should be true")
+		}
+	}
+
+	discovery.ClusterName = "cluster_name_Exists2"
+
+	if result := <-store.ClusterDiscovery().Exists(discovery); result.Err != nil {
+		t.Fatal(result.Err)
+	} else {
+		val := result.Data.(bool)
+		if val {
+			t.Fatal("should be false")
+		}
+	}
+}
+
+func TestSqlClusterDiscoveryGetStore(t *testing.T) {
+	Setup()
+
+	testType1 := model.NewId()
+
+	discovery1 := &model.ClusterDiscovery{
+		ClusterName: "cluster_name",
+		Hostname:    "hostname1",
+		Type:        testType1,
+	}
+	Must(store.ClusterDiscovery().Save(discovery1))
+
+	discovery2 := &model.ClusterDiscovery{
+		ClusterName: "cluster_name",
+		Hostname:    "hostname2",
+		Type:        testType1,
+	}
+	Must(store.ClusterDiscovery().Save(discovery2))
+
+	discovery3 := &model.ClusterDiscovery{
+		ClusterName: "cluster_name",
+		Hostname:    "hostname3",
+		Type:        testType1,
+		CreateAt:    1,
+		LastPingAt:  1,
+	}
+	Must(store.ClusterDiscovery().Save(discovery3))
+
+	testType2 := model.NewId()
+
+	discovery4 := &model.ClusterDiscovery{
+		ClusterName: "cluster_name",
+		Hostname:    "hostname1",
+		Type:        testType2,
+	}
+	Must(store.ClusterDiscovery().Save(discovery4))
+
+	if result := <-store.ClusterDiscovery().GetAll(testType1, "cluster_name"); result.Err != nil {
+		t.Fatal(result.Err)
+	} else {
+		list := 
result.Data.([]*model.ClusterDiscovery) + + if len(list) != 2 { + t.Fatal("Should only have returned 2") + } + } + + if result := <-store.ClusterDiscovery().GetAll(testType2, "cluster_name"); result.Err != nil { + t.Fatal(result.Err) + } else { + list := result.Data.([]*model.ClusterDiscovery) + + if len(list) != 1 { + t.Fatal("Should only have returned 1") + } + } + + if result := <-store.ClusterDiscovery().GetAll(model.NewId(), "cluster_name"); result.Err != nil { + t.Fatal(result.Err) + } else { + list := result.Data.([]*model.ClusterDiscovery) + + if len(list) != 0 { + t.Fatal("shouldn't be any") + } + } +} diff --git a/store/sql_store.go b/store/sql_store.go index 1a681fe81..ee2c678f6 100644 --- a/store/sql_store.go +++ b/store/sql_store.go @@ -4,6 +4,7 @@ package store import ( + "context" "crypto/aes" "crypto/cipher" "crypto/hmac" @@ -35,6 +36,8 @@ const ( INDEX_TYPE_FULL_TEXT = "full_text" INDEX_TYPE_DEFAULT = "default" MAX_DB_CONN_LIFETIME = 60 + DB_PING_ATTEMPTS = 18 + DB_PING_TIMEOUT_SECS = 10 ) const ( @@ -66,31 +69,32 @@ const ( ) type SqlStore struct { - master *gorp.DbMap - replicas []*gorp.DbMap - searchReplicas []*gorp.DbMap - team TeamStore - channel ChannelStore - post PostStore - user UserStore - audit AuditStore - compliance ComplianceStore - session SessionStore - oauth OAuthStore - system SystemStore - webhook WebhookStore - command CommandStore - preference PreferenceStore - license LicenseStore - token TokenStore - emoji EmojiStore - status StatusStore - fileInfo FileInfoStore - reaction ReactionStore - jobStatus JobStatusStore - SchemaVersion string - rrCounter int64 - srCounter int64 + master *gorp.DbMap + replicas []*gorp.DbMap + searchReplicas []*gorp.DbMap + team TeamStore + channel ChannelStore + post PostStore + user UserStore + audit AuditStore + clusterDiscovery ClusterDiscoveryStore + compliance ComplianceStore + session SessionStore + oauth OAuthStore + system SystemStore + webhook WebhookStore + command CommandStore + preference 
PreferenceStore + license LicenseStore + token TokenStore + emoji EmojiStore + status StatusStore + fileInfo FileInfoStore + reaction ReactionStore + jobStatus JobStatusStore + SchemaVersion string + rrCounter int64 + srCounter int64 } func initConnection() *SqlStore { @@ -139,6 +143,7 @@ func NewSqlStore() Store { sqlStore.post = NewSqlPostStore(sqlStore) sqlStore.user = NewSqlUserStore(sqlStore) sqlStore.audit = NewSqlAuditStore(sqlStore) + sqlStore.clusterDiscovery = NewSqlClusterDiscoveryStore(sqlStore) sqlStore.compliance = NewSqlComplianceStore(sqlStore) sqlStore.session = NewSqlSessionStore(sqlStore) sqlStore.oauth = NewSqlOAuthStore(sqlStore) @@ -197,12 +202,23 @@ func setupConnection(con_type string, driver string, dataSource string, maxIdle os.Exit(EXIT_DB_OPEN) } - l4g.Info(utils.T("store.sql.pinging.info"), con_type) - err = db.Ping() - if err != nil { - l4g.Critical(utils.T("store.sql.ping.critical"), err) - time.Sleep(time.Second) - os.Exit(EXIT_PING) + for i := 0; i < DB_PING_ATTEMPTS; i++ { + l4g.Info("Pinging SQL %v database", con_type) + ctx, cancel := context.WithTimeout(context.Background(), DB_PING_TIMEOUT_SECS*time.Second) + defer cancel() + err = db.PingContext(ctx) + if err == nil { + break + } else { + if i == DB_PING_ATTEMPTS-1 { + l4g.Critical("Failed to ping DB, server will exit err=%v", err) + time.Sleep(time.Second) + os.Exit(EXIT_PING) + } else { + l4g.Error("Failed to ping DB retrying in %v seconds err=%v", DB_PING_TIMEOUT_SECS, err) + time.Sleep(DB_PING_TIMEOUT_SECS * time.Second) + } + } } db.SetMaxIdleConns(maxIdle) @@ -692,6 +708,10 @@ func (ss *SqlStore) Audit() AuditStore { return ss.audit } +func (ss *SqlStore) ClusterDiscovery() ClusterDiscoveryStore { + return ss.clusterDiscovery +} + func (ss *SqlStore) Compliance() ComplianceStore { return ss.compliance } diff --git a/store/store.go b/store/store.go index cd7792ce1..23c6acd37 100644 --- a/store/store.go +++ b/store/store.go @@ -34,6 +34,7 @@ type Store interface { Post() 
PostStore User() UserStore Audit() AuditStore + ClusterDiscovery() ClusterDiscoveryStore Compliance() ComplianceStore Session() SessionStore OAuth() OAuthStore @@ -239,6 +240,15 @@ type AuditStore interface { PermanentDeleteByUser(userId string) StoreChannel } +type ClusterDiscoveryStore interface { + Save(discovery *model.ClusterDiscovery) StoreChannel + Delete(discovery *model.ClusterDiscovery) StoreChannel + Exists(discovery *model.ClusterDiscovery) StoreChannel + GetAll(discoveryType, clusterName string) StoreChannel + SetLastPingAt(discovery *model.ClusterDiscovery) StoreChannel + Cleanup() StoreChannel +} + type ComplianceStore interface { Save(compliance *model.Compliance) StoreChannel Update(compliance *model.Compliance) StoreChannel -- cgit v1.2.3-1-g7c22