summaryrefslogtreecommitdiffstats
path: root/jobs/schedulers.go
diff options
context:
space:
mode:
Diffstat (limited to 'jobs/schedulers.go')
-rw-r--r--jobs/schedulers.go174
1 files changed, 114 insertions, 60 deletions
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index 1cb4a6f28..cdf8d956d 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -5,106 +5,160 @@ package jobs
import (
"sync"
+ "time"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
+ ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
type Schedulers struct {
- startOnce sync.Once
-
- DataRetention model.Scheduler
- ElasticsearchAggregation model.Scheduler
- LdapSync model.Scheduler
-
- listenerId string
+ stop chan bool
+ stopped chan bool
+ configChanged chan *model.Config
+ listenerId string
+ startOnce sync.Once
+
+ schedulers []model.Scheduler
+ nextRunTimes []*time.Time
}
func InitSchedulers() *Schedulers {
- schedulers := &Schedulers{}
+ l4g.Debug("Initialising schedulers.")
+ schedulers := &Schedulers{
+ stop: make(chan bool),
+ stopped: make(chan bool),
+ configChanged: make(chan *model.Config),
+ }
if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
- schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
+ schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler())
}
if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
- schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler()
+ schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler())
}
- if ldaySyncInterface := ejobs.GetLdapSyncInterface(); ldaySyncInterface != nil {
- schedulers.LdapSync = ldaySyncInterface.MakeScheduler()
+ if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
+ schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
}
+ schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
return schedulers
}
func (schedulers *Schedulers) Start() *Schedulers {
- l4g.Info("Starting schedulers")
-
- schedulers.startOnce.Do(func() {
- if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) {
- go schedulers.DataRetention.Run()
- }
-
- if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
- go schedulers.ElasticsearchAggregation.Run()
- }
+ schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
- if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
- go schedulers.LdapSync.Run()
- }
- })
+ go func() {
+ schedulers.startOnce.Do(func() {
+ l4g.Info("Starting schedulers.")
+
+ defer func() {
+ l4g.Info("Schedulers stopped.")
+ close(schedulers.stopped)
+ }()
+
+ now := time.Now()
+ for idx, scheduler := range schedulers.schedulers {
+ if !scheduler.Enabled(utils.Cfg) {
+ schedulers.nextRunTimes[idx] = nil
+ } else {
+ schedulers.setNextRunTime(utils.Cfg, idx, now, false)
+ }
+ }
+
+ for {
+ select {
+ case <-schedulers.stop:
+ l4g.Debug("Schedulers received stop signal.")
+ return
+ case now = <-time.After(1 * time.Minute):
+ cfg := utils.Cfg
+
+ for idx, nextTime := range schedulers.nextRunTimes {
+ if nextTime == nil {
+ continue
+ }
+
+ if time.Now().After(*nextTime) {
+ scheduler := schedulers.schedulers[idx]
+ if scheduler != nil {
+ if scheduler.Enabled(cfg) {
+ if _, err := schedulers.scheduleJob(cfg, scheduler); err != nil {
+ l4g.Warn("Failed to schedule job with scheduler: %v", scheduler.Name())
+ l4g.Error(err)
+ } else {
+ schedulers.setNextRunTime(cfg, idx, now, true)
+ }
+ }
+ }
+ }
+ }
+ case newCfg := <-schedulers.configChanged:
+ for idx, scheduler := range schedulers.schedulers {
+ if !scheduler.Enabled(newCfg) {
+ schedulers.nextRunTimes[idx] = nil
+ } else {
+ schedulers.setNextRunTime(newCfg, idx, now, false)
+ }
+ }
+ }
+ }
+ })
+ }()
- schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
+ return schedulers
+}
+func (schedulers *Schedulers) Stop() *Schedulers {
+ l4g.Info("Stopping schedulers.")
+ close(schedulers.stop)
+ <-schedulers.stopped
return schedulers
}
-func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
- if schedulers.DataRetention != nil {
- if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) {
- go schedulers.DataRetention.Run()
- } else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) {
- schedulers.DataRetention.Stop()
- }
- }
+func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now time.Time, pendingJobs bool) {
+ scheduler := schedulers.schedulers[idx]
- if schedulers.ElasticsearchAggregation != nil {
- if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
- go schedulers.ElasticsearchAggregation.Run()
- } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
- schedulers.ElasticsearchAggregation.Stop()
+ if !pendingJobs {
+ if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil {
+ l4g.Error("Failed to set next job run time: " + err.Error())
+ schedulers.nextRunTimes[idx] = nil
+ return
+ } else {
+ pendingJobs = pj
}
}
- if schedulers.LdapSync != nil {
- if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable {
- go schedulers.LdapSync.Run()
- } else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable {
- schedulers.LdapSync.Stop()
- }
+ lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType())
+ if err != nil {
+ l4g.Error("Failed to set next job run time: " + err.Error())
+ schedulers.nextRunTimes[idx] = nil
+ return
}
-}
-func (schedulers *Schedulers) Stop() *Schedulers {
- utils.RemoveConfigListener(schedulers.listenerId)
-
- if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) {
- schedulers.DataRetention.Stop()
- }
+ schedulers.nextRunTimes[idx] = scheduler.NextScheduleTime(cfg, now, pendingJobs, lastSuccessfulJob)
+ l4g.Debug("Next run time for scheduler %v: %v", scheduler.Name(), schedulers.nextRunTimes[idx])
+}
- if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
- schedulers.ElasticsearchAggregation.Stop()
+func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) {
+ pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType())
+ if err != nil {
+ return nil, err
}
- if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
- schedulers.LdapSync.Stop()
+ lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType())
+ if err2 != nil {
+ return nil, err
}
- l4g.Info("Stopped schedulers")
+ return scheduler.ScheduleJob(cfg, pendingJobs, lastSuccessfulJob)
+}
- return schedulers
+func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ l4g.Debug("Schedulers received config change.")
+ schedulers.configChanged <- newConfig
}