From 36f216cb7cb16958d98b3d77e121198596fd2213 Mon Sep 17 00:00:00 2001 From: Corey Hulen Date: Mon, 19 Jun 2017 08:44:04 -0700 Subject: PLT-6080 moving clustering to memberlist (#6499) * PLT-6080 adding cluster discovery service * Adding memberlist lib * Adding memberlist lib * WIP * WIP * WIP * WIP * Rolling back config changes * Fixing make file * Fixing config for cluster * WIP * Fixing system console for clustering * Fixing default config * Fixing config * Fixing system console for clustering * Tweaking hub setting * Bumping up time * merging vendor dir * Updating vendor dir * Fixing unit test * Fixing bad merge * Remove some testing code * Moving comment * PLT-6868 adding db ping retry * Removing unused loc strings * Adding defer to cancel --- app/cluster_discovery.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 app/cluster_discovery.go (limited to 'app/cluster_discovery.go') diff --git a/app/cluster_discovery.go b/app/cluster_discovery.go new file mode 100644 index 000000000..6584418f1 --- /dev/null +++ b/app/cluster_discovery.go @@ -0,0 +1,77 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "fmt" + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" +) + +const ( + DISCOVERY_SERVICE_WRITE_PING = 60 * time.Second +) + +type ClusterDiscoveryService struct { + model.ClusterDiscovery + stop chan bool +} + +func NewClusterDiscoveryService() *ClusterDiscoveryService { + ds := &ClusterDiscoveryService{ + ClusterDiscovery: model.ClusterDiscovery{}, + stop: make(chan bool), + } + + return ds +} + +func (me *ClusterDiscoveryService) Start() { + + <-Srv.Store.ClusterDiscovery().Cleanup() + + if cresult := <-Srv.Store.ClusterDiscovery().Exists(&me.ClusterDiscovery); cresult.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to check if row exists for %v with err=%v", me.ClusterDiscovery.ToJson(), cresult.Err)) + } else { + if cresult.Data.(bool) { + if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to start clean for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err)) + } + } + } + + if result := <-Srv.Store.ClusterDiscovery().Save(&me.ClusterDiscovery); result.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to save for %v with err=%v", me.ClusterDiscovery.ToJson(), result.Err)) + return + } + + go func() { + l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer started for %v", me.ClusterDiscovery.ToJson())) + ticker := time.NewTicker(DISCOVERY_SERVICE_WRITE_PING) + defer func() { + ticker.Stop() + if u := <-Srv.Store.ClusterDiscovery().Delete(&me.ClusterDiscovery); u.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to cleanup for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err)) + } + l4g.Debug(fmt.Sprintf("ClusterDiscoveryService ping writer stopped for %v", me.ClusterDiscovery.ToJson())) + }() + + for { + select { + case <-ticker.C: + if u := <-Srv.Store.ClusterDiscovery().SetLastPingAt(&me.ClusterDiscovery); u.Err != nil { + l4g.Error(fmt.Sprintf("ClusterDiscoveryService failed to write ping for %v with err=%v", me.ClusterDiscovery.ToJson(), u.Err)) + } + case <-me.stop: + return + } + } + }() +} + +func (me *ClusterDiscoveryService) Stop() { + me.stop <- true +} -- cgit v1.2.3-1-g7c22