From 4e79d2d4d037e7c33ec3e63d58110668106de222 Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 29 Sep 2017 04:29:29 -0500 Subject: remove jobs.Srv and other jobs-related globals (#7535) --- jobs/jobs.go | 50 +++++++++++++++++++++++++------------------------- jobs/jobs_watcher.go | 6 ++++-- jobs/schedulers.go | 20 +++++++++++--------- jobs/server.go | 35 ++++++++++++++++++++--------------- jobs/testworker.go | 16 +++++++++------- jobs/workers.go | 13 ++++++------- 6 files changed, 75 insertions(+), 65 deletions(-) (limited to 'jobs') diff --git a/jobs/jobs.go b/jobs/jobs.go index 22d87e850..7e49b2f48 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -18,7 +18,7 @@ const ( CANCEL_WATCHER_POLLING_INTERVAL = 5000 ) -func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { +func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { job := model.Job{ Id: model.NewId(), Type: jobType, @@ -31,23 +31,23 @@ func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.Ap return nil, err } - if result := <-Srv.Store.Job().Save(&job); result.Err != nil { + if result := <-srv.Store.Job().Save(&job); result.Err != nil { return nil, result.Err } return &job, nil } -func GetJob(id string) (*model.Job, *model.AppError) { - if result := <-Srv.Store.Job().Get(id); result.Err != nil { +func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) { + if result := <-srv.Store.Job().Get(id); result.Err != nil { return nil, result.Err } else { return result.Data.(*model.Job), nil } } -func ClaimJob(job *model.Job) (bool, *model.AppError) { - if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { +func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) { + if result := <-srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { return false, result.Err } else { success := result.Data.(bool) @@ -55,25 +55,25 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) { } } -func SetJobProgress(job *model.Job, progress int64) *model.AppError { +func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError { job.Status = model.JOB_STATUS_IN_PROGRESS job.Progress = progress - if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { return result.Err } else { return nil } } -func SetJobSuccess(job *model.Job) *model.AppError { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) +func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError { + result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) return result.Err } -func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { +func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { if jobError == nil { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) return result.Err } @@ -85,11 +85,11 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { jobError.Translate(utils.T) job.Data["error"] = jobError.Message + " (" + jobError.DetailedError + ")" - if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { return result.Err } else { if !result.Data.(bool) { - if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { return result.Err } else { if !result.Data.(bool) { @@ -102,19 +102,19 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { return nil } -func SetJobCanceled(job *model.Job) *model.AppError { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) +func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError { + result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) return result.Err } -func RequestCancellation(jobId string) *model.AppError { - if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { +func (srv *JobServer) RequestCancellation(jobId string) *model.AppError { + if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { return result.Err } else if result.Data.(bool) { return nil } - if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { return result.Err } else if result.Data.(bool) { return nil @@ -123,7 +123,7 @@ func RequestCancellation(jobId string) *model.AppError { return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError) } -func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { +func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { for { select { case <-ctx.Done(): @@ -131,7 +131,7 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte return case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): l4g.Debug("CancellationWatcher for Job: %v polling.", jobId) - if result := <-Srv.Store.Job().Get(jobId); result.Err == nil { + if result := <-srv.Store.Job().Get(jobId); result.Err == nil { jobStatus := result.Data.(*model.Job) if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { close(cancelChan) @@ -152,16 +152,16 @@ func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Tim return &nextTime } -func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { - if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil { +func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { + if result := <-srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil { return false, result.Err } else { return result.Data.(int64) > 0, nil } } -func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { - if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil { +func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { + if result := <-srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil { return nil, result.Err } else { return result.Data.(*model.Job), nil diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 56cf9eb2e..f9a958fe3 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -16,6 +16,7 @@ const ( ) type Watcher struct { + srv *JobServer workers *Workers stop chan bool @@ -23,12 +24,13 @@ type Watcher struct { pollingInterval int } -func MakeWatcher(workers *Workers, pollingInterval int) *Watcher { +func (srv *JobServer) MakeWatcher(workers *Workers, pollingInterval int) *Watcher { return &Watcher{ stop: make(chan bool, 1), stopped: make(chan bool, 1), pollingInterval: pollingInterval, workers: workers, + srv: srv, } } @@ -63,7 +65,7 @@ func (watcher *Watcher) Stop() { } func (watcher *Watcher) PollAndNotify() { - if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { + if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error()) } else { jobs := result.Data.([]*model.Job) diff --git a/jobs/schedulers.go b/jobs/schedulers.go index cdf8d956d..2f1ae394f 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -9,7 +9,6 @@ import ( l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -20,28 +19,31 @@ type Schedulers struct { configChanged chan *model.Config listenerId string startOnce sync.Once + jobs *JobServer schedulers []model.Scheduler nextRunTimes []*time.Time } -func InitSchedulers() *Schedulers { +func (srv *JobServer) InitSchedulers() *Schedulers { l4g.Debug("Initialising schedulers.") + schedulers := &Schedulers{ stop: make(chan bool), stopped: make(chan bool), configChanged: make(chan *model.Config), + jobs: srv, } - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil { schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler()) } - if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { + if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil { schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler()) } - if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil { + if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil { schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler()) } @@ -124,7 +126,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim scheduler := schedulers.schedulers[idx] if !pendingJobs { - if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil { + if pj, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType()); err != nil { l4g.Error("Failed to set next job run time: " + err.Error()) schedulers.nextRunTimes[idx] = nil return @@ -133,7 +135,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim } } - lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType()) + lastSuccessfulJob, err := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType()) if err != nil { l4g.Error("Failed to set next job run time: " + err.Error()) schedulers.nextRunTimes[idx] = nil @@ -145,12 +147,12 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim } func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) { - pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType()) + pendingJobs, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType()) if err != nil { return nil, err } - lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType()) + lastSuccessfulJob, err2 := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType()) if err2 != nil { return nil, err } diff --git a/jobs/server.go b/jobs/server.go index 6c857e7dc..667d6c075 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -5,6 +5,8 @@ package jobs import ( l4g "github.com/alecthomas/log4go" + + ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/utils" @@ -14,13 +16,16 @@ type JobServer struct { Store store.Store Workers *Workers Schedulers *Schedulers -} -var Srv JobServer + DataRetention ejobs.DataRetentionInterface + ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface + ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface + LdapSync ejobs.LdapSyncInterface +} -func (server *JobServer) LoadLicense() { +func (srv *JobServer) LoadLicense() { licenseId := "" - if result := <-server.Store.System().Get(); result.Err == nil { + if result := <-srv.Store.System().Get(); result.Err == nil { props := result.Data.(model.StringMap) licenseId = props[model.SYSTEM_ACTIVE_LICENSE_ID] } @@ -31,7 +36,7 @@ func (server *JobServer) LoadLicense() { // Lets attempt to load the file from disk since it was missing from the DB _, licenseBytes = utils.GetAndValidateLicenseFileFromDisk() } else { - if result := <-server.Store.License().Get(licenseId); result.Err == nil { + if result := <-srv.Store.License().Get(licenseId); result.Err == nil { record := result.Data.(*model.LicenseRecord) licenseBytes = []byte(record.Bytes) l4g.Info("License key valid unlocking enterprise features.") @@ -48,22 +53,22 @@ func (server *JobServer) LoadLicense() { } } -func (server *JobServer) StartWorkers() { - Srv.Workers = InitWorkers().Start() +func (srv *JobServer) StartWorkers() { + srv.Workers = srv.InitWorkers().Start() } -func (server *JobServer) StartSchedulers() { - Srv.Schedulers = InitSchedulers().Start() +func (srv *JobServer) StartSchedulers() { + srv.Schedulers = srv.InitSchedulers().Start() } -func (server *JobServer) StopWorkers() { - if Srv.Workers != nil { - Srv.Workers.Stop() +func (srv *JobServer) StopWorkers() { + if srv.Workers != nil { + srv.Workers.Stop() } } -func (server *JobServer) StopSchedulers() { - if Srv.Schedulers != nil { - Srv.Schedulers.Stop() +func (srv *JobServer) StopSchedulers() { + if srv.Schedulers != nil { + srv.Schedulers.Stop() } } diff --git a/jobs/testworker.go b/jobs/testworker.go index 29608e909..9cfc8614f 100644 --- a/jobs/testworker.go +++ b/jobs/testworker.go @@ -12,14 +12,16 @@ import ( ) type TestWorker struct { + srv *JobServer name string stop chan bool stopped chan bool jobs chan model.Job } -func MakeTestWorker(name string) *TestWorker { +func (srv *JobServer) MakeTestWorker(name string) *TestWorker { return &TestWorker{ + srv: srv, name: name, stop: make(chan bool, 1), stopped: make(chan bool, 1), @@ -48,7 +50,7 @@ func (worker *TestWorker) Run() { } func (worker *TestWorker) DoJob(job *model.Job) { - if claimed, err := ClaimJob(job); err != nil { + if claimed, err := worker.srv.ClaimJob(job); err != nil { l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error()) return } else if !claimed { @@ -57,7 +59,7 @@ func (worker *TestWorker) DoJob(job *model.Job) { cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background()) cancelWatcherChan := make(chan interface{}, 1) - go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) + go worker.srv.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan) defer cancelCancelWatcher() @@ -66,13 +68,13 @@ func (worker *TestWorker) DoJob(job *model.Job) { select { case <-cancelWatcherChan: l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id) - if err := SetJobCanceled(job); err != nil { + if err := worker.srv.SetJobCanceled(job); err != nil { l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) } return case <-worker.stop: l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id) - if err := SetJobCanceled(job); err != nil { + if err := worker.srv.SetJobCanceled(job); err != nil { l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error()) } return @@ -80,12 +82,12 @@ func (worker *TestWorker) DoJob(job *model.Job) { counter++ if counter > 10 { l4g.Debug("Job %v: Job completed.", job.Id) - if err := SetJobSuccess(job); err != nil { + if err := worker.srv.SetJobSuccess(job); err != nil { l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error()) } return } else { - if err := SetJobProgress(job, int64(counter*10)); err != nil { + if err := worker.srv.SetJobProgress(job, int64(counter*10)); err != nil { l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error()) } } diff --git a/jobs/workers.go b/jobs/workers.go index 9f85adaf5..dfa150ff6 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -7,7 +7,6 @@ import ( "sync" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" ) @@ -24,23 +23,23 @@ type Workers struct { listenerId string } -func InitWorkers() *Workers { +func (srv *JobServer) InitWorkers() *Workers { workers := &Workers{} - workers.Watcher = MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL) + workers.Watcher = srv.MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL) - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil { workers.DataRetention = dataRetentionInterface.MakeWorker() } - if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil { + if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil { workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() } - if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil { + if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil { workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker() } - if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil { + if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil { workers.LdapSync = ldapSyncInterface.MakeWorker() } -- cgit v1.2.3-1-g7c22