summaryrefslogtreecommitdiffstats
path: root/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'jobs')
-rw-r--r--jobs/jobs.go122
-rw-r--r--jobs/jobs_watcher.go85
-rw-r--r--jobs/jobserver/jobserver.go15
-rw-r--r--jobs/schedulers.go68
-rw-r--r--jobs/server.go31
-rw-r--r--jobs/testjob.go54
-rw-r--r--jobs/testscheduler.go58
-rw-r--r--jobs/testworker.go104
-rw-r--r--jobs/workers.go79
9 files changed, 511 insertions, 105 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 8c84f4eea..58c2f2f13 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -4,71 +4,111 @@
package jobs
import (
- "sync"
+ "context"
+ "time"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/platform/einterfaces/jobs"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
- "github.com/mattermost/platform/utils"
)
-type Jobs struct {
- startOnce sync.Once
+const (
+ CANCEL_WATCHER_POLLING_INTERVAL = 5000
+)
- DataRetention model.Job
- // SearchIndexing model.Job
+func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) {
+ job := model.Job{
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ Data: jobData,
+ }
- listenerId string
+ if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
+ return nil, result.Err
+ }
+
+ return &job, nil
}
-func InitJobs(s store.Store) *Jobs {
- jobs := &Jobs{
- // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+func ClaimJob(job *model.Job) (bool, *model.AppError) {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ success := result.Data.(bool)
+ return success, nil
}
+}
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
- jobs.DataRetention = dataRetentionInterface.MakeJob(s)
- }
+func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) {
+ var job *model.Job
- return jobs
-}
+ if result := <-Srv.Store.Job().Get(jobId); result.Err != nil {
+ return false, result.Err
+ } else {
+ job = result.Data.(*model.Job)
+ }
-func (jobs *Jobs) Start() *Jobs {
- l4g.Info("Starting jobs")
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = progress
- jobs.startOnce.Do(func() {
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
+ if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ if !result.Data.(bool) {
+ return false, nil
}
+ }
- // go jobs.SearchIndexing.Run()
- })
+ return true, nil
+}
- jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange)
+func SetJobSuccess(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
+ return result.Err
+}
- return jobs
+func SetJobError(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ return result.Err
}
-func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
- if jobs.DataRetention != nil {
- if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
- } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
- }
- }
+func SetJobCanceled(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
+ return result.Err
}
-func (jobs *Jobs) Stop() *Jobs {
- utils.RemoveConfigListener(jobs.listenerId)
+func RequestCancellation(job *model.Job) *model.AppError {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
+ }
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
}
- // jobs.SearchIndexing.Stop()
- l4g.Info("Stopped jobs")
+ return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id)
+}
- return jobs
+func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
+ for {
+ select {
+ case <-ctx.Done():
+ l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId)
+ return
+ case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
+ l4g.Debug("CancellationWatcher for Job: %v polling.", jobId)
+ if result := <-Srv.Store.Job().Get(jobId); result.Err == nil {
+ jobStatus := result.Data.(*model.Job)
+ if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
+ close(cancelChan)
+ return
+ }
+ }
+ }
+ }
}
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
new file mode 100644
index 000000000..ada957ccc
--- /dev/null
+++ b/jobs/jobs_watcher.go
@@ -0,0 +1,85 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "math/rand"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+const (
+ WATCHER_POLLING_INTERVAL = 15000
+)
+
+type Watcher struct {
+ workers *Workers
+
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeWatcher(workers *Workers) *Watcher {
+ return &Watcher{
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ workers: workers,
+ }
+}
+
+func (watcher *Watcher) Start() {
+ l4g.Debug("Watcher Started")
+
+ // Delay for some random number of milliseconds before starting to ensure that multiple
+ // instances of the jobserver don't poll at a time too close to each other.
+ rand.Seed(time.Now().UTC().UnixNano())
+ _ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond)
+
+ defer func(){
+ l4g.Debug("Watcher Finished")
+ watcher.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-watcher.stop:
+ l4g.Debug("Watcher: Received stop signal")
+ return
+ case <-time.After(WATCHER_POLLING_INTERVAL * time.Millisecond):
+ watcher.PollAndNotify()
+ }
+ }
+}
+
+func (watcher *Watcher) Stop() {
+ l4g.Debug("Watcher Stopping")
+ watcher.stop <- true
+ <-watcher.stopped
+}
+
+func (watcher *Watcher) PollAndNotify() {
+ if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
+ l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
+ } else {
+ jobStatuses := result.Data.([]*model.Job)
+
+ for _, js := range jobStatuses {
+ j := model.Job{
+ Type: js.Type,
+ Id: js.Id,
+ }
+
+ if js.Type == model.JOB_TYPE_DATA_RETENTION {
+ if watcher.workers.DataRetention != nil {
+ select {
+ case watcher.workers.DataRetention.JobChannel() <- j:
+ default:
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go
index 5f491a815..aabe5d3b2 100644
--- a/jobs/jobserver/jobserver.go
+++ b/jobs/jobserver/jobserver.go
@@ -16,22 +16,20 @@ import (
_ "github.com/mattermost/platform/imports"
)
-var Srv jobs.JobServer
-
func main() {
// Initialize
utils.InitAndLoadConfig("config.json")
defer l4g.Close()
- Srv.Store = store.NewLayeredStore()
- defer Srv.Store.Close()
+ jobs.Srv.Store = store.NewLayeredStore()
+ defer jobs.Srv.Store.Close()
- Srv.LoadLicense()
+ jobs.Srv.LoadLicense()
// Run jobs
l4g.Info("Starting Mattermost job server")
- Srv.Jobs = jobs.InitJobs(Srv.Store)
- Srv.Jobs.Start()
+ jobs.Srv.StartWorkers()
+ jobs.Srv.StartSchedulers()
var signalChan chan os.Signal = make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
@@ -40,7 +38,8 @@ func main() {
// Cleanup anything that isn't handled by a defer statement
l4g.Info("Stopping Mattermost job server")
- Srv.Jobs.Stop()
+ jobs.Srv.StopSchedulers()
+ jobs.Srv.StopWorkers()
l4g.Info("Stopped Mattermost job server")
}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
new file mode 100644
index 000000000..73ec6661a
--- /dev/null
+++ b/jobs/schedulers.go
@@ -0,0 +1,68 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Schedulers struct {
+ startOnce sync.Once
+
+ DataRetention model.Scheduler
+
+ listenerId string
+}
+
+func InitSchedulers() *Schedulers {
+ schedulers := &Schedulers{}
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
+ }
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) Start() *Schedulers {
+ l4g.Info("Starting schedulers")
+
+ schedulers.startOnce.Do(func() {
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ }
+ })
+
+ schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if schedulers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+ }
+}
+
+func (schedulers *Schedulers) Stop() *Schedulers {
+ utils.RemoveConfigListener(schedulers.listenerId)
+
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+
+ l4g.Info("Stopped schedulers")
+
+ return schedulers
+}
diff --git a/jobs/server.go b/jobs/server.go
index dd3448842..7920cb2d5 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -11,10 +11,13 @@ import (
)
type JobServer struct {
- Store store.Store
- Jobs *Jobs
+ Store store.Store
+ Workers *Workers
+ Schedulers *Schedulers
}
+var Srv JobServer
+
func (server *JobServer) LoadLicense() {
licenseId := ""
if result := <-server.Store.System().Get(); result.Err == nil {
@@ -44,3 +47,27 @@ func (server *JobServer) LoadLicense() {
l4g.Info(utils.T("mattermost.load_license.find.warn"))
}
}
+
+func (server *JobServer) StartWorkers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Workers = InitWorkers().Start()
+ }
+}
+
+func (server *JobServer) StartSchedulers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Schedulers = InitSchedulers().Start()
+ }
+}
+
+func (server *JobServer) StopWorkers() {
+ if Srv.Workers != nil {
+ Srv.Workers.Stop()
+ }
+}
+
+func (server *JobServer) StopSchedulers() {
+ if Srv.Schedulers != nil {
+ Srv.Schedulers.Stop()
+ }
+}
diff --git a/jobs/testjob.go b/jobs/testjob.go
deleted file mode 100644
index 59d5274e5..000000000
--- a/jobs/testjob.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package jobs
-
-import (
- "time"
-
- l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/platform/store"
-)
-
-type TestJob struct {
- store store.Store
-
- name string
- stop chan bool
- stopped chan bool
-}
-
-func MakeTestJob(s store.Store, name string) *TestJob {
- return &TestJob{
- store: s,
- name: name,
- stop: make(chan bool, 1),
- stopped: make(chan bool, 1),
- }
-}
-
-func (job *TestJob) Run() {
- l4g.Debug("Job %v: Started", job.name)
-
- running := true
- for running {
- l4g.Debug("Job %v: Tick", job.name)
-
- select {
- case <-job.stop:
- l4g.Debug("Job %v: Received stop signal", job.name)
- running = false
- case <-time.After(10 * time.Second):
- continue
- }
- }
-
- l4g.Debug("Job %v: Finished", job.name)
- job.stopped <- true
-}
-
-func (job *TestJob) Stop() {
- l4g.Debug("Job %v: Stopping", job.name)
- job.stop <- true
- <-job.stopped
-}
diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go
new file mode 100644
index 000000000..31b5d144c
--- /dev/null
+++ b/jobs/testscheduler.go
@@ -0,0 +1,58 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+)
+
+type TestScheduler struct {
+ name string
+ jobType string
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeTestScheduler(name string, jobType string) *TestScheduler {
+ return &TestScheduler{
+ name: name,
+ jobType: jobType,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ }
+}
+
+func (scheduler *TestScheduler) Run() {
+ l4g.Debug("Scheduler %v: Started", scheduler.name)
+
+ defer func(){
+ l4g.Debug("Scheduler %v: Finished", scheduler.name)
+ scheduler.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-scheduler.stop:
+ l4g.Debug("Scheduler %v: Received stop signal", scheduler.name)
+ return
+ case <-time.After(86400 * time.Second):
+ l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name)
+ scheduler.AddJob()
+ }
+ }
+}
+
+func (scheduler *TestScheduler) AddJob() {
+ if _, err := CreateJob(scheduler.jobType, nil); err != nil {
+ l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err)
+ }
+}
+
+func (scheduler *TestScheduler) Stop() {
+ l4g.Debug("Scheduler %v: Stopping", scheduler.name)
+ scheduler.stop <- true
+ <-scheduler.stopped
+}
diff --git a/jobs/testworker.go b/jobs/testworker.go
new file mode 100644
index 000000000..f1c8a07a3
--- /dev/null
+++ b/jobs/testworker.go
@@ -0,0 +1,104 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "context"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+type TestWorker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+}
+
+func MakeTestWorker(name string) *TestWorker {
+ return &TestWorker{
+ name: name,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ }
+}
+
+func (worker *TestWorker) Run() {
+ l4g.Debug("Worker %v: Started", worker.name)
+
+ defer func() {
+ l4g.Debug("Worker %v: Finished", worker.name)
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ l4g.Debug("Worker %v: Received stop signal", worker.name)
+ return
+ case job := <-worker.jobs:
+ l4g.Debug("Worker %v: Received a new candidate job.", worker.name)
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *TestWorker) DoJob(job *model.Job) {
+ if claimed, err := ClaimJob(job); err != nil {
+ l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error())
+ return
+ } else if !claimed {
+ return
+ }
+
+ cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
+ cancelWatcherChan := make(chan interface{}, 1)
+ go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+
+ defer cancelCancelWatcher()
+
+ counter := 0
+ for {
+ select {
+ case <-cancelWatcherChan:
+ l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-worker.stop:
+ l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-time.After(5 * time.Second):
+ counter++
+ if counter > 10 {
+ l4g.Debug("Job %v: Job completed.", job.Id)
+ if err := SetJobSuccess(job); err != nil {
+ l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error())
+ }
+ return
+ } else {
+ if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil {
+ l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
+ }
+ }
+ }
+ }
+}
+
+func (worker *TestWorker) Stop() {
+ l4g.Debug("Worker %v: Stopping", worker.name)
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *TestWorker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
diff --git a/jobs/workers.go b/jobs/workers.go
new file mode 100644
index 000000000..a42ec4607
--- /dev/null
+++ b/jobs/workers.go
@@ -0,0 +1,79 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Workers struct {
+ startOnce sync.Once
+ watcher *Watcher
+
+ DataRetention model.Worker
+ // SearchIndexing model.Job
+
+ listenerId string
+}
+
+func InitWorkers() *Workers {
+ workers := &Workers{
+ // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+ }
+ workers.watcher = MakeWatcher(workers)
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ workers.DataRetention = dataRetentionInterface.MakeWorker()
+ }
+
+ return workers
+}
+
+func (workers *Workers) Start() *Workers {
+ l4g.Info("Starting workers")
+
+ workers.startOnce.Do(func() {
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ }
+
+ // go workers.SearchIndexing.Run()
+
+ go workers.watcher.Start()
+ })
+
+ workers.listenerId = utils.AddConfigListener(workers.handleConfigChange)
+
+ return workers
+}
+
+func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if workers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ }
+}
+
+func (workers *Workers) Stop() *Workers {
+ utils.RemoveConfigListener(workers.listenerId)
+
+ workers.watcher.Stop()
+
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ // workers.SearchIndexing.Stop()
+
+ l4g.Info("Stopped workers")
+
+ return workers
+}