diff options
Diffstat (limited to 'jobs/jobs.go')
-rw-r--r-- | jobs/jobs.go | 136 |
1 files changed, 95 insertions, 41 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go index 8c84f4eea..9247355d0 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -4,71 +4,125 @@ package jobs import ( - "sync" + "context" + "time" l4g "github.com/alecthomas/log4go" - ejobs "github.com/mattermost/platform/einterfaces/jobs" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/store" - "github.com/mattermost/platform/utils" + "net/http" ) -type Jobs struct { - startOnce sync.Once +const ( + CANCEL_WATCHER_POLLING_INTERVAL = 5000 +) - DataRetention model.Job - // SearchIndexing model.Job +func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) { + job := model.Job{ + Id: model.NewId(), + Type: jobType, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_PENDING, + Data: jobData, + } - listenerId string + if result := <-Srv.Store.Job().Save(&job); result.Err != nil { + return nil, result.Err + } + + return &job, nil } -func InitJobs(s store.Store) *Jobs { - jobs := &Jobs{ - // SearchIndexing: MakeTestJob(s, "SearchIndexing"), +func ClaimJob(job *model.Job) (bool, *model.AppError) { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return false, result.Err + } else { + success := result.Data.(bool) + return success, nil } +} - if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { - jobs.DataRetention = dataRetentionInterface.MakeJob(s) +func SetJobProgress(job *model.Job, progress int64) (*model.AppError) { + job.Status = model.JOB_STATUS_IN_PROGRESS + job.Progress = progress + + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return result.Err + } else { + return nil } +} - return jobs +func SetJobSuccess(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) + return result.Err } -func (jobs *Jobs) Start() *Jobs { - l4g.Info("Starting jobs") +func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { + if jobError == nil { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return result.Err + } - jobs.startOnce.Do(func() { - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() + job.Status = model.JOB_STATUS_ERROR + job.Progress = -1 + if job.Data == nil { + job.Data = make(map[string]interface{}) + } + job.Data["error"] = jobError + + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + return result.Err + } else { + if !result.Data.(bool) { + if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else { + if !result.Data.(bool) { + return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id=" + job.Id, http.StatusInternalServerError) + } + } } + } - // go jobs.SearchIndexing.Run() - }) - - jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange) - - return jobs + return nil } -func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { - if jobs.DataRetention != nil { - if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { - go jobs.DataRetention.Run() - } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() - } - } +func SetJobCanceled(job *model.Job) *model.AppError { + result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) + return result.Err } -func (jobs *Jobs) Stop() *Jobs { - utils.RemoveConfigListener(jobs.listenerId) +func RequestCancellation(job *model.Job) *model.AppError { + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil + } - if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { - jobs.DataRetention.Stop() + if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { + return result.Err + } else if result.Data.(bool) { + return nil } - // jobs.SearchIndexing.Stop() - l4g.Info("Stopped jobs") + return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id, http.StatusInternalServerError) +} - return jobs +func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) { + for { + select { + case <-ctx.Done(): + l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId) + return + case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): + l4g.Debug("CancellationWatcher for Job: %v polling.", jobId) + if result := <-Srv.Store.Job().Get(jobId); result.Err == nil { + jobStatus := result.Data.(*model.Job) + if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { + close(cancelChan) + return + } + } + } + } } |