summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--api4/job.go14
-rw-r--r--api4/job_test.go30
-rw-r--r--app/admin.go3
-rw-r--r--app/job.go16
-rw-r--r--app/job_test.go18
-rw-r--r--cmd/platform/server.go7
-rw-r--r--config/config.json4
-rw-r--r--einterfaces/jobs/data_retention.go4
-rw-r--r--i18n/en.json24
-rw-r--r--jobs/jobs.go122
-rw-r--r--jobs/jobs_watcher.go85
-rw-r--r--jobs/jobserver/jobserver.go15
-rw-r--r--jobs/schedulers.go68
-rw-r--r--jobs/server.go31
-rw-r--r--jobs/testjob.go54
-rw-r--r--jobs/testscheduler.go58
-rw-r--r--jobs/testworker.go104
-rw-r--r--jobs/workers.go79
-rw-r--r--model/client4.go12
-rw-r--r--model/config.go16
-rw-r--r--model/job.go79
-rw-r--r--model/job_status.go59
-rw-r--r--store/layered_store.go4
-rw-r--r--store/sql_job_status_store.go190
-rw-r--r--store/sql_job_status_store_test.go151
-rw-r--r--store/sql_job_store.go327
-rw-r--r--store/sql_job_store_test.go341
-rw-r--r--store/sql_store.go1
-rw-r--r--store/sql_supplier.go10
-rw-r--r--store/sql_upgrade.go3
-rw-r--r--store/store.go10
31 files changed, 1363 insertions, 576 deletions
diff --git a/api4/job.go b/api4/job.go
index 8610d9e74..e6c17c42d 100644
--- a/api4/job.go
+++ b/api4/job.go
@@ -14,11 +14,11 @@ import (
func InitJob() {
l4g.Info("Initializing job API routes")
- BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobStatusesByType)).Methods("GET")
- BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJobStatus)).Methods("GET")
+ BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobsByType)).Methods("GET")
+ BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJob)).Methods("GET")
}
-func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) {
+func getJob(c *Context, w http.ResponseWriter, r *http.Request) {
c.RequireJobId()
if c.Err != nil {
return
@@ -29,7 +29,7 @@ func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if status, err := app.GetJobStatus(c.Params.JobId); err != nil {
+ if status, err := app.GetJob(c.Params.JobId); err != nil {
c.Err = err
return
} else {
@@ -37,7 +37,7 @@ func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) {
}
}
-func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) {
+func getJobsByType(c *Context, w http.ResponseWriter, r *http.Request) {
c.RequireJobType()
if c.Err != nil {
return
@@ -48,10 +48,10 @@ func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if statuses, err := app.GetJobStatusesByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil {
+ if statuses, err := app.GetJobsByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil {
c.Err = err
return
} else {
- w.Write([]byte(model.JobStatusesToJson(statuses)))
+ w.Write([]byte(model.JobsToJson(statuses)))
}
}
diff --git a/api4/job_test.go b/api4/job_test.go
index 0f39fc306..8bbea83e1 100644
--- a/api4/job_test.go
+++ b/api4/job_test.go
@@ -16,30 +16,30 @@ func TestGetJobStatus(t *testing.T) {
th := Setup().InitBasic().InitSystemAdmin()
defer TearDown()
- status := &model.JobStatus{
+ status := &model.Job{
Id: model.NewId(),
Status: model.NewId(),
}
- if result := <-app.Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ if result := <-app.Srv.Store.Job().Save(status); result.Err != nil {
t.Fatal(result.Err)
}
- defer app.Srv.Store.JobStatus().Delete(status.Id)
+ defer app.Srv.Store.Job().Delete(status.Id)
- received, resp := th.SystemAdminClient.GetJobStatus(status.Id)
+ received, resp := th.SystemAdminClient.GetJob(status.Id)
CheckNoError(t, resp)
if received.Id != status.Id || received.Status != status.Status {
t.Fatal("incorrect job status received")
}
- _, resp = th.SystemAdminClient.GetJobStatus("1234")
+ _, resp = th.SystemAdminClient.GetJob("1234")
CheckBadRequestStatus(t, resp)
- _, resp = th.Client.GetJobStatus(status.Id)
+ _, resp = th.Client.GetJob(status.Id)
CheckForbiddenStatus(t, resp)
- _, resp = th.SystemAdminClient.GetJobStatus(model.NewId())
+ _, resp = th.SystemAdminClient.GetJob(model.NewId())
CheckNotFoundStatus(t, resp)
}
@@ -49,7 +49,7 @@ func TestGetJobStatusesByType(t *testing.T) {
jobType := model.NewId()
- statuses := []*model.JobStatus{
+ statuses := []*model.Job{
{
Id: model.NewId(),
Type: jobType,
@@ -68,11 +68,11 @@ func TestGetJobStatusesByType(t *testing.T) {
}
for _, status := range statuses {
- store.Must(app.Srv.Store.JobStatus().SaveOrUpdate(status))
- defer app.Srv.Store.JobStatus().Delete(status.Id)
+ store.Must(app.Srv.Store.Job().Save(status))
+ defer app.Srv.Store.Job().Delete(status.Id)
}
- received, resp := th.SystemAdminClient.GetJobStatusesByType(jobType, 0, 2)
+ received, resp := th.SystemAdminClient.GetJobsByType(jobType, 0, 2)
CheckNoError(t, resp)
if len(received) != 2 {
@@ -83,7 +83,7 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal("should've received second newest job second")
}
- received, resp = th.SystemAdminClient.GetJobStatusesByType(jobType, 1, 2)
+ received, resp = th.SystemAdminClient.GetJobsByType(jobType, 1, 2)
CheckNoError(t, resp)
if len(received) != 1 {
@@ -92,12 +92,12 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal("should've received oldest job last")
}
- _, resp = th.SystemAdminClient.GetJobStatusesByType("", 0, 60)
+ _, resp = th.SystemAdminClient.GetJobsByType("", 0, 60)
CheckNotFoundStatus(t, resp)
- _, resp = th.SystemAdminClient.GetJobStatusesByType(strings.Repeat("a", 33), 0, 60)
+ _, resp = th.SystemAdminClient.GetJobsByType(strings.Repeat("a", 33), 0, 60)
CheckBadRequestStatus(t, resp)
- _, resp = th.Client.GetJobStatusesByType(jobType, 0, 60)
+ _, resp = th.Client.GetJobsByType(jobType, 0, 60)
CheckForbiddenStatus(t, resp)
}
diff --git a/app/admin.go b/app/admin.go
index 8b7d64b53..6fbe150c4 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -16,6 +16,7 @@ import (
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/store"
"github.com/mattermost/platform/utils"
+ "github.com/mattermost/platform/jobs"
)
func GetLogs(page, perPage int) ([]string, *model.AppError) {
@@ -187,6 +188,8 @@ func RecycleDatabaseConnection() {
l4g.Warn(utils.T("api.admin.recycle_db_start.warn"))
Srv.Store = store.NewLayeredStore()
+ jobs.Srv.Store = Srv.Store
+
time.Sleep(20 * time.Second)
oldStore.Close()
diff --git a/app/job.go b/app/job.go
index 00439e4d2..c625ce15f 100644
--- a/app/job.go
+++ b/app/job.go
@@ -7,22 +7,22 @@ import (
"github.com/mattermost/platform/model"
)
-func GetJobStatus(id string) (*model.JobStatus, *model.AppError) {
- if result := <-Srv.Store.JobStatus().Get(id); result.Err != nil {
+func GetJob(id string) (*model.Job, *model.AppError) {
+ if result := <-Srv.Store.Job().Get(id); result.Err != nil {
return nil, result.Err
} else {
- return result.Data.(*model.JobStatus), nil
+ return result.Data.(*model.Job), nil
}
}
-func GetJobStatusesByTypePage(jobType string, page int, perPage int) ([]*model.JobStatus, *model.AppError) {
- return GetJobStatusesByType(jobType, page*perPage, perPage)
+func GetJobsByTypePage(jobType string, page int, perPage int) ([]*model.Job, *model.AppError) {
+ return GetJobsByType(jobType, page*perPage, perPage)
}
-func GetJobStatusesByType(jobType string, offset int, limit int) ([]*model.JobStatus, *model.AppError) {
- if result := <-Srv.Store.JobStatus().GetAllByTypePage(jobType, offset, limit); result.Err != nil {
+func GetJobsByType(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) {
+ if result := <-Srv.Store.Job().GetAllByTypePage(jobType, offset, limit); result.Err != nil {
return nil, result.Err
} else {
- return result.Data.([]*model.JobStatus), nil
+ return result.Data.([]*model.Job), nil
}
}
diff --git a/app/job_test.go b/app/job_test.go
index 20e9dee8a..ced65788f 100644
--- a/app/job_test.go
+++ b/app/job_test.go
@@ -13,17 +13,17 @@ import (
func TestGetJobStatus(t *testing.T) {
Setup()
- status := &model.JobStatus{
+ status := &model.Job{
Id: model.NewId(),
Status: model.NewId(),
}
- if result := <-Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ if result := <-Srv.Store.Job().Save(status); result.Err != nil {
t.Fatal(result.Err)
}
- defer Srv.Store.JobStatus().Delete(status.Id)
+ defer Srv.Store.Job().Delete(status.Id)
- if received, err := GetJobStatus(status.Id); err != nil {
+ if received, err := GetJob(status.Id); err != nil {
t.Fatal(err)
} else if received.Id != status.Id || received.Status != status.Status {
t.Fatal("inccorrect job status received")
@@ -35,7 +35,7 @@ func TestGetJobStatusesByType(t *testing.T) {
jobType := model.NewId()
- statuses := []*model.JobStatus{
+ statuses := []*model.Job{
{
Id: model.NewId(),
Type: jobType,
@@ -54,11 +54,11 @@ func TestGetJobStatusesByType(t *testing.T) {
}
for _, status := range statuses {
- store.Must(Srv.Store.JobStatus().SaveOrUpdate(status))
- defer Srv.Store.JobStatus().Delete(status.Id)
+ store.Must(Srv.Store.Job().Save(status))
+ defer Srv.Store.Job().Delete(status.Id)
}
- if received, err := GetJobStatusesByType(jobType, 0, 2); err != nil {
+ if received, err := GetJobsByType(jobType, 0, 2); err != nil {
t.Fatal(err)
} else if len(received) != 2 {
t.Fatal("received wrong number of statuses")
@@ -68,7 +68,7 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal("should've received second newest job second")
}
- if received, err := GetJobStatusesByType(jobType, 2, 2); err != nil {
+ if received, err := GetJobsByType(jobType, 2, 2); err != nil {
t.Fatal(err)
} else if len(received) != 1 {
t.Fatal("received wrong number of statuses")
diff --git a/cmd/platform/server.go b/cmd/platform/server.go
index 2eedbd54a..1edb6c2f3 100644
--- a/cmd/platform/server.go
+++ b/cmd/platform/server.go
@@ -126,7 +126,9 @@ func runServer(configFileLocation string) {
}
}
- jobs := jobs.InitJobs(app.Srv.Store).Start()
+ jobs.Srv.Store = app.Srv.Store
+ jobs.Srv.StartWorkers()
+ jobs.Srv.StartSchedulers()
// wait for kill signal before attempting to gracefully shutdown
// the running service
@@ -142,7 +144,8 @@ func runServer(configFileLocation string) {
einterfaces.GetMetricsInterface().StopServer()
}
- jobs.Stop()
+ jobs.Srv.StopSchedulers()
+ jobs.Srv.StopWorkers()
app.StopServer()
}
diff --git a/config/config.json b/config/config.json
index 3401a5e4f..56bd3d9fa 100644
--- a/config/config.json
+++ b/config/config.json
@@ -289,5 +289,9 @@
},
"DataRetentionSettings": {
"Enable": false
+ },
+ "JobSettings": {
+ "RunJobs": true,
+ "RunScheduler": true
}
}
diff --git a/einterfaces/jobs/data_retention.go b/einterfaces/jobs/data_retention.go
index 340ed1b88..442f667fa 100644
--- a/einterfaces/jobs/data_retention.go
+++ b/einterfaces/jobs/data_retention.go
@@ -5,11 +5,11 @@ package jobs
import (
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
)
type DataRetentionInterface interface {
- MakeJob(store store.Store) model.Job
+ MakeWorker() model.Worker
+ MakeScheduler() model.Scheduler
}
var theDataRetentionInterface DataRetentionInterface
diff --git a/i18n/en.json b/i18n/en.json
index 7d23a13c1..03e833fa3 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3744,6 +3744,10 @@
"translation": "Page not found"
},
{
+ "id": "jobs.request_cancellation.status.error",
+ "translation": "Could not request cancellation for job that is not in a cancelable state."
+ },
+ {
"id": "manaultesting.get_channel_id.no_found.debug",
"translation": "Could not find channel: %v, %v possibilities searched"
},
@@ -5292,24 +5296,24 @@
"translation": "We couldn't save or update the file info"
},
{
- "id": "store.sql_job_status.delete_by_type.app_error",
- "translation": "We couldn't delete the job status"
+ "id": "store.sql_job.delete.app_error",
+ "translation": "We couldn't delete the job"
},
{
- "id": "store.sql_job_status.get.app_error",
- "translation": "We couldn't get the job status"
+ "id": "store.sql_job.get.app_error",
+ "translation": "We couldn't get the job"
},
{
- "id": "store.sql_job_status.get_all.app_error",
- "translation": "We couldn't get all job statuses"
+ "id": "store.sql_job.get_all.app_error",
+ "translation": "We couldn't get the jobs"
},
{
- "id": "store.sql_job_status.save.app_error",
- "translation": "We couldn't save the job status"
+ "id": "store.sql_job.save.app_error",
+ "translation": "We couldn't save the job"
},
{
- "id": "store.sql_job_status.update.app_error",
- "translation": "We couldn't update the job status"
+ "id": "store.sql_job.update.app_error",
+ "translation": "We couldn't update the job"
},
{
"id": "store.sql_license.get.app_error",
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 8c84f4eea..58c2f2f13 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -4,71 +4,111 @@
package jobs
import (
- "sync"
+ "context"
+ "time"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/platform/einterfaces/jobs"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
- "github.com/mattermost/platform/utils"
)
-type Jobs struct {
- startOnce sync.Once
+const (
+ CANCEL_WATCHER_POLLING_INTERVAL = 5000
+)
- DataRetention model.Job
- // SearchIndexing model.Job
+func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) {
+ job := model.Job{
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ Data: jobData,
+ }
- listenerId string
+ if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
+ return nil, result.Err
+ }
+
+ return &job, nil
}
-func InitJobs(s store.Store) *Jobs {
- jobs := &Jobs{
- // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+func ClaimJob(job *model.Job) (bool, *model.AppError) {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ success := result.Data.(bool)
+ return success, nil
}
+}
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
- jobs.DataRetention = dataRetentionInterface.MakeJob(s)
- }
+func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) {
+ var job *model.Job
- return jobs
-}
+ if result := <-Srv.Store.Job().Get(jobId); result.Err != nil {
+ return false, result.Err
+ } else {
+ job = result.Data.(*model.Job)
+ }
-func (jobs *Jobs) Start() *Jobs {
- l4g.Info("Starting jobs")
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = progress
- jobs.startOnce.Do(func() {
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
+ if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ if !result.Data.(bool) {
+ return false, nil
}
+ }
- // go jobs.SearchIndexing.Run()
- })
+ return true, nil
+}
- jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange)
+func SetJobSuccess(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
+ return result.Err
+}
- return jobs
+func SetJobError(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ return result.Err
}
-func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
- if jobs.DataRetention != nil {
- if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
- } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
- }
- }
+func SetJobCanceled(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
+ return result.Err
}
-func (jobs *Jobs) Stop() *Jobs {
- utils.RemoveConfigListener(jobs.listenerId)
+func RequestCancellation(job *model.Job) *model.AppError {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
+ }
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
}
- // jobs.SearchIndexing.Stop()
- l4g.Info("Stopped jobs")
+ return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id)
+}
- return jobs
+func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
+ for {
+ select {
+ case <-ctx.Done():
+ l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId)
+ return
+ case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
+ l4g.Debug("CancellationWatcher for Job: %v polling.", jobId)
+ if result := <-Srv.Store.Job().Get(jobId); result.Err == nil {
+ jobStatus := result.Data.(*model.Job)
+ if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
+ close(cancelChan)
+ return
+ }
+ }
+ }
+ }
}
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
new file mode 100644
index 000000000..ada957ccc
--- /dev/null
+++ b/jobs/jobs_watcher.go
@@ -0,0 +1,85 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "math/rand"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+const (
+ WATCHER_POLLING_INTERVAL = 15000
+)
+
+type Watcher struct {
+ workers *Workers
+
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeWatcher(workers *Workers) *Watcher {
+ return &Watcher{
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ workers: workers,
+ }
+}
+
+func (watcher *Watcher) Start() {
+ l4g.Debug("Watcher Started")
+
+ // Delay for some random number of milliseconds before starting to ensure that multiple
+ // instances of the jobserver don't poll at a time too close to each other.
+ rand.Seed(time.Now().UTC().UnixNano())
+ _ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond)
+
+ defer func(){
+ l4g.Debug("Watcher Finished")
+ watcher.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-watcher.stop:
+ l4g.Debug("Watcher: Received stop signal")
+ return
+ case <-time.After(WATCHER_POLLING_INTERVAL * time.Millisecond):
+ watcher.PollAndNotify()
+ }
+ }
+}
+
+func (watcher *Watcher) Stop() {
+ l4g.Debug("Watcher Stopping")
+ watcher.stop <- true
+ <-watcher.stopped
+}
+
+func (watcher *Watcher) PollAndNotify() {
+ if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
+ l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
+ } else {
+ jobStatuses := result.Data.([]*model.Job)
+
+ for _, js := range jobStatuses {
+ j := model.Job{
+ Type: js.Type,
+ Id: js.Id,
+ }
+
+ if js.Type == model.JOB_TYPE_DATA_RETENTION {
+ if watcher.workers.DataRetention != nil {
+ select {
+ case watcher.workers.DataRetention.JobChannel() <- j:
+ default:
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go
index 5f491a815..aabe5d3b2 100644
--- a/jobs/jobserver/jobserver.go
+++ b/jobs/jobserver/jobserver.go
@@ -16,22 +16,20 @@ import (
_ "github.com/mattermost/platform/imports"
)
-var Srv jobs.JobServer
-
func main() {
// Initialize
utils.InitAndLoadConfig("config.json")
defer l4g.Close()
- Srv.Store = store.NewLayeredStore()
- defer Srv.Store.Close()
+ jobs.Srv.Store = store.NewLayeredStore()
+ defer jobs.Srv.Store.Close()
- Srv.LoadLicense()
+ jobs.Srv.LoadLicense()
// Run jobs
l4g.Info("Starting Mattermost job server")
- Srv.Jobs = jobs.InitJobs(Srv.Store)
- Srv.Jobs.Start()
+ jobs.Srv.StartWorkers()
+ jobs.Srv.StartSchedulers()
var signalChan chan os.Signal = make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
@@ -40,7 +38,8 @@ func main() {
// Cleanup anything that isn't handled by a defer statement
l4g.Info("Stopping Mattermost job server")
- Srv.Jobs.Stop()
+ jobs.Srv.StopSchedulers()
+ jobs.Srv.StopWorkers()
l4g.Info("Stopped Mattermost job server")
}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
new file mode 100644
index 000000000..73ec6661a
--- /dev/null
+++ b/jobs/schedulers.go
@@ -0,0 +1,68 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Schedulers struct {
+ startOnce sync.Once
+
+ DataRetention model.Scheduler
+
+ listenerId string
+}
+
+func InitSchedulers() *Schedulers {
+ schedulers := &Schedulers{}
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
+ }
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) Start() *Schedulers {
+ l4g.Info("Starting schedulers")
+
+ schedulers.startOnce.Do(func() {
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ }
+ })
+
+ schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if schedulers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+ }
+}
+
+func (schedulers *Schedulers) Stop() *Schedulers {
+ utils.RemoveConfigListener(schedulers.listenerId)
+
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+
+ l4g.Info("Stopped schedulers")
+
+ return schedulers
+}
diff --git a/jobs/server.go b/jobs/server.go
index dd3448842..7920cb2d5 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -11,10 +11,13 @@ import (
)
type JobServer struct {
- Store store.Store
- Jobs *Jobs
+ Store store.Store
+ Workers *Workers
+ Schedulers *Schedulers
}
+var Srv JobServer
+
func (server *JobServer) LoadLicense() {
licenseId := ""
if result := <-server.Store.System().Get(); result.Err == nil {
@@ -44,3 +47,27 @@ func (server *JobServer) LoadLicense() {
l4g.Info(utils.T("mattermost.load_license.find.warn"))
}
}
+
+func (server *JobServer) StartWorkers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Workers = InitWorkers().Start()
+ }
+}
+
+func (server *JobServer) StartSchedulers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Schedulers = InitSchedulers().Start()
+ }
+}
+
+func (server *JobServer) StopWorkers() {
+ if Srv.Workers != nil {
+ Srv.Workers.Stop()
+ }
+}
+
+func (server *JobServer) StopSchedulers() {
+ if Srv.Schedulers != nil {
+ Srv.Schedulers.Stop()
+ }
+}
diff --git a/jobs/testjob.go b/jobs/testjob.go
deleted file mode 100644
index 59d5274e5..000000000
--- a/jobs/testjob.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package jobs
-
-import (
- "time"
-
- l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/platform/store"
-)
-
-type TestJob struct {
- store store.Store
-
- name string
- stop chan bool
- stopped chan bool
-}
-
-func MakeTestJob(s store.Store, name string) *TestJob {
- return &TestJob{
- store: s,
- name: name,
- stop: make(chan bool, 1),
- stopped: make(chan bool, 1),
- }
-}
-
-func (job *TestJob) Run() {
- l4g.Debug("Job %v: Started", job.name)
-
- running := true
- for running {
- l4g.Debug("Job %v: Tick", job.name)
-
- select {
- case <-job.stop:
- l4g.Debug("Job %v: Received stop signal", job.name)
- running = false
- case <-time.After(10 * time.Second):
- continue
- }
- }
-
- l4g.Debug("Job %v: Finished", job.name)
- job.stopped <- true
-}
-
-func (job *TestJob) Stop() {
- l4g.Debug("Job %v: Stopping", job.name)
- job.stop <- true
- <-job.stopped
-}
diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go
new file mode 100644
index 000000000..31b5d144c
--- /dev/null
+++ b/jobs/testscheduler.go
@@ -0,0 +1,58 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+)
+
+type TestScheduler struct {
+ name string
+ jobType string
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeTestScheduler(name string, jobType string) *TestScheduler {
+ return &TestScheduler{
+ name: name,
+ jobType: jobType,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ }
+}
+
+func (scheduler *TestScheduler) Run() {
+ l4g.Debug("Scheduler %v: Started", scheduler.name)
+
+ defer func(){
+ l4g.Debug("Scheduler %v: Finished", scheduler.name)
+ scheduler.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-scheduler.stop:
+ l4g.Debug("Scheduler %v: Received stop signal", scheduler.name)
+ return
+ case <-time.After(86400 * time.Second):
+ l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name)
+ scheduler.AddJob()
+ }
+ }
+}
+
+func (scheduler *TestScheduler) AddJob() {
+ if _, err := CreateJob(scheduler.jobType, nil); err != nil {
+ l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err)
+ }
+}
+
+func (scheduler *TestScheduler) Stop() {
+ l4g.Debug("Scheduler %v: Stopping", scheduler.name)
+ scheduler.stop <- true
+ <-scheduler.stopped
+}
diff --git a/jobs/testworker.go b/jobs/testworker.go
new file mode 100644
index 000000000..f1c8a07a3
--- /dev/null
+++ b/jobs/testworker.go
@@ -0,0 +1,104 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "context"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+type TestWorker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+}
+
+func MakeTestWorker(name string) *TestWorker {
+ return &TestWorker{
+ name: name,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ }
+}
+
+func (worker *TestWorker) Run() {
+ l4g.Debug("Worker %v: Started", worker.name)
+
+ defer func() {
+ l4g.Debug("Worker %v: Finished", worker.name)
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ l4g.Debug("Worker %v: Received stop signal", worker.name)
+ return
+ case job := <-worker.jobs:
+ l4g.Debug("Worker %v: Received a new candidate job.", worker.name)
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *TestWorker) DoJob(job *model.Job) {
+ if claimed, err := ClaimJob(job); err != nil {
+ l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error())
+ return
+ } else if !claimed {
+ return
+ }
+
+ cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
+ cancelWatcherChan := make(chan interface{}, 1)
+ go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+
+ defer cancelCancelWatcher()
+
+ counter := 0
+ for {
+ select {
+ case <-cancelWatcherChan:
+ l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-worker.stop:
+ l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-time.After(5 * time.Second):
+ counter++
+ if counter > 10 {
+ l4g.Debug("Job %v: Job completed.", job.Id)
+ if err := SetJobSuccess(job); err != nil {
+ l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error())
+ }
+ return
+ } else {
+ if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil {
+ l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
+ }
+ }
+ }
+ }
+}
+
+func (worker *TestWorker) Stop() {
+ l4g.Debug("Worker %v: Stopping", worker.name)
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *TestWorker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
diff --git a/jobs/workers.go b/jobs/workers.go
new file mode 100644
index 000000000..a42ec4607
--- /dev/null
+++ b/jobs/workers.go
@@ -0,0 +1,79 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Workers struct {
+ startOnce sync.Once
+ watcher *Watcher
+
+ DataRetention model.Worker
+ // SearchIndexing model.Job
+
+ listenerId string
+}
+
+func InitWorkers() *Workers {
+ workers := &Workers{
+ // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+ }
+ workers.watcher = MakeWatcher(workers)
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ workers.DataRetention = dataRetentionInterface.MakeWorker()
+ }
+
+ return workers
+}
+
+func (workers *Workers) Start() *Workers {
+ l4g.Info("Starting workers")
+
+ workers.startOnce.Do(func() {
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ }
+
+ // go workers.SearchIndexing.Run()
+
+ go workers.watcher.Start()
+ })
+
+ workers.listenerId = utils.AddConfigListener(workers.handleConfigChange)
+
+ return workers
+}
+
+func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if workers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ }
+}
+
+func (workers *Workers) Stop() *Workers {
+ utils.RemoveConfigListener(workers.listenerId)
+
+ workers.watcher.Stop()
+
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ // workers.SearchIndexing.Stop()
+
+ l4g.Info("Stopped workers")
+
+ return workers
+}
diff --git a/model/client4.go b/model/client4.go
index da3dfacb7..996d9362c 100644
--- a/model/client4.go
+++ b/model/client4.go
@@ -2790,22 +2790,22 @@ func (c *Client4) OpenGraph(url string) (map[string]string, *Response) {
// Jobs Section
-// GetJobStatus gets the status of a single job.
-func (c *Client4) GetJobStatus(id string) (*JobStatus, *Response) {
+// GetJob gets a single job.
+func (c *Client4) GetJob(id string) (*Job, *Response) {
if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v/status", id), ""); err != nil {
return nil, BuildErrorResponse(r, err)
} else {
defer closeBody(r)
- return JobStatusFromJson(r.Body), BuildResponse(r)
+ return JobFromJson(r.Body), BuildResponse(r)
}
}
-// GetJobStatusesByType gets the status of all jobs of a given type, sorted with the job that most recently started first.
-func (c *Client4) GetJobStatusesByType(jobType string, page int, perPage int) ([]*JobStatus, *Response) {
+// GetJobsByType gets all jobs of a given type, sorted with the job that most recently started first.
+func (c *Client4) GetJobsByType(jobType string, page int, perPage int) ([]*Job, *Response) {
if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v/statuses?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil {
return nil, BuildErrorResponse(r, err)
} else {
defer closeBody(r)
- return JobStatusesFromJson(r.Body), BuildResponse(r)
+ return JobsFromJson(r.Body), BuildResponse(r)
}
}
diff --git a/model/config.go b/model/config.go
index b7526925f..3e98aa8f6 100644
--- a/model/config.go
+++ b/model/config.go
@@ -436,6 +436,11 @@ type DataRetentionSettings struct {
Enable *bool
}
+type JobSettings struct {
+ RunJobs *bool
+ RunScheduler *bool
+}
+
type Config struct {
ServiceSettings ServiceSettings
TeamSettings TeamSettings
@@ -462,6 +467,7 @@ type Config struct {
WebrtcSettings WebrtcSettings
ElasticSearchSettings ElasticSearchSettings
DataRetentionSettings DataRetentionSettings
+ JobSettings JobSettings
}
func (o *Config) ToJson() string {
@@ -1380,6 +1386,16 @@ func (o *Config) SetDefaults() {
*o.DataRetentionSettings.Enable = false
}
+ if o.JobSettings.RunJobs == nil {
+ o.JobSettings.RunJobs = new(bool)
+ *o.JobSettings.RunJobs = true
+ }
+
+ if o.JobSettings.RunScheduler == nil {
+ o.JobSettings.RunScheduler = new(bool)
+ *o.JobSettings.RunScheduler = true
+ }
+
o.defaultWebrtcSettings()
}
diff --git a/model/job.go b/model/job.go
index d539b5bf9..b0567bf1a 100644
--- a/model/job.go
+++ b/model/job.go
@@ -3,7 +3,84 @@
package model
-type Job interface {
+import (
+ "encoding/json"
+ "io"
+)
+
+const (
+ JOB_TYPE_DATA_RETENTION = "data_retention"
+ JOB_TYPE_SEARCH_INDEXING = "search_indexing"
+
+ JOB_STATUS_PENDING = "pending"
+ JOB_STATUS_IN_PROGRESS = "in_progress"
+ JOB_STATUS_SUCCESS = "success"
+ JOB_STATUS_ERROR = "error"
+ JOB_STATUS_CANCEL_REQUESTED = "cancel_requested"
+ JOB_STATUS_CANCELED = "canceled"
+)
+
+type Job struct {
+ Id string `json:"id"`
+ Type string `json:"type"`
+ Priority int64 `json:"priority"`
+ CreateAt int64 `json:"create_at"`
+ StartAt int64 `json:"start_at"`
+ LastActivityAt int64 `json:"last_activity_at"`
+ Status string `json:"status"`
+ Progress int64 `json:"progress"`
+ Data map[string]interface{} `json:"data"`
+}
+
+func (js *Job) ToJson() string {
+ if b, err := json.Marshal(js); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func JobFromJson(data io.Reader) *Job {
+ var status Job
+ if err := json.NewDecoder(data).Decode(&status); err == nil {
+ return &status
+ } else {
+ return nil
+ }
+}
+
+func JobsToJson(jobs []*Job) string {
+ if b, err := json.Marshal(jobs); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func JobsFromJson(data io.Reader) []*Job {
+ var jobs []*Job
+ if err := json.NewDecoder(data).Decode(&jobs); err == nil {
+ return jobs
+ } else {
+ return nil
+ }
+}
+
+func (js *Job) DataToJson() string {
+ if b, err := json.Marshal(js.Data); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+type Worker interface {
+ Run()
+ Stop()
+ JobChannel() chan<- Job
+}
+
+type Scheduler interface {
Run()
Stop()
}
diff --git a/model/job_status.go b/model/job_status.go
deleted file mode 100644
index cf490648f..000000000
--- a/model/job_status.go
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package model
-
-import (
- "encoding/json"
- "io"
-)
-
-const (
- JOB_TYPE_DATA_RETENTION = "data_retention"
- JOB_TYPE_SEARCH_INDEXING = "search_indexing"
-)
-
-type JobStatus struct {
- Id string `json:"id"`
- Type string `json:"type"`
- StartAt int64 `json:"start_at"`
- LastActivityAt int64 `json:"last_activity_at"`
- LastRunStartedAt int64 `json:"last_run_started_at"`
- LastRunCompletedAt int64 `json:"last_run_completed_at"`
- Status string `json:"status"`
- Data map[string]interface{} `json:"data"`
-}
-
-func (js *JobStatus) ToJson() string {
- if b, err := json.Marshal(js); err != nil {
- return ""
- } else {
- return string(b)
- }
-}
-
-func JobStatusFromJson(data io.Reader) *JobStatus {
- var status JobStatus
- if err := json.NewDecoder(data).Decode(&status); err == nil {
- return &status
- } else {
- return nil
- }
-}
-
-func JobStatusesToJson(statuses []*JobStatus) string {
- if b, err := json.Marshal(statuses); err != nil {
- return ""
- } else {
- return string(b)
- }
-}
-
-func JobStatusesFromJson(data io.Reader) []*JobStatus {
- var statuses []*JobStatus
- if err := json.NewDecoder(data).Decode(&statuses); err == nil {
- return statuses
- } else {
- return nil
- }
-}
diff --git a/store/layered_store.go b/store/layered_store.go
index 58c9e5ca1..ab9859c80 100644
--- a/store/layered_store.go
+++ b/store/layered_store.go
@@ -119,8 +119,8 @@ func (s *LayeredStore) Reaction() ReactionStore {
return s.DatabaseLayer.Reaction()
}
-func (s *LayeredStore) JobStatus() JobStatusStore {
- return s.DatabaseLayer.JobStatus()
+func (s *LayeredStore) Job() JobStore {
+ return s.DatabaseLayer.Job()
}
func (s *LayeredStore) MarkSystemRanUnitTests() {
diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go
deleted file mode 100644
index a87b8267b..000000000
--- a/store/sql_job_status_store.go
+++ /dev/null
@@ -1,190 +0,0 @@
-// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "database/sql"
- "net/http"
-
- "github.com/mattermost/platform/model"
-)
-
-type SqlJobStatusStore struct {
- SqlStore
-}
-
-func NewSqlJobStatusStore(sqlStore SqlStore) JobStatusStore {
- s := &SqlJobStatusStore{sqlStore}
-
- for _, db := range sqlStore.GetAllConns() {
- table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id")
- table.ColMap("Id").SetMaxSize(26)
- table.ColMap("Type").SetMaxSize(32)
- table.ColMap("Status").SetMaxSize(32)
- table.ColMap("Data").SetMaxSize(1024)
- }
-
- return s
-}
-
-func (jss SqlJobStatusStore) CreateIndexesIfNotExists() {
- jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type")
-}
-
-func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if err := jss.GetReplica().SelectOne(&model.JobStatus{},
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil {
- if _, err := jss.GetMaster().Update(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else if err == sql.ErrNoRows {
- if err := jss.GetMaster().Insert(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
-
- if result.Err == nil {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Get(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var status *model.JobStatus
-
- if err := jss.GetReplica().SelectOne(&status,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- if err == sql.ErrNoRows {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
- } else {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
- }
- } else {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType",
- "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type
- ORDER BY
- StartAt ASC
- LIMIT
- :Limit
- OFFSET
- :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage",
- "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Delete(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if _, err := jss.GetReplica().Exec(
- `DELETE FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType",
- "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error())
- } else {
- result.Data = id
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go
deleted file mode 100644
index 18c29e522..000000000
--- a/store/sql_job_status_store_test.go
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "testing"
-
- "github.com/mattermost/platform/model"
-)
-
-func TestJobStatusSaveGetUpdate(t *testing.T) {
- Setup()
-
- status := &model.JobStatus{
- Id: model.NewId(),
- Type: model.NewId(),
- Status: model.NewId(),
- Data: map[string]interface{}{
- "Processed": 0,
- "Total": 12345,
- "LastProcessed": "abcd",
- },
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- defer func() {
- <-store.JobStatus().Delete(status.Id)
- }()
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id {
- t.Fatal("received incorrect status after save")
- }
-
- status.Status = model.NewId()
- status.Data = map[string]interface{}{
- "Processed": 12345,
- "Total": 12345,
- "LastProcessed": "abcd",
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status {
- t.Fatal("received incorrect status after update")
- }
-}
-
-func TestJobStatusGetAllByType(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: model.NewId(),
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id {
- t.Fatal("should've received first status")
- } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id {
- t.Fatal("should've received second status")
- }
-}
-
-func TestJobStatusGetAllByTypePage(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1000,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 999,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1001,
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[1].Id {
- t.Fatal("should've received newest job first")
- } else if received[1].Id != statuses[0].Id {
- t.Fatal("should've received second newest job second")
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 1 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[2].Id {
- t.Fatal("should've received oldest job last")
- }
-}
-
-func TestJobStatusDelete(t *testing.T) {
- Setup()
-
- status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{
- Id: model.NewId(),
- })).(*model.JobStatus)
-
- if result := <-store.JobStatus().Delete(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- }
-}
diff --git a/store/sql_job_store.go b/store/sql_job_store.go
new file mode 100644
index 000000000..c00e37d86
--- /dev/null
+++ b/store/sql_job_store.go
@@ -0,0 +1,327 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "database/sql"
+ "net/http"
+
+ "github.com/mattermost/gorp"
+ "github.com/mattermost/platform/model"
+)
+
+type SqlJobStore struct {
+ SqlStore
+}
+
+func NewSqlJobStore(sqlStore SqlStore) JobStore {
+ s := &SqlJobStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.Job{}, "Jobs").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(32)
+ table.ColMap("Status").SetMaxSize(32)
+ table.ColMap("Data").SetMaxSize(1024)
+ }
+
+ return s
+}
+
+func (jss SqlJobStore) CreateIndexesIfNotExists() {
+ jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type")
+}
+
+func (jss SqlJobStore) Save(job *model.Job) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ if err := jss.GetMaster().Insert(job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.Save",
+ "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET
+ LastActivityAt = :LastActivityAt,
+ Status = :Status,
+ Progress = :Progress,
+ Data = :Data
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`,
+ map[string]interface{}{
+ "Id": job.Id,
+ "OldStatus": currentStatus,
+ "LastActivityAt": model.GetMillis(),
+ "Status": job.Status,
+ "Data": job.DataToJson(),
+ "Progress": job.Progress,
+ }); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateOptimistically",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatus(id string, status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ job := &model.Job{
+ Id: id,
+ Status: status,
+ LastActivityAt: model.GetMillis(),
+ }
+
+ if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool {
+ return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt"
+ }, job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ }
+
+ if result.Err == nil {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var startAtClause string
+ if newStatus == model.JOB_STATUS_IN_PROGRESS {
+ startAtClause = `StartAt = :StartAt,`
+ }
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET `+startAtClause+`
+ Status = :NewStatus,
+ LastActivityAt = :LastActivityAt
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Get(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var status *model.Job
+
+ if err := jss.GetReplica().SelectOne(&status,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ if err == sql.ErrNoRows {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
+ } else {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
+ }
+ } else {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByType",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type
+ ORDER BY
+ StartAt ASC
+ LIMIT
+ :Limit
+ OFFSET
+ :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByTypePage",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByStatus(status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Status = :Status
+ ORDER BY
+ CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByStatus",
+ "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Delete(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := jss.GetMaster().Exec(
+ `DELETE FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.DeleteByType",
+ "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ result.Data = id
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go
new file mode 100644
index 000000000..edf09a4c0
--- /dev/null
+++ b/store/sql_job_store_test.go
@@ -0,0 +1,341 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "github.com/mattermost/platform/model"
+ "time"
+)
+
+func TestJobSaveGet(t *testing.T) {
+ Setup()
+
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.NewId(),
+ Data: map[string]interface{}{
+ "Processed": 0,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ },
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-store.Job().Delete(job.Id)
+ }()
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.Job); received.Id != job.Id {
+ t.Fatal("received incorrect job after save")
+ }
+}
+
+func TestJobGetAllByType(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByType(jobType); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobGetAllByTypePage(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != jobs[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 1 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
+
+func TestJobGetAllByStatus(t *testing.T) {
+ jobType := model.NewId()
+ status := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1000,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 999,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1001,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1002,
+ Status: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByStatus(status); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 3 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobUpdateOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+ defer store.Job().Delete(job.Id)
+
+ job.LastActivityAt = model.GetMillis()
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = 50
+ job.Data = map[string]interface{}{
+ "Foo": "Bar",
+ }
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ if result.Data.(bool) {
+ t.Fatal("should have failed due to incorrect old status")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("Should have successfully updated")
+ }
+
+ var updatedJob *model.Job
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ updatedJob = result.Data.(*model.Job)
+ }
+
+ if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] {
+ t.Fatal("Some update property was not as expected")
+ }
+ }
+
+}
+
+func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_SUCCESS,
+ }
+
+ var lastUpdateAt int64
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ lastUpdateAt = result.Data.(*model.Job).LastActivityAt
+ }
+
+ defer store.Job().Delete(job.Id)
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("status wasn't updated")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if result.Data.(bool) {
+ t.Fatal("should be false due to incorrect original status")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("should still be pending")
+ }
+ if received.LastActivityAt != lastUpdateAt {
+ t.Fatal("last activity at shouldn't have changed")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ var startAtSet int64
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_IN_PROGRESS {
+ t.Fatal("should be in progress")
+ }
+ if received.StartAt == 0 {
+ t.Fatal("received should have start at set")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ startAtSet = received.StartAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_SUCCESS {
+ t.Fatal("should be success status")
+ }
+ if received.StartAt != startAtSet {
+ t.Fatal("startAt should not have changed")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+}
+
+func TestJobDelete(t *testing.T) {
+ Setup()
+
+ status := Must(store.Job().Save(&model.Job{
+ Id: model.NewId(),
+ })).(*model.Job)
+
+ if result := <-store.Job().Delete(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index dc3b51d0c..a039401f3 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -79,4 +79,5 @@ type SqlStore interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
+ Job() JobStore
}
diff --git a/store/sql_supplier.go b/store/sql_supplier.go
index 6f51cbd09..0f4ab8380 100644
--- a/store/sql_supplier.go
+++ b/store/sql_supplier.go
@@ -82,7 +82,7 @@ type SqlSupplierOldStores struct {
status StatusStore
fileInfo FileInfoStore
reaction ReactionStore
- jobStatus JobStatusStore
+ job JobStore
}
type SqlSupplier struct {
@@ -121,7 +121,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status = NewSqlStatusStore(supplier)
supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier)
supplier.oldStores.reaction = NewSqlReactionStore(supplier)
- supplier.oldStores.jobStatus = NewSqlJobStatusStore(supplier)
+ supplier.oldStores.job = NewSqlJobStore(supplier)
err := supplier.GetMaster().CreateTablesIfNotExists()
if err != nil {
@@ -150,7 +150,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists()
supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists()
supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists()
- supplier.oldStores.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists()
+ supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists()
supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures()
@@ -752,8 +752,8 @@ func (ss *SqlSupplier) Reaction() ReactionStore {
return ss.oldStores.reaction
}
-func (ss *SqlSupplier) JobStatus() JobStatusStore {
- return ss.oldStores.jobStatus
+func (ss *SqlSupplier) Job() JobStore {
+ return ss.oldStores.job
}
func (ss *SqlSupplier) DropAllTables() {
diff --git a/store/sql_upgrade.go b/store/sql_upgrade.go
index 5a6ed0ab5..a7b72124e 100644
--- a/store/sql_upgrade.go
+++ b/store/sql_upgrade.go
@@ -280,8 +280,9 @@ func UpgradeDatabaseToVersion40(sqlStore SqlStore) {
}
func UpgradeDatabaseToVersion41(sqlStore SqlStore) {
- // TODO: Uncomment following condition when version 4.0.0 is released
+ // TODO: Uncomment following condition when version 4.1.0 is released
// if shouldPerformUpgrade(sqlStore, VERSION_4_0_0, VERSION_4_1_0) {
+ sqlStore.RemoveTableIfExists("JobStatuses")
// saveSchemaVersion(sqlStore, VERSION_4_1_0)
// }
}
diff --git a/store/store.go b/store/store.go
index 0007f495e..95496b609 100644
--- a/store/store.go
+++ b/store/store.go
@@ -48,7 +48,7 @@ type Store interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
- JobStatus() JobStatusStore
+ Job() JobStore
MarkSystemRanUnitTests()
Close()
DropAllTables()
@@ -384,10 +384,14 @@ type ReactionStore interface {
DeleteAllWithEmojiName(emojiName string) StoreChannel
}
-type JobStatusStore interface {
- SaveOrUpdate(status *model.JobStatus) StoreChannel
+type JobStore interface {
+ Save(job *model.Job) StoreChannel
+ UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel
+ UpdateStatus(id string, status string) StoreChannel
+ UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel
Get(id string) StoreChannel
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
+ GetAllByStatus(status string) StoreChannel
Delete(id string) StoreChannel
}