summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2018-07-31 15:40:23 +0100
committerChristopher Speller <crspeller@gmail.com>2018-07-31 07:40:23 -0700
commit8766690c81fcefdbe0c9d85590de1eea07a908d7 (patch)
treeab37e369a2c8afc87a3238bbf028aa82ef1bc125
parentfcb4ee935ef97ca5c79c7433b2be2709fc62e87f (diff)
downloadchat-8766690c81fcefdbe0c9d85590de1eea07a908d7.tar.gz
chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.tar.bz2
chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.zip
MM-10502: Only cluster master should run job schedulers. (#9174)
* MM-10502: Only cluster master should run job schedulers. * Use sync.Map for thread safety. * Fix tests.
-rw-r--r--app/app.go33
-rw-r--r--app/cluster.go29
-rw-r--r--jobs/schedulers.go38
-rw-r--r--jobs/server.go4
4 files changed, 80 insertions, 24 deletions
diff --git a/app/app.go b/app/app.go
index 6da16c28c..5cedca2ad 100644
--- a/app/app.go
+++ b/app/app.go
@@ -64,10 +64,11 @@ type App struct {
Mfa einterfaces.MfaInterface
Saml einterfaces.SamlInterface
- config atomic.Value
- envConfig map[string]interface{}
- configFile string
- configListeners map[string]func(*model.Config, *model.Config)
+ config atomic.Value
+ envConfig map[string]interface{}
+ configFile string
+ configListeners map[string]func(*model.Config, *model.Config)
+ clusterLeaderListeners sync.Map
licenseValue atomic.Value
clientLicenseValue atomic.Value
@@ -79,14 +80,15 @@ type App struct {
newStore func() store.Store
- htmlTemplateWatcher *utils.HTMLTemplateWatcher
- sessionCache *utils.Cache
- configListenerId string
- licenseListenerId string
- logListenerId string
- disableConfigWatch bool
- configWatcher *utils.ConfigWatcher
- asymmetricSigningKey *ecdsa.PrivateKey
+ htmlTemplateWatcher *utils.HTMLTemplateWatcher
+ sessionCache *utils.Cache
+ configListenerId string
+ licenseListenerId string
+ logListenerId string
+ clusterLeaderListenerId string
+ disableConfigWatch bool
+ configWatcher *utils.ConfigWatcher
+ asymmetricSigningKey *ecdsa.PrivateKey
pluginCommands []*PluginCommand
pluginCommandsLock sync.RWMutex
@@ -218,6 +220,10 @@ func New(options ...Option) (outApp *App, outErr error) {
app.initJobs()
})
+ app.clusterLeaderListenerId = app.AddClusterLeaderChangedListener(func() {
+ app.Jobs.Schedulers.HandleClusterLeaderChange(app.IsLeader())
+ })
+
subpath, err := utils.GetSubpathFromConfig(app.Config())
if err != nil {
return nil, errors.Wrap(err, "failed to parse SiteURL subpath")
@@ -270,6 +276,7 @@ func (a *App) Shutdown() {
a.RemoveConfigListener(a.configListenerId)
a.RemoveLicenseListener(a.licenseListenerId)
a.RemoveConfigListener(a.logListenerId)
+ a.RemoveClusterLeaderChangedListener(a.clusterLeaderListenerId)
mlog.Info("Server stopped")
a.DisableConfigWatch()
@@ -432,6 +439,8 @@ func (a *App) initJobs() {
if jobsMigrationsInterface != nil {
a.Jobs.Migrations = jobsMigrationsInterface(a)
}
+ a.Jobs.Workers = a.Jobs.InitWorkers()
+ a.Jobs.Schedulers = a.Jobs.InitSchedulers()
}
func (a *App) DiagnosticId() string {
diff --git a/app/cluster.go b/app/cluster.go
new file mode 100644
index 000000000..020e57c61
--- /dev/null
+++ b/app/cluster.go
@@ -0,0 +1,29 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import "github.com/mattermost/mattermost-server/model"
+
+// Registers a given function to be called when the cluster leader may have changed. Returns a unique ID for the
+// listener which can later be used to remove it. If clustering is not enabled in this build, the callback will never
+// be called.
+func (a *App) AddClusterLeaderChangedListener(listener func()) string {
+ id := model.NewId()
+ a.clusterLeaderListeners.Store(id, listener)
+ return id
+}
+
+// Removes a listener function by the unique ID returned when AddConfigListener was called
+func (a *App) RemoveClusterLeaderChangedListener(id string) {
+ a.clusterLeaderListeners.Delete(id)
+}
+
+func (a *App) InvokeClusterLeaderChangedListeners() {
+ a.Go(func() {
+ a.clusterLeaderListeners.Range(func(_, listener interface{}) bool {
+ listener.(func())()
+ return true
+ })
+ })
+}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index 96aa2b635..37a64bc22 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -13,12 +13,13 @@ import (
)
type Schedulers struct {
- stop chan bool
- stopped chan bool
- configChanged chan *model.Config
- listenerId string
- startOnce sync.Once
- jobs *JobServer
+ stop chan bool
+ stopped chan bool
+ configChanged chan *model.Config
+ clusterLeaderChanged chan bool
+ listenerId string
+ startOnce sync.Once
+ jobs *JobServer
schedulers []model.Scheduler
nextRunTimes []*time.Time
@@ -28,10 +29,11 @@ func (srv *JobServer) InitSchedulers() *Schedulers {
mlog.Debug("Initialising schedulers.")
schedulers := &Schedulers{
- stop: make(chan bool),
- stopped: make(chan bool),
- configChanged: make(chan *model.Config),
- jobs: srv,
+ stop: make(chan bool),
+ stopped: make(chan bool),
+ configChanged: make(chan *model.Config),
+ clusterLeaderChanged: make(chan bool),
+ jobs: srv,
}
if srv.DataRetentionJob != nil {
@@ -114,6 +116,14 @@ func (schedulers *Schedulers) Start() *Schedulers {
schedulers.setNextRunTime(newCfg, idx, now, false)
}
}
+ case isLeader := <-schedulers.clusterLeaderChanged:
+ for idx := range schedulers.schedulers {
+ if !isLeader {
+ schedulers.nextRunTimes[idx] = nil
+ } else {
+ schedulers.setNextRunTime(schedulers.jobs.Config(), idx, now, false)
+ }
+ }
}
}
})
@@ -171,3 +181,11 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon
mlog.Debug("Schedulers received config change.")
schedulers.configChanged <- newConfig
}
+
+func (schedulers *Schedulers) HandleClusterLeaderChange(isLeader bool) {
+ select {
+ case schedulers.clusterLeaderChanged <- isLeader:
+ default:
+ mlog.Debug("Did not send cluster leader change message to schedulers as no schedulers listening to notification channel.")
+ }
+}
diff --git a/jobs/server.go b/jobs/server.go
index 10ea9a46f..cffc60da1 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -50,11 +50,11 @@ func (srv *JobServer) Config() *model.Config {
}
func (srv *JobServer) StartWorkers() {
- srv.Workers = srv.InitWorkers().Start()
+ srv.Workers = srv.Workers.Start()
}
func (srv *JobServer) StartSchedulers() {
- srv.Schedulers = srv.InitSchedulers().Start()
+ srv.Schedulers = srv.Schedulers.Start()
}
func (srv *JobServer) StopWorkers() {