diff options
author | George Goldberg <george@gberg.me> | 2017-09-28 17:11:13 +0100 |
---|---|---|
committer | Corey Hulen <corey@hulen.com> | 2017-09-28 09:11:13 -0700 |
commit | a06830b2f88a8d374c326a1191870cbc7cf7dac2 (patch) | |
tree | 4879ce49de061fba894fe01b54db701c639f0e94 | |
parent | f263d2b9510fb557fe075dee5097cb32e2b1e5e2 (diff) | |
download | chat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.tar.gz chat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.tar.bz2 chat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.zip |
PLT-7644: Improve job scheduler architecture. (#7532)
-rw-r--r-- | i18n/en.json | 8 | ||||
-rw-r--r-- | jobs/jobs.go | 26 | ||||
-rw-r--r-- | jobs/schedulers.go | 174 | ||||
-rw-r--r-- | jobs/testscheduler.go | 58 | ||||
-rw-r--r-- | model/config.go | 4 | ||||
-rw-r--r-- | model/job.go | 8 | ||||
-rw-r--r-- | store/sqlstore/job_store.go | 58 | ||||
-rw-r--r-- | store/sqlstore/job_store_test.go | 103 | ||||
-rw-r--r-- | store/store.go | 2 |
9 files changed, 319 insertions, 122 deletions
diff --git a/i18n/en.json b/i18n/en.json index 5469118d4..910f6adab 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -5680,6 +5680,14 @@ "translation": "We couldn't update the job" }, { + "id": "store.sql_job.get_newest_job_by_status_and_type.app_error", + "translation": "We couldn't get the newest job by status and type" + }, + { + "id": "store.sql_job.get_count_by_status_and_type.app_erro", + "translation": "We couldn't get the job count by status and type" + }, + { "id": "store.sql_license.get.app_error", "translation": "We encountered an error getting the license" }, diff --git a/jobs/jobs.go b/jobs/jobs.go index a51780865..22d87e850 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -141,3 +141,29 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte } } } + +func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Time { + nextTime := time.Date(now.Year(), now.Month(), now.Day(), nextStartTime.Hour(), nextStartTime.Minute(), 0, 0, time.Local) + + if !now.Before(nextTime) { + nextTime = nextTime.AddDate(0, 0, 1) + } + + return &nextTime +} + +func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { + if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil { + return false, result.Err + } else { + return result.Data.(int64) > 0, nil + } +} + +func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { + if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil { + return nil, result.Err + } else { + return result.Data.(*model.Job), nil + } +} diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 1cb4a6f28..cdf8d956d 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -5,106 +5,160 @@ package jobs import ( "sync" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" + ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) type Schedulers struct { - startOnce sync.Once - - DataRetention model.Scheduler - ElasticsearchAggregation model.Scheduler - LdapSync model.Scheduler - - listenerId string + stop chan bool + stopped chan bool + configChanged chan *model.Config + listenerId string + startOnce sync.Once + + schedulers []model.Scheduler + nextRunTimes []*time.Time } func InitSchedulers() *Schedulers { - schedulers := &Schedulers{} + l4g.Debug("Initialising schedulers.") + schedulers := &Schedulers{ + stop: make(chan bool), + stopped: make(chan bool), + configChanged: make(chan *model.Config), + } if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - schedulers.DataRetention = dataRetentionInterface.MakeScheduler() + schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler()) } if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { - schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler() + schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler()) } - if ldaySyncInterface := ejobs.GetLdapSyncInterface(); ldaySyncInterface != nil { - schedulers.LdapSync = ldaySyncInterface.MakeScheduler() + if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil { + schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler()) } + schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers)) return schedulers } func (schedulers *Schedulers) Start() *Schedulers { - l4g.Info("Starting schedulers") - - schedulers.startOnce.Do(func() { - if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) { - go schedulers.DataRetention.Run() - } - - if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - go schedulers.ElasticsearchAggregation.Run() - } + schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) - if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable { - go schedulers.LdapSync.Run() - } - }) + go func() { + schedulers.startOnce.Do(func() { + l4g.Info("Starting schedulers.") + + defer func() { + l4g.Info("Schedulers stopped.") + close(schedulers.stopped) + }() + + now := time.Now() + for idx, scheduler := range schedulers.schedulers { + if !scheduler.Enabled(utils.Cfg) { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(utils.Cfg, idx, now, false) + } + } + + for { + select { + case <-schedulers.stop: + l4g.Debug("Schedulers received stop signal.") + return + case now = <-time.After(1 * time.Minute): + cfg := utils.Cfg + + for idx, nextTime := range schedulers.nextRunTimes { + if nextTime == nil { + continue + } + + if time.Now().After(*nextTime) { + scheduler := schedulers.schedulers[idx] + if scheduler != nil { + if scheduler.Enabled(cfg) { + if _, err := schedulers.scheduleJob(cfg, scheduler); err != nil { + l4g.Warn("Failed to schedule job with scheduler: %v", scheduler.Name()) + l4g.Error(err) + } else { + schedulers.setNextRunTime(cfg, idx, now, true) + } + } + } + } + } + case newCfg := <-schedulers.configChanged: + for idx, scheduler := range schedulers.schedulers { + if !scheduler.Enabled(newCfg) { + schedulers.nextRunTimes[idx] = nil + } else { + schedulers.setNextRunTime(newCfg, idx, now, false) + } + } + } + } + }) + }() - schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange) + return schedulers +} +func (schedulers *Schedulers) Stop() *Schedulers { + l4g.Info("Stopping schedulers.") + close(schedulers.stop) + <-schedulers.stopped return schedulers } -func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if schedulers.DataRetention != nil { - if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) { - go schedulers.DataRetention.Run() - } else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) { - schedulers.DataRetention.Stop() - } - } +func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now time.Time, pendingJobs bool) { + scheduler := schedulers.schedulers[idx] - if schedulers.ElasticsearchAggregation != nil { - if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { - go schedulers.ElasticsearchAggregation.Run() - } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing { - schedulers.ElasticsearchAggregation.Stop() + if !pendingJobs { + if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil { + l4g.Error("Failed to set next job run time: " + err.Error()) + schedulers.nextRunTimes[idx] = nil + return + } else { + pendingJobs = pj } } - if schedulers.LdapSync != nil { - if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable { - go schedulers.LdapSync.Run() - } else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable { - schedulers.LdapSync.Stop() - } + lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType()) + if err != nil { + l4g.Error("Failed to set next job run time: " + err.Error()) + schedulers.nextRunTimes[idx] = nil + return } -} -func (schedulers *Schedulers) Stop() *Schedulers { - utils.RemoveConfigListener(schedulers.listenerId) - - if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) { - schedulers.DataRetention.Stop() - } + schedulers.nextRunTimes[idx] = scheduler.NextScheduleTime(cfg, now, pendingJobs, lastSuccessfulJob) + l4g.Debug("Next run time for scheduler %v: %v", scheduler.Name(), schedulers.nextRunTimes[idx]) +} - if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing { - schedulers.ElasticsearchAggregation.Stop() +func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) { + pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType()) + if err != nil { + return nil, err } - if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable { - schedulers.LdapSync.Stop() + lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType()) + if err2 != nil { + return nil, err } - l4g.Info("Stopped schedulers") + return scheduler.ScheduleJob(cfg, pendingJobs, lastSuccessfulJob) +} - return schedulers +func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + l4g.Debug("Schedulers received config change.") + schedulers.configChanged <- newConfig } diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go deleted file mode 100644 index d7345f67f..000000000 --- a/jobs/testscheduler.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package jobs - -import ( - "time" - - l4g "github.com/alecthomas/log4go" -) - -type TestScheduler struct { - name string - jobType string - stop chan bool - stopped chan bool -} - -func MakeTestScheduler(name string, jobType string) *TestScheduler { - return &TestScheduler{ - name: name, - jobType: jobType, - stop: make(chan bool, 1), - stopped: make(chan bool, 1), - } -} - -func (scheduler *TestScheduler) Run() { - l4g.Debug("Scheduler %v: Started", scheduler.name) - - defer func() { - l4g.Debug("Scheduler %v: Finished", scheduler.name) - scheduler.stopped <- true - }() - - for { - select { - case <-scheduler.stop: - l4g.Debug("Scheduler %v: Received stop signal", scheduler.name) - return - case <-time.After(86400 * time.Second): - l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name) - scheduler.AddJob() - } - } -} - -func (scheduler *TestScheduler) AddJob() { - if _, err := CreateJob(scheduler.jobType, nil); err != nil { - l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err) - } -} - -func (scheduler *TestScheduler) Stop() { - l4g.Debug("Scheduler %v: Stopping", scheduler.name) - scheduler.stop <- true - <-scheduler.stopped -} diff --git a/model/config.go b/model/config.go index 8b6aad0e5..6cf716372 100644 --- a/model/config.go +++ b/model/config.go @@ -1859,7 +1859,7 @@ func (o *Config) IsValid() *AppError { return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.aggregate_posts_after_days.app_error", nil, "", http.StatusBadRequest) } - if _, err := time.Parse("03:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil { + if _, err := time.Parse("15:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil { return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.posts_aggregator_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest) } @@ -1871,7 +1871,7 @@ func (o *Config) IsValid() *AppError { return NewAppError("Config.IsValid", "model.config.is_valid.data_retention.file_retention_days_too_low.app_error", nil, "", http.StatusBadRequest) } - if _, err := time.Parse("03:04", *o.DataRetentionSettings.DeletionJobStartTime); err != nil { + if _, err := time.Parse("15:04", *o.DataRetentionSettings.DeletionJobStartTime); err != nil { return NewAppError("Config.IsValid", "model.config.is_valid.data_retention.deletion_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest) } diff --git a/model/job.go b/model/job.go index 85dfbec92..843d73fad 100644 --- a/model/job.go +++ b/model/job.go @@ -7,6 +7,7 @@ import ( "encoding/json" "io" "net/http" + "time" ) const ( @@ -116,6 +117,9 @@ type Worker interface { } type Scheduler interface { - Run() - Stop() + Name() string + JobType() string + Enabled(cfg *Config) bool + NextScheduleTime(cfg *Config, now time.Time, pendingJobs bool, lastSuccessfulJob *Job) *time.Time + ScheduleJob(cfg *Config, pendingJobs bool, lastSuccessfulJob *Job) (*Job, *AppError) } diff --git a/store/sqlstore/job_store.go b/store/sqlstore/job_store.go index c56f526af..0ae5a6a07 100644 --- a/store/sqlstore/job_store.go +++ b/store/sqlstore/job_store.go @@ -325,6 +325,64 @@ func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel { return storeChannel } +func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var job *model.Job + + if err := jss.GetReplica().SelectOne(&job, + `SELECT + * + FROM + Jobs + WHERE + Status = :Status + AND + Type = :Type + ORDER BY + CreateAt DESC + LIMIT 1`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + if count, err := jss.GetReplica().SelectInt(`SELECT + COUNT(*) + FROM + Jobs + WHERE + Status = :Status + AND + Type = :Type`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = count + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + func (jss SqlJobStore) Delete(id string) store.StoreChannel { storeChannel := make(store.StoreChannel, 1) diff --git a/store/sqlstore/job_store_test.go b/store/sqlstore/job_store_test.go index bbeca6c3a..148d8b92d 100644 --- a/store/sqlstore/job_store_test.go +++ b/store/sqlstore/job_store_test.go @@ -10,6 +10,7 @@ import ( "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/assert" ) func TestJobSaveGet(t *testing.T) { @@ -231,6 +232,108 @@ func TestJobGetAllByStatus(t *testing.T) { } } +func TestJobStoreGetNewestJobByStatusAndType(t *testing.T) { + ss := Setup() + + jobType1 := model.NewId() + jobType2 := model.NewId() + status1 := model.NewId() + status2 := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1001, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1000, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType2, + CreateAt: 1003, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1004, + Status: status2, + }, + } + + for _, job := range jobs { + store.Must(ss.Job().Save(job)) + defer ss.Job().Delete(job.Id) + } + + result := <-ss.Job().GetNewestJobByStatusAndType(status1, jobType1) + assert.Nil(t, result.Err) + assert.EqualValues(t, jobs[0].Id, result.Data.(*model.Job).Id) +} + +func TestJobStoreGetCountByStatusAndType(t *testing.T) { + ss := Setup() + + jobType1 := model.NewId() + jobType2 := model.NewId() + status1 := model.NewId() + status2 := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1000, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 999, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType2, + CreateAt: 1001, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1002, + Status: status2, + }, + } + + for _, job := range jobs { + store.Must(ss.Job().Save(job)) + defer ss.Job().Delete(job.Id) + } + + result := <-ss.Job().GetCountByStatusAndType(status1, jobType1) + assert.Nil(t, result.Err) + assert.EqualValues(t, 2, result.Data.(int64)) + + result = <-ss.Job().GetCountByStatusAndType(status2, jobType2) + assert.Nil(t, result.Err) + assert.EqualValues(t, 0, result.Data.(int64)) + + result = <-ss.Job().GetCountByStatusAndType(status1, jobType2) + assert.Nil(t, result.Err) + assert.EqualValues(t, 1, result.Data.(int64)) + + result = <-ss.Job().GetCountByStatusAndType(status2, jobType1) + assert.Nil(t, result.Err) + assert.EqualValues(t, 1, result.Data.(int64)) +} + func TestJobUpdateOptimistically(t *testing.T) { ss := Setup() diff --git a/store/store.go b/store/store.go index d2c30318f..9694921c8 100644 --- a/store/store.go +++ b/store/store.go @@ -411,6 +411,8 @@ type JobStore interface { GetAllByType(jobType string) StoreChannel GetAllByTypePage(jobType string, offset int, limit int) StoreChannel GetAllByStatus(status string) StoreChannel + GetNewestJobByStatusAndType(status string, jobType string) StoreChannel + GetCountByStatusAndType(status string, jobType string) StoreChannel Delete(id string) StoreChannel } |