From a06830b2f88a8d374c326a1191870cbc7cf7dac2 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Thu, 28 Sep 2017 17:11:13 +0100 Subject: PLT-7644: Improve job scheduler architecture. (#7532) --- jobs/jobs.go | 26 ++++++++ jobs/schedulers.go | 174 +++++++++++++++++++++++++++++++++----------------- jobs/testscheduler.go | 58 ----------------- 3 files changed, 140 insertions(+), 118 deletions(-) delete mode 100644 jobs/testscheduler.go (limited to 'jobs') diff --git a/jobs/jobs.go b/jobs/jobs.go index a51780865..22d87e850 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -141,3 +141,29 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte } } } + +func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Time { + nextTime := time.Date(now.Year(), now.Month(), now.Day(), nextStartTime.Hour(), nextStartTime.Minute(), 0, 0, time.Local) + + if !now.Before(nextTime) { + nextTime = nextTime.AddDate(0, 0, 1) + } + + return &nextTime +} + +func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { + if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil { + return false, result.Err + } else { + return result.Data.(int64) > 0, nil + } +} + +func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { + if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil { + return nil, result.Err + } else { + return result.Data.(*model.Job), nil + } +} diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 1cb4a6f28..cdf8d956d 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -5,106 +5,160 @@ package jobs import ( "sync" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" + ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) type Schedulers struct { - startOnce sync.Once - - DataRetention model.Scheduler - ElasticsearchAggregation model.Scheduler - LdapSync model.Scheduler - - listenerId string + stop chan bool + stopped chan bool + configChanged chan *model.Config + listenerId string + startOnce sync.Once + + schedulers []model.Scheduler + nextRunTimes []*time.Time } func InitSchedulers() *Schedulers { - schedulers := &Schedulers{} + l4g.Debug("Initialising schedulers.") + schedulers := &Schedulers{ + stop: make(chan bool), + stopped: make(chan bool), + configChanged: make(chan *model.Config), + } if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - schedulers.DataRetention = dataRetentionInterface.MakeScheduler() + schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler()) } if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { - schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler() + schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler()) } - if ldaySyncInterface := ejobs.GetLdapSyncInterface(); ldaySyncInterface != nil { - schedulers.LdapSync = ldaySyncInterface.MakeScheduler() + if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil { + schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler()) } + schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers)) return schedulers } func (schedulers *Schedulers) Start() *Schedulers { - l4g.Info("Starting schedulers") - - schedulers.startOnce.Do(func() { - if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) { - go schedulers.DataRetention.Run() - } - - if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - go schedulers.ElasticsearchAggregation.Run() - } + schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) - if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable { - go schedulers.LdapSync.Run() - } - }) + go func() { + schedulers.startOnce.Do(func() { + l4g.Info("Starting schedulers.") + + defer func() { + l4g.Info("Schedulers stopped.") + close(schedulers.stopped) + }() + + now := time.Now() + for idx, scheduler := range schedulers.schedulers { + if !scheduler.Enabled(utils.Cfg) { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(utils.Cfg, idx, now, false) + } + } + + for { + select { + case <-schedulers.stop: + l4g.Debug("Schedulers received stop signal.") + return + case now = <-time.After(1 * time.Minute): + cfg := utils.Cfg + + for idx, nextTime := range schedulers.nextRunTimes { + if nextTime == nil { + continue + } + + if time.Now().After(*nextTime) { + scheduler := schedulers.schedulers[idx] + if scheduler != nil { + if scheduler.Enabled(cfg) { + if _, err := schedulers.scheduleJob(cfg, scheduler); err != nil { + l4g.Warn("Failed to schedule job with scheduler: %v", scheduler.Name()) + l4g.Error(err) + } else { + schedulers.setNextRunTime(cfg, idx, now, true) + } + } + } + } + } + case newCfg := <-schedulers.configChanged: + for idx, scheduler := range schedulers.schedulers { + if !scheduler.Enabled(newCfg) { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(newCfg, idx, now, false) + } + } + } + } + }) + }() - schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) + return schedulers +} +func (schedulers *Schedulers) Stop() *Schedulers { + l4g.Info("Stopping schedulers.") + close(schedulers.stop) + <-schedulers.stopped return schedulers } -func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if schedulers.DataRetention != nil { - if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) { - go schedulers.DataRetention.Run() - } else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) { - schedulers.DataRetention.Stop() - } - } +func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now time.Time, pendingJobs bool) { + scheduler := schedulers.schedulers[idx] - if schedulers.ElasticsearchAggregation != nil { - if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { - go schedulers.ElasticsearchAggregation.Run() - } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { - schedulers.ElasticsearchAggregation.Stop() + if !pendingJobs { + if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil { + l4g.Error("Failed to set next job run time: " + err.Error()) + schedulers.nextRunTimes[idx] = nil + return + } else { + pendingJobs = pj } } - if schedulers.LdapSync != nil { - if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable { - go schedulers.LdapSync.Run() - } else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable { - schedulers.LdapSync.Stop() - } + lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType()) + if err != nil { + l4g.Error("Failed to set next job run time: " + err.Error()) + schedulers.nextRunTimes[idx] = nil + return } -} -func (schedulers *Schedulers) Stop() *Schedulers { - utils.RemoveConfigListener(schedulers.listenerId) - - if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) { - schedulers.DataRetention.Stop() - } + schedulers.nextRunTimes[idx] = scheduler.NextScheduleTime(cfg, now, pendingJobs, lastSuccessfulJob) + l4g.Debug("Next run time for scheduler %v: %v", scheduler.Name(), schedulers.nextRunTimes[idx]) +} - if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - schedulers.ElasticsearchAggregation.Stop() +func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) { + pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType()) + if err != nil { + return nil, err } - if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable { - schedulers.LdapSync.Stop() + lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType()) + if err2 != nil { + return nil, err } - l4g.Info("Stopped schedulers") + return scheduler.ScheduleJob(cfg, pendingJobs, lastSuccessfulJob) +} - return schedulers +func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + l4g.Debug("Schedulers received config change.") + schedulers.configChanged <- newConfig } diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go deleted file mode 100644 index d7345f67f..000000000 --- a/jobs/testscheduler.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package jobs - -import ( - "time" - - l4g "github.com/alecthomas/log4go" -) - -type TestScheduler struct { - name string - jobType string - stop chan bool - stopped chan bool -} - -func MakeTestScheduler(name string, jobType string) *TestScheduler { - return &TestScheduler{ - name: name, - jobType: jobType, - stop: make(chan bool, 1), - stopped: make(chan bool, 1), - } -} - -func (scheduler *TestScheduler) Run() { - l4g.Debug("Scheduler %v: Started", scheduler.name) - - defer func() { - l4g.Debug("Scheduler %v: Finished", scheduler.name) - scheduler.stopped <- true - }() - - for { - select { - case <-scheduler.stop: - l4g.Debug("Scheduler %v: Received stop signal", scheduler.name) - return - case <-time.After(86400 * time.Second): - l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name) - scheduler.AddJob() - } - } -} - -func (scheduler *TestScheduler) AddJob() { - if _, err := CreateJob(scheduler.jobType, nil); err != nil { - l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err) - } -} - -func (scheduler *TestScheduler) Stop() { - l4g.Debug("Scheduler %v: Stopping", scheduler.name) - scheduler.stop <- true - <-scheduler.stopped -} -- cgit v1.2.3-1-g7c22