summaryrefslogtreecommitdiffstats
path: root/jobs/schedulers.go
diff options
context:
space:
mode:
Diffstat (limited to 'jobs/schedulers.go')
-rw-r--r--jobs/schedulers.go38
1 files changed, 28 insertions, 10 deletions
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index 96aa2b635..37a64bc22 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -13,12 +13,13 @@ import (
)
type Schedulers struct {
- stop chan bool
- stopped chan bool
- configChanged chan *model.Config
- listenerId string
- startOnce sync.Once
- jobs *JobServer
+ stop chan bool
+ stopped chan bool
+ configChanged chan *model.Config
+ clusterLeaderChanged chan bool
+ listenerId string
+ startOnce sync.Once
+ jobs *JobServer
schedulers []model.Scheduler
nextRunTimes []*time.Time
@@ -28,10 +29,11 @@ func (srv *JobServer) InitSchedulers() *Schedulers {
mlog.Debug("Initialising schedulers.")
schedulers := &Schedulers{
- stop: make(chan bool),
- stopped: make(chan bool),
- configChanged: make(chan *model.Config),
- jobs: srv,
+ stop: make(chan bool),
+ stopped: make(chan bool),
+ configChanged: make(chan *model.Config),
+ clusterLeaderChanged: make(chan bool),
+ jobs: srv,
}
if srv.DataRetentionJob != nil {
@@ -114,6 +116,14 @@ func (schedulers *Schedulers) Start() *Schedulers {
schedulers.setNextRunTime(newCfg, idx, now, false)
}
}
+ case isLeader := <-schedulers.clusterLeaderChanged:
+ for idx := range schedulers.schedulers {
+ if !isLeader {
+ schedulers.nextRunTimes[idx] = nil
+ } else {
+ schedulers.setNextRunTime(schedulers.jobs.Config(), idx, now, false)
+ }
+ }
}
}
})
@@ -171,3 +181,11 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon
mlog.Debug("Schedulers received config change.")
schedulers.configChanged <- newConfig
}
+
+func (schedulers *Schedulers) HandleClusterLeaderChange(isLeader bool) {
+ select {
+ case schedulers.clusterLeaderChanged <- isLeader:
+ default:
+ mlog.Debug("Did not send cluster leader change message to schedulers as no schedulers listening to notification channel.")
+ }
+}