From 83d53ea98cf5486f89bd4280b6b5ef835da4fd22 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Tue, 11 Jul 2017 09:09:15 +0100 Subject: PLT-6475: Elasticsearch Indexing Worker. (#6879) --- jobs/jobs.go | 52 +++++++++++++++++++++++++++++++++------------------- jobs/jobs_watcher.go | 7 +++++++ jobs/testworker.go | 2 +- jobs/workers.go | 31 ++++++++++++++++++++++++------- 4 files changed, 65 insertions(+), 27 deletions(-) (limited to 'jobs') diff --git a/jobs/jobs.go b/jobs/jobs.go index 58c2f2f13..9247355d0 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -9,6 +9,7 @@ import ( l4g "github.com/alecthomas/log4go" "github.com/mattermost/platform/model" + "net/http" ) const ( @@ -40,27 +41,15 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) { } } -func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) { - var job *model.Job - - if result := <-Srv.Store.Job().Get(jobId); result.Err != nil { - return false, result.Err - } else { - job = result.Data.(*model.Job) - } - +func SetJobProgress(job *model.Job, progress int64) (*model.AppError) { job.Status = model.JOB_STATUS_IN_PROGRESS job.Progress = progress if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { - return false, result.Err + return result.Err } else { - if !result.Data.(bool) { - return false, nil - } + return nil } - - return true, nil } func SetJobSuccess(job *model.Job) *model.AppError { @@ -68,9 +57,34 @@ func SetJobSuccess(job *model.Job) *model.AppError { return result.Err } -func SetJobError(job *model.Job) *model.AppError { - result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) - return result.Err +func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { + if jobError == nil { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return result.Err + } + + job.Status = model.JOB_STATUS_ERROR + job.Progress = -1 + if job.Data == nil { + job.Data = make(map[string]interface{}) + } + job.Data["error"] = jobError + + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return result.Err + } else { + if !result.Data.(bool) { + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else { + if !result.Data.(bool) { + return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id=" + job.Id, http.StatusInternalServerError) + } + } + } + } + + return nil } func SetJobCanceled(job *model.Job) *model.AppError { @@ -91,7 +105,7 @@ func RequestCancellation(job *model.Job) *model.AppError { return nil } - return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id) + return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id, http.StatusInternalServerError) } func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index ada957ccc..5979d6207 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -79,6 +79,13 @@ func (watcher *Watcher) PollAndNotify() { default: } } + } else if js.Type == model.JOB_TYPE_SEARCH_INDEXING { + if watcher.workers.ElasticsearchIndexing != nil { + select { + case watcher.workers.ElasticsearchIndexing.JobChannel() <- j: + default: + } + } } } } diff --git a/jobs/testworker.go b/jobs/testworker.go index f1c8a07a3..385a2073b 100644 --- a/jobs/testworker.go +++ b/jobs/testworker.go @@ -85,7 +85,7 @@ func (worker *TestWorker) DoJob(job *model.Job) { } return } else { - if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil { + if err := SetJobProgress(job, int64(counter*10)); err != nil { l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error()) } } diff --git a/jobs/workers.go b/jobs/workers.go index a42ec4607..bb80ad79a 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -13,13 +13,13 @@ import ( ) type Workers struct { - startOnce sync.Once - watcher *Watcher + startOnce sync.Once + watcher *Watcher - DataRetention model.Worker - // SearchIndexing model.Job + DataRetention model.Worker + ElasticsearchIndexing model.Worker - listenerId string + listenerId string } func InitWorkers() *Workers { @@ -32,6 +32,10 @@ func InitWorkers() *Workers { workers.DataRetention = dataRetentionInterface.MakeWorker() } + if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil { + workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() + } + return workers } @@ -43,7 +47,9 @@ func (workers *Workers) Start() *Workers { go workers.DataRetention.Run() } - // go workers.SearchIndexing.Run() + if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing { + go workers.ElasticsearchIndexing.Run() + } go workers.watcher.Start() }) @@ -61,6 +67,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m workers.DataRetention.Stop() } } + + if workers.ElasticsearchIndexing != nil { + if !*oldConfig.ElasticSearchSettings.EnableIndexing && *newConfig.ElasticSearchSettings.EnableIndexing { + go workers.ElasticsearchIndexing.Run() + } else if *oldConfig.ElasticSearchSettings.EnableIndexing && !*newConfig.ElasticSearchSettings.EnableIndexing { + workers.ElasticsearchIndexing.Stop() + } + } } func (workers *Workers) Stop() *Workers { @@ -71,7 +85,10 @@ func (workers *Workers) Stop() *Workers { if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { workers.DataRetention.Stop() } - // workers.SearchIndexing.Stop() + + if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing { + workers.ElasticsearchIndexing.Stop() + } l4g.Info("Stopped workers") -- cgit v1.2.3-1-g7c22