summaryrefslogtreecommitdiffstats
path: root/jobs/jobs.go
diff options
context:
space:
mode:
Diffstat (limited to 'jobs/jobs.go')
-rw-r--r--jobs/jobs.go122
1 files changed, 81 insertions, 41 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 8c84f4eea..58c2f2f13 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -4,71 +4,111 @@
package jobs
import (
- "sync"
+ "context"
+ "time"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/platform/einterfaces/jobs"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
- "github.com/mattermost/platform/utils"
)
-type Jobs struct {
- startOnce sync.Once
+const (
+ CANCEL_WATCHER_POLLING_INTERVAL = 5000
+)
- DataRetention model.Job
- // SearchIndexing model.Job
+func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) {
+ job := model.Job{
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ Data: jobData,
+ }
- listenerId string
+ if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
+ return nil, result.Err
+ }
+
+ return &job, nil
}
-func InitJobs(s store.Store) *Jobs {
- jobs := &Jobs{
- // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+func ClaimJob(job *model.Job) (bool, *model.AppError) {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ success := result.Data.(bool)
+ return success, nil
}
+}
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
- jobs.DataRetention = dataRetentionInterface.MakeJob(s)
- }
+func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) {
+ var job *model.Job
- return jobs
-}
+ if result := <-Srv.Store.Job().Get(jobId); result.Err != nil {
+ return false, result.Err
+ } else {
+ job = result.Data.(*model.Job)
+ }
-func (jobs *Jobs) Start() *Jobs {
- l4g.Info("Starting jobs")
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = progress
- jobs.startOnce.Do(func() {
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
+ if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ if !result.Data.(bool) {
+ return false, nil
}
+ }
- // go jobs.SearchIndexing.Run()
- })
+ return true, nil
+}
- jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange)
+func SetJobSuccess(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
+ return result.Err
+}
- return jobs
+func SetJobError(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ return result.Err
}
-func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
- if jobs.DataRetention != nil {
- if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
- } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
- }
- }
+func SetJobCanceled(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
+ return result.Err
}
-func (jobs *Jobs) Stop() *Jobs {
- utils.RemoveConfigListener(jobs.listenerId)
+func RequestCancellation(job *model.Job) *model.AppError {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
+ }
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
}
- // jobs.SearchIndexing.Stop()
- l4g.Info("Stopped jobs")
+ return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id)
+}
- return jobs
+func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
+ for {
+ select {
+ case <-ctx.Done():
+ l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId)
+ return
+ case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
+ l4g.Debug("CancellationWatcher for Job: %v polling.", jobId)
+ if result := <-Srv.Store.Job().Get(jobId); result.Err == nil {
+ jobStatus := result.Data.(*model.Job)
+ if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
+ close(cancelChan)
+ return
+ }
+ }
+ }
+ }
}