diff options
author | George Goldberg <george@gberg.me> | 2018-07-31 15:40:23 +0100 |
---|---|---|
committer | Christopher Speller <crspeller@gmail.com> | 2018-07-31 07:40:23 -0700 |
commit | 8766690c81fcefdbe0c9d85590de1eea07a908d7 (patch) | |
tree | ab37e369a2c8afc87a3238bbf028aa82ef1bc125 /jobs | |
parent | fcb4ee935ef97ca5c79c7433b2be2709fc62e87f (diff) | |
download | chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.tar.gz chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.tar.bz2 chat-8766690c81fcefdbe0c9d85590de1eea07a908d7.zip |
MM-10502: Only cluster master should run job schedulers. (#9174)
* MM-10502: Only cluster master should run job schedulers.
* Use sync.Map for thread safety.
* Fix tests.
Diffstat (limited to 'jobs')
-rw-r--r-- | jobs/schedulers.go | 38 | ||||
-rw-r--r-- | jobs/server.go | 4 |
2 files changed, 30 insertions, 12 deletions
diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 96aa2b635..37a64bc22 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -13,12 +13,13 @@ import ( ) type Schedulers struct { - stop chan bool - stopped chan bool - configChanged chan *model.Config - listenerId string - startOnce sync.Once - jobs *JobServer + stop chan bool + stopped chan bool + configChanged chan *model.Config + clusterLeaderChanged chan bool + listenerId string + startOnce sync.Once + jobs *JobServer schedulers []model.Scheduler nextRunTimes []*time.Time @@ -28,10 +29,11 @@ func (srv *JobServer) InitSchedulers() *Schedulers { mlog.Debug("Initialising schedulers.") schedulers := &Schedulers{ - stop: make(chan bool), - stopped: make(chan bool), - configChanged: make(chan *model.Config), - jobs: srv, + stop: make(chan bool), + stopped: make(chan bool), + configChanged: make(chan *model.Config), + clusterLeaderChanged: make(chan bool), + jobs: srv, } if srv.DataRetentionJob != nil { @@ -114,6 +116,14 @@ func (schedulers *Schedulers) Start() *Schedulers { schedulers.setNextRunTime(newCfg, idx, now, false) } } + case isLeader := <-schedulers.clusterLeaderChanged: + for idx := range schedulers.schedulers { + if !isLeader { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(schedulers.jobs.Config(), idx, now, false) + } + } } } }) @@ -171,3 +181,11 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon mlog.Debug("Schedulers received config change.") schedulers.configChanged <- newConfig } + +func (schedulers *Schedulers) HandleClusterLeaderChange(isLeader bool) { + select { + case schedulers.clusterLeaderChanged <- isLeader: + default: + mlog.Debug("Did not send cluster leader change message to schedulers as no schedulers listening to notification channel.") + } +} diff --git a/jobs/server.go b/jobs/server.go index 10ea9a46f..cffc60da1 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -50,11 +50,11 @@ func (srv *JobServer) Config() *model.Config { } func (srv *JobServer) StartWorkers() { - srv.Workers = srv.InitWorkers().Start() + srv.Workers = srv.Workers.Start() } func (srv *JobServer) StartSchedulers() { - srv.Schedulers = srv.InitSchedulers().Start() + srv.Schedulers = srv.Schedulers.Start() } func (srv *JobServer) StopWorkers() { |