summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2017-09-28 17:11:13 +0100
committerCorey Hulen <corey@hulen.com>2017-09-28 09:11:13 -0700
commita06830b2f88a8d374c326a1191870cbc7cf7dac2 (patch)
tree4879ce49de061fba894fe01b54db701c639f0e94
parentf263d2b9510fb557fe075dee5097cb32e2b1e5e2 (diff)
downloadchat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.tar.gz
chat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.tar.bz2
chat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.zip
PLT-7644: Improve job scheduler architecture. (#7532)
-rw-r--r--i18n/en.json8
-rw-r--r--jobs/jobs.go26
-rw-r--r--jobs/schedulers.go174
-rw-r--r--jobs/testscheduler.go58
-rw-r--r--model/config.go4
-rw-r--r--model/job.go8
-rw-r--r--store/sqlstore/job_store.go58
-rw-r--r--store/sqlstore/job_store_test.go103
-rw-r--r--store/store.go2
9 files changed, 319 insertions, 122 deletions
diff --git a/i18n/en.json b/i18n/en.json
index 5469118d4..910f6adab 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -5680,6 +5680,14 @@
"translation": "We couldn't update the job"
},
{
+ "id": "store.sql_job.get_newest_job_by_status_and_type.app_error",
+ "translation": "We couldn't get the newest job by status and type"
+ },
+ {
+ "id": "store.sql_job.get_count_by_status_and_type.app_error",
+ "translation": "We couldn't get the job count by status and type"
+ },
+ {
"id": "store.sql_license.get.app_error",
"translation": "We encountered an error getting the license"
},
diff --git a/jobs/jobs.go b/jobs/jobs.go
index a51780865..22d87e850 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -141,3 +141,29 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte
}
}
}
+
+func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Time {
+ nextTime := time.Date(now.Year(), now.Month(), now.Day(), nextStartTime.Hour(), nextStartTime.Minute(), 0, 0, time.Local)
+
+ if !now.Before(nextTime) {
+ nextTime = nextTime.AddDate(0, 0, 1)
+ }
+
+ return &nextTime
+}
+
+func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
+ if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil {
+ return false, result.Err
+ } else {
+ return result.Data.(int64) > 0, nil
+ }
+}
+
+func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
+ if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil {
+ return nil, result.Err
+ } else {
+ return result.Data.(*model.Job), nil
+ }
+}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index 1cb4a6f28..cdf8d956d 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -5,106 +5,160 @@ package jobs
import (
"sync"
+ "time"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
+ ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
type Schedulers struct {
- startOnce sync.Once
-
- DataRetention model.Scheduler
- ElasticsearchAggregation model.Scheduler
- LdapSync model.Scheduler
-
- listenerId string
+ stop chan bool
+ stopped chan bool
+ configChanged chan *model.Config
+ listenerId string
+ startOnce sync.Once
+
+ schedulers []model.Scheduler
+ nextRunTimes []*time.Time
}
func InitSchedulers() *Schedulers {
- schedulers := &Schedulers{}
+ l4g.Debug("Initialising schedulers.")
+ schedulers := &Schedulers{
+ stop: make(chan bool),
+ stopped: make(chan bool),
+ configChanged: make(chan *model.Config),
+ }
if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
- schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
+ schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler())
}
if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
- schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler()
+ schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler())
}
- if ldaySyncInterface := ejobs.GetLdapSyncInterface(); ldaySyncInterface != nil {
- schedulers.LdapSync = ldaySyncInterface.MakeScheduler()
+ if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
+ schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
}
+ schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
return schedulers
}
func (schedulers *Schedulers) Start() *Schedulers {
- l4g.Info("Starting schedulers")
-
- schedulers.startOnce.Do(func() {
- if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) {
- go schedulers.DataRetention.Run()
- }
-
- if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
- go schedulers.ElasticsearchAggregation.Run()
- }
+ schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
- if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
- go schedulers.LdapSync.Run()
- }
- })
+ go func() {
+ schedulers.startOnce.Do(func() {
+ l4g.Info("Starting schedulers.")
+
+ defer func() {
+ l4g.Info("Schedulers stopped.")
+ close(schedulers.stopped)
+ }()
+
+ now := time.Now()
+ for idx, scheduler := range schedulers.schedulers {
+ if !scheduler.Enabled(utils.Cfg) {
+ schedulers.nextRunTimes[idx] = nil
+ } else {
+ schedulers.setNextRunTime(utils.Cfg, idx, now, false)
+ }
+ }
+
+ for {
+ select {
+ case <-schedulers.stop:
+ l4g.Debug("Schedulers received stop signal.")
+ return
+ case now = <-time.After(1 * time.Minute):
+ cfg := utils.Cfg
+
+ for idx, nextTime := range schedulers.nextRunTimes {
+ if nextTime == nil {
+ continue
+ }
+
+ if time.Now().After(*nextTime) {
+ scheduler := schedulers.schedulers[idx]
+ if scheduler != nil {
+ if scheduler.Enabled(cfg) {
+ if _, err := schedulers.scheduleJob(cfg, scheduler); err != nil {
+ l4g.Warn("Failed to schedule job with scheduler: %v", scheduler.Name())
+ l4g.Error(err)
+ } else {
+ schedulers.setNextRunTime(cfg, idx, now, true)
+ }
+ }
+ }
+ }
+ }
+ case newCfg := <-schedulers.configChanged:
+ for idx, scheduler := range schedulers.schedulers {
+ if !scheduler.Enabled(newCfg) {
+ schedulers.nextRunTimes[idx] = nil
+ } else {
+ schedulers.setNextRunTime(newCfg, idx, now, false)
+ }
+ }
+ }
+ }
+ })
+ }()
- schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
+ return schedulers
+}
+// Stop unregisters the config-change listener, signals the scheduling loop to
+// exit by closing the stop channel, and blocks until the loop has closed
+// stopped. It returns the receiver for chaining.
+func (schedulers *Schedulers) Stop() *Schedulers {
+	l4g.Info("Stopping schedulers.")
+	// Unregister first: once the loop has exited nothing receives from
+	// configChanged, so a later config change would block in handleConfigChange.
+	utils.RemoveConfigListener(schedulers.listenerId)
+	close(schedulers.stop)
+	<-schedulers.stopped
return schedulers
}
-func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
- if schedulers.DataRetention != nil {
- if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) {
- go schedulers.DataRetention.Run()
- } else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) {
- schedulers.DataRetention.Stop()
- }
- }
+func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now time.Time, pendingJobs bool) {
+ scheduler := schedulers.schedulers[idx]
- if schedulers.ElasticsearchAggregation != nil {
- if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
- go schedulers.ElasticsearchAggregation.Run()
- } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
- schedulers.ElasticsearchAggregation.Stop()
+ if !pendingJobs {
+ if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil {
+ l4g.Error("Failed to set next job run time: " + err.Error())
+ schedulers.nextRunTimes[idx] = nil
+ return
+ } else {
+ pendingJobs = pj
}
}
- if schedulers.LdapSync != nil {
- if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable {
- go schedulers.LdapSync.Run()
- } else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable {
- schedulers.LdapSync.Stop()
- }
+ lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType())
+ if err != nil {
+ l4g.Error("Failed to set next job run time: " + err.Error())
+ schedulers.nextRunTimes[idx] = nil
+ return
}
-}
-func (schedulers *Schedulers) Stop() *Schedulers {
- utils.RemoveConfigListener(schedulers.listenerId)
-
- if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) {
- schedulers.DataRetention.Stop()
- }
+ schedulers.nextRunTimes[idx] = scheduler.NextScheduleTime(cfg, now, pendingJobs, lastSuccessfulJob)
+ l4g.Debug("Next run time for scheduler %v: %v", scheduler.Name(), schedulers.nextRunTimes[idx])
+}
- if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
- schedulers.ElasticsearchAggregation.Stop()
+// scheduleJob looks up whether jobs of the scheduler's type are pending and
+// what the last successful job of that type was, then asks the scheduler to
+// schedule a new job with that information. It returns the job produced by
+// the scheduler, or the first error encountered by either lookup or by the
+// scheduler itself.
+func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) {
+	pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType())
+	if err != nil {
+		return nil, err
+	}
+
+	// Return this lookup's own error; returning the earlier (nil) error here
+	// would make a failed lookup look like success to the caller.
+	lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType())
+	if err != nil {
+		return nil, err
+	}
+
+	return scheduler.ScheduleJob(cfg, pendingJobs, lastSuccessfulJob)
+}
- return schedulers
+// handleConfigChange is the config listener registered by Start. It forwards
+// the new config to the scheduling loop over configChanged; oldConfig is unused.
+func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+	l4g.Debug("Schedulers received config change.")
+	// configChanged is unbuffered, so the send only completes while the loop is
+	// receiving. Also watch stop so that a notification arriving during or after
+	// shutdown is dropped instead of blocking the caller forever.
+	select {
+	case schedulers.configChanged <- newConfig:
+	case <-schedulers.stop:
+	}
}
diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go
deleted file mode 100644
index d7345f67f..000000000
--- a/jobs/testscheduler.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package jobs
-
-import (
- "time"
-
- l4g "github.com/alecthomas/log4go"
-)
-
-type TestScheduler struct {
- name string
- jobType string
- stop chan bool
- stopped chan bool
-}
-
-func MakeTestScheduler(name string, jobType string) *TestScheduler {
- return &TestScheduler{
- name: name,
- jobType: jobType,
- stop: make(chan bool, 1),
- stopped: make(chan bool, 1),
- }
-}
-
-func (scheduler *TestScheduler) Run() {
- l4g.Debug("Scheduler %v: Started", scheduler.name)
-
- defer func() {
- l4g.Debug("Scheduler %v: Finished", scheduler.name)
- scheduler.stopped <- true
- }()
-
- for {
- select {
- case <-scheduler.stop:
- l4g.Debug("Scheduler %v: Received stop signal", scheduler.name)
- return
- case <-time.After(86400 * time.Second):
- l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name)
- scheduler.AddJob()
- }
- }
-}
-
-func (scheduler *TestScheduler) AddJob() {
- if _, err := CreateJob(scheduler.jobType, nil); err != nil {
- l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err)
- }
-}
-
-func (scheduler *TestScheduler) Stop() {
- l4g.Debug("Scheduler %v: Stopping", scheduler.name)
- scheduler.stop <- true
- <-scheduler.stopped
-}
diff --git a/model/config.go b/model/config.go
index 8b6aad0e5..6cf716372 100644
--- a/model/config.go
+++ b/model/config.go
@@ -1859,7 +1859,7 @@ func (o *Config) IsValid() *AppError {
return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.aggregate_posts_after_days.app_error", nil, "", http.StatusBadRequest)
}
- if _, err := time.Parse("03:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil {
+ if _, err := time.Parse("15:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil {
return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.posts_aggregator_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest)
}
@@ -1871,7 +1871,7 @@ func (o *Config) IsValid() *AppError {
return NewAppError("Config.IsValid", "model.config.is_valid.data_retention.file_retention_days_too_low.app_error", nil, "", http.StatusBadRequest)
}
- if _, err := time.Parse("03:04", *o.DataRetentionSettings.DeletionJobStartTime); err != nil {
+ if _, err := time.Parse("15:04", *o.DataRetentionSettings.DeletionJobStartTime); err != nil {
return NewAppError("Config.IsValid", "model.config.is_valid.data_retention.deletion_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest)
}
diff --git a/model/job.go b/model/job.go
index 85dfbec92..843d73fad 100644
--- a/model/job.go
+++ b/model/job.go
@@ -7,6 +7,7 @@ import (
"encoding/json"
"io"
"net/http"
+ "time"
)
const (
@@ -116,6 +117,9 @@ type Worker interface {
}
type Scheduler interface {
- Run()
- Stop()
+ Name() string
+ JobType() string
+ Enabled(cfg *Config) bool
+ NextScheduleTime(cfg *Config, now time.Time, pendingJobs bool, lastSuccessfulJob *Job) *time.Time
+ ScheduleJob(cfg *Config, pendingJobs bool, lastSuccessfulJob *Job) (*Job, *AppError)
}
diff --git a/store/sqlstore/job_store.go b/store/sqlstore/job_store.go
index c56f526af..0ae5a6a07 100644
--- a/store/sqlstore/job_store.go
+++ b/store/sqlstore/job_store.go
@@ -325,6 +325,64 @@ func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel {
return storeChannel
}
+// GetNewestJobByStatusAndType asynchronously fetches the job with the given
+// status and type that has the highest CreateAt. The returned channel yields
+// exactly one StoreResult and is then closed: Data holds the *model.Job on
+// success, Err holds an AppError if the query fails.
+//
+// NOTE(review): SelectOne presumably reports an error when no row matches, in
+// which case callers receive Err rather than a nil job; confirm against the
+// gorp version in use.
+func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel {
+	storeChannel := make(store.StoreChannel, 1)
+
+	go func() {
+		result := store.StoreResult{}
+
+		var job *model.Job
+
+		if err := jss.GetReplica().SelectOne(&job,
+			`SELECT
+				*
+			FROM
+				Jobs
+			WHERE
+				Status = :Status
+			AND
+				Type = :Type
+			ORDER BY
+				CreateAt DESC
+			LIMIT 1`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil {
+			// Report this method's own name and both filter values so the failure can be traced.
+			result.Err = model.NewAppError("SqlJobStore.GetNewestJobByStatusAndType", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", Type="+jobType+", "+err.Error(), http.StatusInternalServerError)
+		} else {
+			result.Data = job
+		}
+
+		storeChannel <- result
+		close(storeChannel)
+	}()
+
+	return storeChannel
+}
+
+func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel {
+ storeChannel := make(store.StoreChannel, 1)
+
+ go func() {
+ result := store.StoreResult{}
+
+ if count, err := jss.GetReplica().SelectInt(`SELECT
+ COUNT(*)
+ FROM
+ Jobs
+ WHERE
+ Status = :Status
+ AND
+ Type = :Type`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil {
+ result.Err = model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
+ } else {
+ result.Data = count
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
func (jss SqlJobStore) Delete(id string) store.StoreChannel {
storeChannel := make(store.StoreChannel, 1)
diff --git a/store/sqlstore/job_store_test.go b/store/sqlstore/job_store_test.go
index bbeca6c3a..148d8b92d 100644
--- a/store/sqlstore/job_store_test.go
+++ b/store/sqlstore/job_store_test.go
@@ -10,6 +10,7 @@ import (
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
+ "github.com/stretchr/testify/assert"
)
func TestJobSaveGet(t *testing.T) {
@@ -231,6 +232,108 @@ func TestJobGetAllByStatus(t *testing.T) {
}
}
+func TestJobStoreGetNewestJobByStatusAndType(t *testing.T) {
+ ss := Setup()
+
+ jobType1 := model.NewId()
+ jobType2 := model.NewId()
+ status1 := model.NewId()
+ status2 := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1001,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1000,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType2,
+ CreateAt: 1003,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1004,
+ Status: status2,
+ },
+ }
+
+ for _, job := range jobs {
+ store.Must(ss.Job().Save(job))
+ defer ss.Job().Delete(job.Id)
+ }
+
+ result := <-ss.Job().GetNewestJobByStatusAndType(status1, jobType1)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, jobs[0].Id, result.Data.(*model.Job).Id)
+}
+
+func TestJobStoreGetCountByStatusAndType(t *testing.T) {
+ ss := Setup()
+
+ jobType1 := model.NewId()
+ jobType2 := model.NewId()
+ status1 := model.NewId()
+ status2 := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1000,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 999,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType2,
+ CreateAt: 1001,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1002,
+ Status: status2,
+ },
+ }
+
+ for _, job := range jobs {
+ store.Must(ss.Job().Save(job))
+ defer ss.Job().Delete(job.Id)
+ }
+
+ result := <-ss.Job().GetCountByStatusAndType(status1, jobType1)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 2, result.Data.(int64))
+
+ result = <-ss.Job().GetCountByStatusAndType(status2, jobType2)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 0, result.Data.(int64))
+
+ result = <-ss.Job().GetCountByStatusAndType(status1, jobType2)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 1, result.Data.(int64))
+
+ result = <-ss.Job().GetCountByStatusAndType(status2, jobType1)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 1, result.Data.(int64))
+}
+
func TestJobUpdateOptimistically(t *testing.T) {
ss := Setup()
diff --git a/store/store.go b/store/store.go
index d2c30318f..9694921c8 100644
--- a/store/store.go
+++ b/store/store.go
@@ -411,6 +411,8 @@ type JobStore interface {
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
GetAllByStatus(status string) StoreChannel
+ GetNewestJobByStatusAndType(status string, jobType string) StoreChannel
+ GetCountByStatusAndType(status string, jobType string) StoreChannel
Delete(id string) StoreChannel
}