diff options
Diffstat (limited to 'jobs/schedulers.go')
-rw-r--r-- | jobs/schedulers.go | 38 |
1 files changed, 28 insertions, 10 deletions
diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 96aa2b635..37a64bc22 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -13,12 +13,13 @@ import ( ) type Schedulers struct { - stop chan bool - stopped chan bool - configChanged chan *model.Config - listenerId string - startOnce sync.Once - jobs *JobServer + stop chan bool + stopped chan bool + configChanged chan *model.Config + clusterLeaderChanged chan bool + listenerId string + startOnce sync.Once + jobs *JobServer schedulers []model.Scheduler nextRunTimes []*time.Time @@ -28,10 +29,11 @@ func (srv *JobServer) InitSchedulers() *Schedulers { mlog.Debug("Initialising schedulers.") schedulers := &Schedulers{ - stop: make(chan bool), - stopped: make(chan bool), - configChanged: make(chan *model.Config), - jobs: srv, + stop: make(chan bool), + stopped: make(chan bool), + configChanged: make(chan *model.Config), + clusterLeaderChanged: make(chan bool), + jobs: srv, } if srv.DataRetentionJob != nil { @@ -114,6 +116,14 @@ func (schedulers *Schedulers) Start() *Schedulers { schedulers.setNextRunTime(newCfg, idx, now, false) } } + case isLeader := <-schedulers.clusterLeaderChanged: + for idx := range schedulers.schedulers { + if !isLeader { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(schedulers.jobs.Config(), idx, now, false) + } + } } } }) @@ -171,3 +181,11 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon mlog.Debug("Schedulers received config change.") schedulers.configChanged <- newConfig } + +func (schedulers *Schedulers) HandleClusterLeaderChange(isLeader bool) { + select { + case schedulers.clusterLeaderChanged <- isLeader: + default: + mlog.Debug("Did not send cluster leader change message to schedulers as no schedulers listening to notification channel.") + } +} |