summaryrefslogtreecommitdiffstats
path: root/jobs
diff options
context:
space:
mode:
authorChris <ccbrown112@gmail.com>2017-09-29 04:29:29 -0500
committerGeorge Goldberg <george@gberg.me>2017-09-29 10:29:29 +0100
commit4e79d2d4d037e7c33ec3e63d58110668106de222 (patch)
tree038995734b4b78cf025b47d2320d7e324eb80e0c /jobs
parentcb33179998c682bf1e48795d449d85f92cec97f7 (diff)
downloadchat-4e79d2d4d037e7c33ec3e63d58110668106de222.tar.gz
chat-4e79d2d4d037e7c33ec3e63d58110668106de222.tar.bz2
chat-4e79d2d4d037e7c33ec3e63d58110668106de222.zip
remove jobs.Srv and other jobs-related globals (#7535)
Diffstat (limited to 'jobs')
-rw-r--r--jobs/jobs.go50
-rw-r--r--jobs/jobs_watcher.go6
-rw-r--r--jobs/schedulers.go20
-rw-r--r--jobs/server.go35
-rw-r--r--jobs/testworker.go16
-rw-r--r--jobs/workers.go13
6 files changed, 75 insertions, 65 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 22d87e850..7e49b2f48 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -18,7 +18,7 @@ const (
CANCEL_WATCHER_POLLING_INTERVAL = 5000
)
-func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
+func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
job := model.Job{
Id: model.NewId(),
Type: jobType,
@@ -31,23 +31,23 @@ func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.Ap
return nil, err
}
- if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
+ if result := <-srv.Store.Job().Save(&job); result.Err != nil {
return nil, result.Err
}
return &job, nil
}
-func GetJob(id string) (*model.Job, *model.AppError) {
- if result := <-Srv.Store.Job().Get(id); result.Err != nil {
+func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) {
+ if result := <-srv.Store.Job().Get(id); result.Err != nil {
return nil, result.Err
} else {
return result.Data.(*model.Job), nil
}
}
-func ClaimJob(job *model.Job) (bool, *model.AppError) {
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) {
+ if result := <-srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return false, result.Err
} else {
success := result.Data.(bool)
@@ -55,25 +55,25 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) {
}
}
-func SetJobProgress(job *model.Job, progress int64) *model.AppError {
+func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError {
job.Status = model.JOB_STATUS_IN_PROGRESS
job.Progress = progress
- if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return result.Err
} else {
return nil
}
}
-func SetJobSuccess(job *model.Job) *model.AppError {
- result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
+func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError {
+ result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
return result.Err
}
-func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
+func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
if jobError == nil {
- result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
return result.Err
}
@@ -85,11 +85,11 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
jobError.Translate(utils.T)
job.Data["error"] = jobError.Message + " (" + jobError.DetailedError + ")"
- if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return result.Err
} else {
if !result.Data.(bool) {
- if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
return result.Err
} else {
if !result.Data.(bool) {
@@ -102,19 +102,19 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
return nil
}
-func SetJobCanceled(job *model.Job) *model.AppError {
- result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
+func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError {
+ result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
return result.Err
}
-func RequestCancellation(jobId string) *model.AppError {
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+func (srv *JobServer) RequestCancellation(jobId string) *model.AppError {
+ if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
return nil
}
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
return nil
@@ -123,7 +123,7 @@ func RequestCancellation(jobId string) *model.AppError {
return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError)
}
-func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
+func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
for {
select {
case <-ctx.Done():
@@ -131,7 +131,7 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte
return
case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
l4g.Debug("CancellationWatcher for Job: %v polling.", jobId)
- if result := <-Srv.Store.Job().Get(jobId); result.Err == nil {
+ if result := <-srv.Store.Job().Get(jobId); result.Err == nil {
jobStatus := result.Data.(*model.Job)
if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
close(cancelChan)
@@ -152,16 +152,16 @@ func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Tim
return &nextTime
}
-func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
- if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil {
+func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
+ if result := <-srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil {
return false, result.Err
} else {
return result.Data.(int64) > 0, nil
}
}
-func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
- if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil {
+func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
+ if result := <-srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil {
return nil, result.Err
} else {
return result.Data.(*model.Job), nil
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
index 56cf9eb2e..f9a958fe3 100644
--- a/jobs/jobs_watcher.go
+++ b/jobs/jobs_watcher.go
@@ -16,6 +16,7 @@ const (
)
type Watcher struct {
+ srv *JobServer
workers *Workers
stop chan bool
@@ -23,12 +24,13 @@ type Watcher struct {
pollingInterval int
}
-func MakeWatcher(workers *Workers, pollingInterval int) *Watcher {
+func (srv *JobServer) MakeWatcher(workers *Workers, pollingInterval int) *Watcher {
return &Watcher{
stop: make(chan bool, 1),
stopped: make(chan bool, 1),
pollingInterval: pollingInterval,
workers: workers,
+ srv: srv,
}
}
@@ -63,7 +65,7 @@ func (watcher *Watcher) Stop() {
}
func (watcher *Watcher) PollAndNotify() {
- if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
+ if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
} else {
jobs := result.Data.([]*model.Job)
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index cdf8d956d..2f1ae394f 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -9,7 +9,6 @@ import (
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -20,28 +19,31 @@ type Schedulers struct {
configChanged chan *model.Config
listenerId string
startOnce sync.Once
+ jobs *JobServer
schedulers []model.Scheduler
nextRunTimes []*time.Time
}
-func InitSchedulers() *Schedulers {
+func (srv *JobServer) InitSchedulers() *Schedulers {
l4g.Debug("Initialising schedulers.")
+
schedulers := &Schedulers{
stop: make(chan bool),
stopped: make(chan bool),
configChanged: make(chan *model.Config),
+ jobs: srv,
}
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler())
}
- if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
+ if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler())
}
- if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
+ if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
}
@@ -124,7 +126,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim
scheduler := schedulers.schedulers[idx]
if !pendingJobs {
- if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil {
+ if pj, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType()); err != nil {
l4g.Error("Failed to set next job run time: " + err.Error())
schedulers.nextRunTimes[idx] = nil
return
@@ -133,7 +135,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim
}
}
- lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType())
+ lastSuccessfulJob, err := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType())
if err != nil {
l4g.Error("Failed to set next job run time: " + err.Error())
schedulers.nextRunTimes[idx] = nil
@@ -145,12 +147,12 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim
}
func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) {
- pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType())
+ pendingJobs, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType())
if err != nil {
return nil, err
}
- lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType())
+ lastSuccessfulJob, err2 := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType())
if err2 != nil {
return nil, err
}
diff --git a/jobs/server.go b/jobs/server.go
index 6c857e7dc..667d6c075 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -5,6 +5,8 @@ package jobs
import (
l4g "github.com/alecthomas/log4go"
+
+ ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/utils"
@@ -14,13 +16,16 @@ type JobServer struct {
Store store.Store
Workers *Workers
Schedulers *Schedulers
-}
-var Srv JobServer
+ DataRetention ejobs.DataRetentionInterface
+ ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface
+ ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface
+ LdapSync ejobs.LdapSyncInterface
+}
-func (server *JobServer) LoadLicense() {
+func (srv *JobServer) LoadLicense() {
licenseId := ""
- if result := <-server.Store.System().Get(); result.Err == nil {
+ if result := <-srv.Store.System().Get(); result.Err == nil {
props := result.Data.(model.StringMap)
licenseId = props[model.SYSTEM_ACTIVE_LICENSE_ID]
}
@@ -31,7 +36,7 @@ func (server *JobServer) LoadLicense() {
// Lets attempt to load the file from disk since it was missing from the DB
_, licenseBytes = utils.GetAndValidateLicenseFileFromDisk()
} else {
- if result := <-server.Store.License().Get(licenseId); result.Err == nil {
+ if result := <-srv.Store.License().Get(licenseId); result.Err == nil {
record := result.Data.(*model.LicenseRecord)
licenseBytes = []byte(record.Bytes)
l4g.Info("License key valid unlocking enterprise features.")
@@ -48,22 +53,22 @@ func (server *JobServer) LoadLicense() {
}
}
-func (server *JobServer) StartWorkers() {
- Srv.Workers = InitWorkers().Start()
+func (srv *JobServer) StartWorkers() {
+ srv.Workers = srv.InitWorkers().Start()
}
-func (server *JobServer) StartSchedulers() {
- Srv.Schedulers = InitSchedulers().Start()
+func (srv *JobServer) StartSchedulers() {
+ srv.Schedulers = srv.InitSchedulers().Start()
}
-func (server *JobServer) StopWorkers() {
- if Srv.Workers != nil {
- Srv.Workers.Stop()
+func (srv *JobServer) StopWorkers() {
+ if srv.Workers != nil {
+ srv.Workers.Stop()
}
}
-func (server *JobServer) StopSchedulers() {
- if Srv.Schedulers != nil {
- Srv.Schedulers.Stop()
+func (srv *JobServer) StopSchedulers() {
+ if srv.Schedulers != nil {
+ srv.Schedulers.Stop()
}
}
diff --git a/jobs/testworker.go b/jobs/testworker.go
index 29608e909..9cfc8614f 100644
--- a/jobs/testworker.go
+++ b/jobs/testworker.go
@@ -12,14 +12,16 @@ import (
)
type TestWorker struct {
+ srv *JobServer
name string
stop chan bool
stopped chan bool
jobs chan model.Job
}
-func MakeTestWorker(name string) *TestWorker {
+func (srv *JobServer) MakeTestWorker(name string) *TestWorker {
return &TestWorker{
+ srv: srv,
name: name,
stop: make(chan bool, 1),
stopped: make(chan bool, 1),
@@ -48,7 +50,7 @@ func (worker *TestWorker) Run() {
}
func (worker *TestWorker) DoJob(job *model.Job) {
- if claimed, err := ClaimJob(job); err != nil {
+ if claimed, err := worker.srv.ClaimJob(job); err != nil {
l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error())
return
} else if !claimed {
@@ -57,7 +59,7 @@ func (worker *TestWorker) DoJob(job *model.Job) {
cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
cancelWatcherChan := make(chan interface{}, 1)
- go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+ go worker.srv.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
defer cancelCancelWatcher()
@@ -66,13 +68,13 @@ func (worker *TestWorker) DoJob(job *model.Job) {
select {
case <-cancelWatcherChan:
l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id)
- if err := SetJobCanceled(job); err != nil {
+ if err := worker.srv.SetJobCanceled(job); err != nil {
l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
}
return
case <-worker.stop:
l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id)
- if err := SetJobCanceled(job); err != nil {
+ if err := worker.srv.SetJobCanceled(job); err != nil {
l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
}
return
@@ -80,12 +82,12 @@ func (worker *TestWorker) DoJob(job *model.Job) {
counter++
if counter > 10 {
l4g.Debug("Job %v: Job completed.", job.Id)
- if err := SetJobSuccess(job); err != nil {
+ if err := worker.srv.SetJobSuccess(job); err != nil {
l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error())
}
return
} else {
- if err := SetJobProgress(job, int64(counter*10)); err != nil {
+ if err := worker.srv.SetJobProgress(job, int64(counter*10)); err != nil {
l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
}
}
diff --git a/jobs/workers.go b/jobs/workers.go
index 9f85adaf5..dfa150ff6 100644
--- a/jobs/workers.go
+++ b/jobs/workers.go
@@ -7,7 +7,6 @@ import (
"sync"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -24,23 +23,23 @@ type Workers struct {
listenerId string
}
-func InitWorkers() *Workers {
+func (srv *JobServer) InitWorkers() *Workers {
workers := &Workers{}
- workers.Watcher = MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL)
+ workers.Watcher = srv.MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL)
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil {
workers.DataRetention = dataRetentionInterface.MakeWorker()
}
- if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil {
+ if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil {
workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker()
}
- if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
+ if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil {
workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker()
}
- if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
+ if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil {
workers.LdapSync = ldapSyncInterface.MakeWorker()
}