From 8766690c81fcefdbe0c9d85590de1eea07a908d7 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Tue, 31 Jul 2018 15:40:23 +0100 Subject: MM-10502: Only cluster master should run job schedulers. (#9174) * MM-10502: Only cluster master should run job schedulers. * Use sync.Map for thread safety. * Fix tests. --- jobs/schedulers.go | 38 ++++++++++++++++++++++++++++---------- jobs/server.go | 4 ++-- 2 files changed, 30 insertions(+), 12 deletions(-) (limited to 'jobs') diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 96aa2b635..37a64bc22 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -13,12 +13,13 @@ import ( ) type Schedulers struct { - stop chan bool - stopped chan bool - configChanged chan *model.Config - listenerId string - startOnce sync.Once - jobs *JobServer + stop chan bool + stopped chan bool + configChanged chan *model.Config + clusterLeaderChanged chan bool + listenerId string + startOnce sync.Once + jobs *JobServer schedulers []model.Scheduler nextRunTimes []*time.Time @@ -28,10 +29,11 @@ func (srv *JobServer) InitSchedulers() *Schedulers { mlog.Debug("Initialising schedulers.") schedulers := &Schedulers{ - stop: make(chan bool), - stopped: make(chan bool), - configChanged: make(chan *model.Config), - jobs: srv, + stop: make(chan bool), + stopped: make(chan bool), + configChanged: make(chan *model.Config), + clusterLeaderChanged: make(chan bool), + jobs: srv, } if srv.DataRetentionJob != nil { @@ -114,6 +116,14 @@ func (schedulers *Schedulers) Start() *Schedulers { schedulers.setNextRunTime(newCfg, idx, now, false) } } + case isLeader := <-schedulers.clusterLeaderChanged: + for idx := range schedulers.schedulers { + if !isLeader { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(schedulers.jobs.Config(), idx, now, false) + } + } } } }) @@ -171,3 +181,11 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon mlog.Debug("Schedulers received config change.") schedulers.configChanged <- newConfig } + +func (schedulers *Schedulers) HandleClusterLeaderChange(isLeader bool) { + select { + case schedulers.clusterLeaderChanged <- isLeader: + default: + mlog.Debug("Did not send cluster leader change message to schedulers as no schedulers listening to notification channel.") + } +} diff --git a/jobs/server.go b/jobs/server.go index 10ea9a46f..cffc60da1 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -50,11 +50,11 @@ func (srv *JobServer) Config() *model.Config { } func (srv *JobServer) StartWorkers() { - srv.Workers = srv.InitWorkers().Start() + srv.Workers = srv.Workers.Start() } func (srv *JobServer) StartSchedulers() { - srv.Schedulers = srv.InitSchedulers().Start() + srv.Schedulers = srv.Schedulers.Start() } func (srv *JobServer) StopWorkers() { -- cgit v1.2.3-1-g7c22