summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2017-07-07 15:21:02 +0100
committerGitHub <noreply@github.com>2017-07-07 15:21:02 +0100
commit0495a519499d6cefa289982a94d8f42de541c1f0 (patch)
tree94b6145daa41ca4d1d4a172f030071076852a09a
parent6e0f5f096986dad11ef182ddb51d4bfb0e558860 (diff)
downloadchat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.gz
chat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.bz2
chat-0495a519499d6cefa289982a94d8f42de541c1f0.zip
PLT-6916: Redesign the jobs package and Jobserver. (#6733)
This commit redesigns the jobserver to be based around an architecture of "workers", which carry out jobs of a particular type, and "jobs" which are a unit of work carried by a particular worker. It also introduces "schedulers" which are responsible for scheduling jobs of a particular type automatically (jobs can also be scheduled manually when apropriate). Workers may be run many times, either in instances of the platform binary, or the standalone jobserver binary. In any mattermost cluster, only one instance of platform OR jobserver must run the schedulers. At the moment this is controlled by a config variable, but in future will be controlled through the cluster leader election process.
-rw-r--r--api4/job.go14
-rw-r--r--api4/job_test.go30
-rw-r--r--app/admin.go3
-rw-r--r--app/job.go16
-rw-r--r--app/job_test.go18
-rw-r--r--cmd/platform/server.go7
-rw-r--r--config/config.json4
-rw-r--r--einterfaces/jobs/data_retention.go4
-rw-r--r--i18n/en.json24
-rw-r--r--jobs/jobs.go122
-rw-r--r--jobs/jobs_watcher.go85
-rw-r--r--jobs/jobserver/jobserver.go15
-rw-r--r--jobs/schedulers.go68
-rw-r--r--jobs/server.go31
-rw-r--r--jobs/testjob.go54
-rw-r--r--jobs/testscheduler.go58
-rw-r--r--jobs/testworker.go104
-rw-r--r--jobs/workers.go79
-rw-r--r--model/client4.go12
-rw-r--r--model/config.go16
-rw-r--r--model/job.go79
-rw-r--r--model/job_status.go59
-rw-r--r--store/layered_store.go4
-rw-r--r--store/sql_job_status_store.go190
-rw-r--r--store/sql_job_status_store_test.go151
-rw-r--r--store/sql_job_store.go327
-rw-r--r--store/sql_job_store_test.go341
-rw-r--r--store/sql_store.go1
-rw-r--r--store/sql_supplier.go10
-rw-r--r--store/sql_upgrade.go3
-rw-r--r--store/store.go10
31 files changed, 1363 insertions, 576 deletions
diff --git a/api4/job.go b/api4/job.go
index 8610d9e74..e6c17c42d 100644
--- a/api4/job.go
+++ b/api4/job.go
@@ -14,11 +14,11 @@ import (
func InitJob() {
l4g.Info("Initializing job API routes")
- BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobStatusesByType)).Methods("GET")
- BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJobStatus)).Methods("GET")
+ BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobsByType)).Methods("GET")
+ BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJob)).Methods("GET")
}
-func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) {
+func getJob(c *Context, w http.ResponseWriter, r *http.Request) {
c.RequireJobId()
if c.Err != nil {
return
@@ -29,7 +29,7 @@ func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if status, err := app.GetJobStatus(c.Params.JobId); err != nil {
+ if status, err := app.GetJob(c.Params.JobId); err != nil {
c.Err = err
return
} else {
@@ -37,7 +37,7 @@ func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) {
}
}
-func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) {
+func getJobsByType(c *Context, w http.ResponseWriter, r *http.Request) {
c.RequireJobType()
if c.Err != nil {
return
@@ -48,10 +48,10 @@ func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if statuses, err := app.GetJobStatusesByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil {
+ if statuses, err := app.GetJobsByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil {
c.Err = err
return
} else {
- w.Write([]byte(model.JobStatusesToJson(statuses)))
+ w.Write([]byte(model.JobsToJson(statuses)))
}
}
diff --git a/api4/job_test.go b/api4/job_test.go
index 0f39fc306..8bbea83e1 100644
--- a/api4/job_test.go
+++ b/api4/job_test.go
@@ -16,30 +16,30 @@ func TestGetJobStatus(t *testing.T) {
th := Setup().InitBasic().InitSystemAdmin()
defer TearDown()
- status := &model.JobStatus{
+ status := &model.Job{
Id: model.NewId(),
Status: model.NewId(),
}
- if result := <-app.Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ if result := <-app.Srv.Store.Job().Save(status); result.Err != nil {
t.Fatal(result.Err)
}
- defer app.Srv.Store.JobStatus().Delete(status.Id)
+ defer app.Srv.Store.Job().Delete(status.Id)
- received, resp := th.SystemAdminClient.GetJobStatus(status.Id)
+ received, resp := th.SystemAdminClient.GetJob(status.Id)
CheckNoError(t, resp)
if received.Id != status.Id || received.Status != status.Status {
t.Fatal("incorrect job status received")
}
- _, resp = th.SystemAdminClient.GetJobStatus("1234")
+ _, resp = th.SystemAdminClient.GetJob("1234")
CheckBadRequestStatus(t, resp)
- _, resp = th.Client.GetJobStatus(status.Id)
+ _, resp = th.Client.GetJob(status.Id)
CheckForbiddenStatus(t, resp)
- _, resp = th.SystemAdminClient.GetJobStatus(model.NewId())
+ _, resp = th.SystemAdminClient.GetJob(model.NewId())
CheckNotFoundStatus(t, resp)
}
@@ -49,7 +49,7 @@ func TestGetJobStatusesByType(t *testing.T) {
jobType := model.NewId()
- statuses := []*model.JobStatus{
+ statuses := []*model.Job{
{
Id: model.NewId(),
Type: jobType,
@@ -68,11 +68,11 @@ func TestGetJobStatusesByType(t *testing.T) {
}
for _, status := range statuses {
- store.Must(app.Srv.Store.JobStatus().SaveOrUpdate(status))
- defer app.Srv.Store.JobStatus().Delete(status.Id)
+ store.Must(app.Srv.Store.Job().Save(status))
+ defer app.Srv.Store.Job().Delete(status.Id)
}
- received, resp := th.SystemAdminClient.GetJobStatusesByType(jobType, 0, 2)
+ received, resp := th.SystemAdminClient.GetJobsByType(jobType, 0, 2)
CheckNoError(t, resp)
if len(received) != 2 {
@@ -83,7 +83,7 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal("should've received second newest job second")
}
- received, resp = th.SystemAdminClient.GetJobStatusesByType(jobType, 1, 2)
+ received, resp = th.SystemAdminClient.GetJobsByType(jobType, 1, 2)
CheckNoError(t, resp)
if len(received) != 1 {
@@ -92,12 +92,12 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal("should've received oldest job last")
}
- _, resp = th.SystemAdminClient.GetJobStatusesByType("", 0, 60)
+ _, resp = th.SystemAdminClient.GetJobsByType("", 0, 60)
CheckNotFoundStatus(t, resp)
- _, resp = th.SystemAdminClient.GetJobStatusesByType(strings.Repeat("a", 33), 0, 60)
+ _, resp = th.SystemAdminClient.GetJobsByType(strings.Repeat("a", 33), 0, 60)
CheckBadRequestStatus(t, resp)
- _, resp = th.Client.GetJobStatusesByType(jobType, 0, 60)
+ _, resp = th.Client.GetJobsByType(jobType, 0, 60)
CheckForbiddenStatus(t, resp)
}
diff --git a/app/admin.go b/app/admin.go
index 8b7d64b53..6fbe150c4 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -16,6 +16,7 @@ import (
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/store"
"github.com/mattermost/platform/utils"
+ "github.com/mattermost/platform/jobs"
)
func GetLogs(page, perPage int) ([]string, *model.AppError) {
@@ -187,6 +188,8 @@ func RecycleDatabaseConnection() {
l4g.Warn(utils.T("api.admin.recycle_db_start.warn"))
Srv.Store = store.NewLayeredStore()
+ jobs.Srv.Store = Srv.Store
+
time.Sleep(20 * time.Second)
oldStore.Close()
diff --git a/app/job.go b/app/job.go
index 00439e4d2..c625ce15f 100644
--- a/app/job.go
+++ b/app/job.go
@@ -7,22 +7,22 @@ import (
"github.com/mattermost/platform/model"
)
-func GetJobStatus(id string) (*model.JobStatus, *model.AppError) {
- if result := <-Srv.Store.JobStatus().Get(id); result.Err != nil {
+func GetJob(id string) (*model.Job, *model.AppError) {
+ if result := <-Srv.Store.Job().Get(id); result.Err != nil {
return nil, result.Err
} else {
- return result.Data.(*model.JobStatus), nil
+ return result.Data.(*model.Job), nil
}
}
-func GetJobStatusesByTypePage(jobType string, page int, perPage int) ([]*model.JobStatus, *model.AppError) {
- return GetJobStatusesByType(jobType, page*perPage, perPage)
+func GetJobsByTypePage(jobType string, page int, perPage int) ([]*model.Job, *model.AppError) {
+ return GetJobsByType(jobType, page*perPage, perPage)
}
-func GetJobStatusesByType(jobType string, offset int, limit int) ([]*model.JobStatus, *model.AppError) {
- if result := <-Srv.Store.JobStatus().GetAllByTypePage(jobType, offset, limit); result.Err != nil {
+func GetJobsByType(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) {
+ if result := <-Srv.Store.Job().GetAllByTypePage(jobType, offset, limit); result.Err != nil {
return nil, result.Err
} else {
- return result.Data.([]*model.JobStatus), nil
+ return result.Data.([]*model.Job), nil
}
}
diff --git a/app/job_test.go b/app/job_test.go
index 20e9dee8a..ced65788f 100644
--- a/app/job_test.go
+++ b/app/job_test.go
@@ -13,17 +13,17 @@ import (
func TestGetJobStatus(t *testing.T) {
Setup()
- status := &model.JobStatus{
+ status := &model.Job{
Id: model.NewId(),
Status: model.NewId(),
}
- if result := <-Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ if result := <-Srv.Store.Job().Save(status); result.Err != nil {
t.Fatal(result.Err)
}
- defer Srv.Store.JobStatus().Delete(status.Id)
+ defer Srv.Store.Job().Delete(status.Id)
- if received, err := GetJobStatus(status.Id); err != nil {
+ if received, err := GetJob(status.Id); err != nil {
t.Fatal(err)
} else if received.Id != status.Id || received.Status != status.Status {
t.Fatal("inccorrect job status received")
@@ -35,7 +35,7 @@ func TestGetJobStatusesByType(t *testing.T) {
jobType := model.NewId()
- statuses := []*model.JobStatus{
+ statuses := []*model.Job{
{
Id: model.NewId(),
Type: jobType,
@@ -54,11 +54,11 @@ func TestGetJobStatusesByType(t *testing.T) {
}
for _, status := range statuses {
- store.Must(Srv.Store.JobStatus().SaveOrUpdate(status))
- defer Srv.Store.JobStatus().Delete(status.Id)
+ store.Must(Srv.Store.Job().Save(status))
+ defer Srv.Store.Job().Delete(status.Id)
}
- if received, err := GetJobStatusesByType(jobType, 0, 2); err != nil {
+ if received, err := GetJobsByType(jobType, 0, 2); err != nil {
t.Fatal(err)
} else if len(received) != 2 {
t.Fatal("received wrong number of statuses")
@@ -68,7 +68,7 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal("should've received second newest job second")
}
- if received, err := GetJobStatusesByType(jobType, 2, 2); err != nil {
+ if received, err := GetJobsByType(jobType, 2, 2); err != nil {
t.Fatal(err)
} else if len(received) != 1 {
t.Fatal("received wrong number of statuses")
diff --git a/cmd/platform/server.go b/cmd/platform/server.go
index 2eedbd54a..1edb6c2f3 100644
--- a/cmd/platform/server.go
+++ b/cmd/platform/server.go
@@ -126,7 +126,9 @@ func runServer(configFileLocation string) {
}
}
- jobs := jobs.InitJobs(app.Srv.Store).Start()
+ jobs.Srv.Store = app.Srv.Store
+ jobs.Srv.StartWorkers()
+ jobs.Srv.StartSchedulers()
// wait for kill signal before attempting to gracefully shutdown
// the running service
@@ -142,7 +144,8 @@ func runServer(configFileLocation string) {
einterfaces.GetMetricsInterface().StopServer()
}
- jobs.Stop()
+ jobs.Srv.StopSchedulers()
+ jobs.Srv.StopWorkers()
app.StopServer()
}
diff --git a/config/config.json b/config/config.json
index 3401a5e4f..56bd3d9fa 100644
--- a/config/config.json
+++ b/config/config.json
@@ -289,5 +289,9 @@
},
"DataRetentionSettings": {
"Enable": false
+ },
+ "JobSettings": {
+ "RunJobs": true,
+ "RunScheduler": true
}
}
diff --git a/einterfaces/jobs/data_retention.go b/einterfaces/jobs/data_retention.go
index 340ed1b88..442f667fa 100644
--- a/einterfaces/jobs/data_retention.go
+++ b/einterfaces/jobs/data_retention.go
@@ -5,11 +5,11 @@ package jobs
import (
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
)
type DataRetentionInterface interface {
- MakeJob(store store.Store) model.Job
+ MakeWorker() model.Worker
+ MakeScheduler() model.Scheduler
}
var theDataRetentionInterface DataRetentionInterface
diff --git a/i18n/en.json b/i18n/en.json
index 7d23a13c1..03e833fa3 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3744,6 +3744,10 @@
"translation": "Page not found"
},
{
+ "id": "jobs.request_cancellation.status.error",
+ "translation": "Could not request cancellation for job that is not in a cancelable state."
+ },
+ {
"id": "manaultesting.get_channel_id.no_found.debug",
"translation": "Could not find channel: %v, %v possibilities searched"
},
@@ -5292,24 +5296,24 @@
"translation": "We couldn't save or update the file info"
},
{
- "id": "store.sql_job_status.delete_by_type.app_error",
- "translation": "We couldn't delete the job status"
+ "id": "store.sql_job.delete.app_error",
+ "translation": "We couldn't delete the job"
},
{
- "id": "store.sql_job_status.get.app_error",
- "translation": "We couldn't get the job status"
+ "id": "store.sql_job.get.app_error",
+ "translation": "We couldn't get the job"
},
{
- "id": "store.sql_job_status.get_all.app_error",
- "translation": "We couldn't get all job statuses"
+ "id": "store.sql_job.get_all.app_error",
+ "translation": "We couldn't get the jobs"
},
{
- "id": "store.sql_job_status.save.app_error",
- "translation": "We couldn't save the job status"
+ "id": "store.sql_job.save.app_error",
+ "translation": "We couldn't save the job"
},
{
- "id": "store.sql_job_status.update.app_error",
- "translation": "We couldn't update the job status"
+ "id": "store.sql_job.update.app_error",
+ "translation": "We couldn't update the job"
},
{
"id": "store.sql_license.get.app_error",
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 8c84f4eea..58c2f2f13 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -4,71 +4,111 @@
package jobs
import (
- "sync"
+ "context"
+ "time"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/platform/einterfaces/jobs"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
- "github.com/mattermost/platform/utils"
)
-type Jobs struct {
- startOnce sync.Once
+const (
+ CANCEL_WATCHER_POLLING_INTERVAL = 5000
+)
- DataRetention model.Job
- // SearchIndexing model.Job
+func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) {
+ job := model.Job{
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ Data: jobData,
+ }
- listenerId string
+ if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
+ return nil, result.Err
+ }
+
+ return &job, nil
}
-func InitJobs(s store.Store) *Jobs {
- jobs := &Jobs{
- // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+func ClaimJob(job *model.Job) (bool, *model.AppError) {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ success := result.Data.(bool)
+ return success, nil
}
+}
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
- jobs.DataRetention = dataRetentionInterface.MakeJob(s)
- }
+func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) {
+ var job *model.Job
- return jobs
-}
+ if result := <-Srv.Store.Job().Get(jobId); result.Err != nil {
+ return false, result.Err
+ } else {
+ job = result.Data.(*model.Job)
+ }
-func (jobs *Jobs) Start() *Jobs {
- l4g.Info("Starting jobs")
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = progress
- jobs.startOnce.Do(func() {
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
+ if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ if !result.Data.(bool) {
+ return false, nil
}
+ }
- // go jobs.SearchIndexing.Run()
- })
+ return true, nil
+}
- jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange)
+func SetJobSuccess(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
+ return result.Err
+}
- return jobs
+func SetJobError(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ return result.Err
}
-func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
- if jobs.DataRetention != nil {
- if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
- } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
- }
- }
+func SetJobCanceled(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
+ return result.Err
}
-func (jobs *Jobs) Stop() *Jobs {
- utils.RemoveConfigListener(jobs.listenerId)
+func RequestCancellation(job *model.Job) *model.AppError {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
+ }
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
}
- // jobs.SearchIndexing.Stop()
- l4g.Info("Stopped jobs")
+ return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id)
+}
- return jobs
+func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
+ for {
+ select {
+ case <-ctx.Done():
+ l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId)
+ return
+ case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
+ l4g.Debug("CancellationWatcher for Job: %v polling.", jobId)
+ if result := <-Srv.Store.Job().Get(jobId); result.Err == nil {
+ jobStatus := result.Data.(*model.Job)
+ if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
+ close(cancelChan)
+ return
+ }
+ }
+ }
+ }
}
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
new file mode 100644
index 000000000..ada957ccc
--- /dev/null
+++ b/jobs/jobs_watcher.go
@@ -0,0 +1,85 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "math/rand"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+const (
+ WATCHER_POLLING_INTERVAL = 15000
+)
+
+type Watcher struct {
+ workers *Workers
+
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeWatcher(workers *Workers) *Watcher {
+ return &Watcher{
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ workers: workers,
+ }
+}
+
+func (watcher *Watcher) Start() {
+ l4g.Debug("Watcher Started")
+
+ // Delay for some random number of milliseconds before starting to ensure that multiple
+ // instances of the jobserver don't poll at a time too close to each other.
+ rand.Seed(time.Now().UTC().UnixNano())
+ _ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond)
+
+ defer func(){
+ l4g.Debug("Watcher Finished")
+ watcher.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-watcher.stop:
+ l4g.Debug("Watcher: Received stop signal")
+ return
+ case <-time.After(WATCHER_POLLING_INTERVAL * time.Millisecond):
+ watcher.PollAndNotify()
+ }
+ }
+}
+
+func (watcher *Watcher) Stop() {
+ l4g.Debug("Watcher Stopping")
+ watcher.stop <- true
+ <-watcher.stopped
+}
+
+func (watcher *Watcher) PollAndNotify() {
+ if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
+ l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
+ } else {
+ jobStatuses := result.Data.([]*model.Job)
+
+ for _, js := range jobStatuses {
+ j := model.Job{
+ Type: js.Type,
+ Id: js.Id,
+ }
+
+ if js.Type == model.JOB_TYPE_DATA_RETENTION {
+ if watcher.workers.DataRetention != nil {
+ select {
+ case watcher.workers.DataRetention.JobChannel() <- j:
+ default:
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go
index 5f491a815..aabe5d3b2 100644
--- a/jobs/jobserver/jobserver.go
+++ b/jobs/jobserver/jobserver.go
@@ -16,22 +16,20 @@ import (
_ "github.com/mattermost/platform/imports"
)
-var Srv jobs.JobServer
-
func main() {
// Initialize
utils.InitAndLoadConfig("config.json")
defer l4g.Close()
- Srv.Store = store.NewLayeredStore()
- defer Srv.Store.Close()
+ jobs.Srv.Store = store.NewLayeredStore()
+ defer jobs.Srv.Store.Close()
- Srv.LoadLicense()
+ jobs.Srv.LoadLicense()
// Run jobs
l4g.Info("Starting Mattermost job server")
- Srv.Jobs = jobs.InitJobs(Srv.Store)
- Srv.Jobs.Start()
+ jobs.Srv.StartWorkers()
+ jobs.Srv.StartSchedulers()
var signalChan chan os.Signal = make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
@@ -40,7 +38,8 @@ func main() {
// Cleanup anything that isn't handled by a defer statement
l4g.Info("Stopping Mattermost job server")
- Srv.Jobs.Stop()
+ jobs.Srv.StopSchedulers()
+ jobs.Srv.StopWorkers()
l4g.Info("Stopped Mattermost job server")
}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
new file mode 100644
index 000000000..73ec6661a
--- /dev/null
+++ b/jobs/schedulers.go
@@ -0,0 +1,68 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Schedulers struct {
+ startOnce sync.Once
+
+ DataRetention model.Scheduler
+
+ listenerId string
+}
+
+func InitSchedulers() *Schedulers {
+ schedulers := &Schedulers{}
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
+ }
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) Start() *Schedulers {
+ l4g.Info("Starting schedulers")
+
+ schedulers.startOnce.Do(func() {
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ }
+ })
+
+ schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if schedulers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+ }
+}
+
+func (schedulers *Schedulers) Stop() *Schedulers {
+ utils.RemoveConfigListener(schedulers.listenerId)
+
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+
+ l4g.Info("Stopped schedulers")
+
+ return schedulers
+}
diff --git a/jobs/server.go b/jobs/server.go
index dd3448842..7920cb2d5 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -11,10 +11,13 @@ import (
)
type JobServer struct {
- Store store.Store
- Jobs *Jobs
+ Store store.Store
+ Workers *Workers
+ Schedulers *Schedulers
}
+var Srv JobServer
+
func (server *JobServer) LoadLicense() {
licenseId := ""
if result := <-server.Store.System().Get(); result.Err == nil {
@@ -44,3 +47,27 @@ func (server *JobServer) LoadLicense() {
l4g.Info(utils.T("mattermost.load_license.find.warn"))
}
}
+
+func (server *JobServer) StartWorkers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Workers = InitWorkers().Start()
+ }
+}
+
+func (server *JobServer) StartSchedulers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Schedulers = InitSchedulers().Start()
+ }
+}
+
+func (server *JobServer) StopWorkers() {
+ if Srv.Workers != nil {
+ Srv.Workers.Stop()
+ }
+}
+
+func (server *JobServer) StopSchedulers() {
+ if Srv.Schedulers != nil {
+ Srv.Schedulers.Stop()
+ }
+}
diff --git a/jobs/testjob.go b/jobs/testjob.go
deleted file mode 100644
index 59d5274e5..000000000
--- a/jobs/testjob.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package jobs
-
-import (
- "time"
-
- l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/platform/store"
-)
-
-type TestJob struct {
- store store.Store
-
- name string
- stop chan bool
- stopped chan bool
-}
-
-func MakeTestJob(s store.Store, name string) *TestJob {
- return &TestJob{
- store: s,
- name: name,
- stop: make(chan bool, 1),
- stopped: make(chan bool, 1),
- }
-}
-
-func (job *TestJob) Run() {
- l4g.Debug("Job %v: Started", job.name)
-
- running := true
- for running {
- l4g.Debug("Job %v: Tick", job.name)
-
- select {
- case <-job.stop:
- l4g.Debug("Job %v: Received stop signal", job.name)
- running = false
- case <-time.After(10 * time.Second):
- continue
- }
- }
-
- l4g.Debug("Job %v: Finished", job.name)
- job.stopped <- true
-}
-
-func (job *TestJob) Stop() {
- l4g.Debug("Job %v: Stopping", job.name)
- job.stop <- true
- <-job.stopped
-}
diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go
new file mode 100644
index 000000000..31b5d144c
--- /dev/null
+++ b/jobs/testscheduler.go
@@ -0,0 +1,58 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+)
+
+type TestScheduler struct {
+ name string
+ jobType string
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeTestScheduler(name string, jobType string) *TestScheduler {
+ return &TestScheduler{
+ name: name,
+ jobType: jobType,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ }
+}
+
+func (scheduler *TestScheduler) Run() {
+ l4g.Debug("Scheduler %v: Started", scheduler.name)
+
+ defer func(){
+ l4g.Debug("Scheduler %v: Finished", scheduler.name)
+ scheduler.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-scheduler.stop:
+ l4g.Debug("Scheduler %v: Received stop signal", scheduler.name)
+ return
+ case <-time.After(86400 * time.Second):
+ l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name)
+ scheduler.AddJob()
+ }
+ }
+}
+
+func (scheduler *TestScheduler) AddJob() {
+ if _, err := CreateJob(scheduler.jobType, nil); err != nil {
+ l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err)
+ }
+}
+
+func (scheduler *TestScheduler) Stop() {
+ l4g.Debug("Scheduler %v: Stopping", scheduler.name)
+ scheduler.stop <- true
+ <-scheduler.stopped
+}
diff --git a/jobs/testworker.go b/jobs/testworker.go
new file mode 100644
index 000000000..f1c8a07a3
--- /dev/null
+++ b/jobs/testworker.go
@@ -0,0 +1,104 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "context"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+type TestWorker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+}
+
+func MakeTestWorker(name string) *TestWorker {
+ return &TestWorker{
+ name: name,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ }
+}
+
+func (worker *TestWorker) Run() {
+ l4g.Debug("Worker %v: Started", worker.name)
+
+ defer func() {
+ l4g.Debug("Worker %v: Finished", worker.name)
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ l4g.Debug("Worker %v: Received stop signal", worker.name)
+ return
+ case job := <-worker.jobs:
+ l4g.Debug("Worker %v: Received a new candidate job.", worker.name)
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *TestWorker) DoJob(job *model.Job) {
+ if claimed, err := ClaimJob(job); err != nil {
+ l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error())
+ return
+ } else if !claimed {
+ return
+ }
+
+ cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
+ cancelWatcherChan := make(chan interface{}, 1)
+ go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+
+ defer cancelCancelWatcher()
+
+ counter := 0
+ for {
+ select {
+ case <-cancelWatcherChan:
+ l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-worker.stop:
+ l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-time.After(5 * time.Second):
+ counter++
+ if counter > 10 {
+ l4g.Debug("Job %v: Job completed.", job.Id)
+ if err := SetJobSuccess(job); err != nil {
+ l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error())
+ }
+ return
+ } else {
+ if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil {
+ l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
+ }
+ }
+ }
+ }
+}
+
+func (worker *TestWorker) Stop() {
+ l4g.Debug("Worker %v: Stopping", worker.name)
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *TestWorker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
diff --git a/jobs/workers.go b/jobs/workers.go
new file mode 100644
index 000000000..a42ec4607
--- /dev/null
+++ b/jobs/workers.go
@@ -0,0 +1,79 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Workers struct {
+ startOnce sync.Once
+ watcher *Watcher
+
+ DataRetention model.Worker
+ // SearchIndexing model.Job
+
+ listenerId string
+}
+
+func InitWorkers() *Workers {
+ workers := &Workers{
+ // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+ }
+ workers.watcher = MakeWatcher(workers)
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ workers.DataRetention = dataRetentionInterface.MakeWorker()
+ }
+
+ return workers
+}
+
+func (workers *Workers) Start() *Workers {
+ l4g.Info("Starting workers")
+
+ workers.startOnce.Do(func() {
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ }
+
+ // go workers.SearchIndexing.Run()
+
+ go workers.watcher.Start()
+ })
+
+ workers.listenerId = utils.AddConfigListener(workers.handleConfigChange)
+
+ return workers
+}
+
+func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if workers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ }
+}
+
+func (workers *Workers) Stop() *Workers {
+ utils.RemoveConfigListener(workers.listenerId)
+
+ workers.watcher.Stop()
+
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ // workers.SearchIndexing.Stop()
+
+ l4g.Info("Stopped workers")
+
+ return workers
+}
diff --git a/model/client4.go b/model/client4.go
index da3dfacb7..996d9362c 100644
--- a/model/client4.go
+++ b/model/client4.go
@@ -2790,22 +2790,22 @@ func (c *Client4) OpenGraph(url string) (map[string]string, *Response) {
// Jobs Section
-// GetJobStatus gets the status of a single job.
-func (c *Client4) GetJobStatus(id string) (*JobStatus, *Response) {
+// GetJob gets a single job.
+func (c *Client4) GetJob(id string) (*Job, *Response) {
if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v/status", id), ""); err != nil {
return nil, BuildErrorResponse(r, err)
} else {
defer closeBody(r)
- return JobStatusFromJson(r.Body), BuildResponse(r)
+ return JobFromJson(r.Body), BuildResponse(r)
}
}
-// GetJobStatusesByType gets the status of all jobs of a given type, sorted with the job that most recently started first.
-func (c *Client4) GetJobStatusesByType(jobType string, page int, perPage int) ([]*JobStatus, *Response) {
+// GetJobsByType gets all jobs of a given type, sorted with the job that most recently started first.
+func (c *Client4) GetJobsByType(jobType string, page int, perPage int) ([]*Job, *Response) {
if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v/statuses?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil {
return nil, BuildErrorResponse(r, err)
} else {
defer closeBody(r)
- return JobStatusesFromJson(r.Body), BuildResponse(r)
+ return JobsFromJson(r.Body), BuildResponse(r)
}
}
diff --git a/model/config.go b/model/config.go
index b7526925f..3e98aa8f6 100644
--- a/model/config.go
+++ b/model/config.go
@@ -436,6 +436,11 @@ type DataRetentionSettings struct {
Enable *bool
}
+type JobSettings struct {
+ RunJobs *bool
+ RunScheduler *bool
+}
+
type Config struct {
ServiceSettings ServiceSettings
TeamSettings TeamSettings
@@ -462,6 +467,7 @@ type Config struct {
WebrtcSettings WebrtcSettings
ElasticSearchSettings ElasticSearchSettings
DataRetentionSettings DataRetentionSettings
+ JobSettings JobSettings
}
func (o *Config) ToJson() string {
@@ -1380,6 +1386,16 @@ func (o *Config) SetDefaults() {
*o.DataRetentionSettings.Enable = false
}
+ if o.JobSettings.RunJobs == nil {
+ o.JobSettings.RunJobs = new(bool)
+ *o.JobSettings.RunJobs = true
+ }
+
+ if o.JobSettings.RunScheduler == nil {
+ o.JobSettings.RunScheduler = new(bool)
+ *o.JobSettings.RunScheduler = true
+ }
+
o.defaultWebrtcSettings()
}
diff --git a/model/job.go b/model/job.go
index d539b5bf9..b0567bf1a 100644
--- a/model/job.go
+++ b/model/job.go
@@ -3,7 +3,84 @@
package model
-type Job interface {
+import (
+ "encoding/json"
+ "io"
+)
+
+const (
+ JOB_TYPE_DATA_RETENTION = "data_retention"
+ JOB_TYPE_SEARCH_INDEXING = "search_indexing"
+
+ JOB_STATUS_PENDING = "pending"
+ JOB_STATUS_IN_PROGRESS = "in_progress"
+ JOB_STATUS_SUCCESS = "success"
+ JOB_STATUS_ERROR = "error"
+ JOB_STATUS_CANCEL_REQUESTED = "cancel_requested"
+ JOB_STATUS_CANCELED = "canceled"
+)
+
+type Job struct {
+ Id string `json:"id"`
+ Type string `json:"type"`
+ Priority int64 `json:"priority"`
+ CreateAt int64 `json:"create_at"`
+ StartAt int64 `json:"start_at"`
+ LastActivityAt int64 `json:"last_activity_at"`
+ Status string `json:"status"`
+ Progress int64 `json:"progress"`
+ Data map[string]interface{} `json:"data"`
+}
+
+func (js *Job) ToJson() string {
+ if b, err := json.Marshal(js); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func JobFromJson(data io.Reader) *Job {
+ var status Job
+ if err := json.NewDecoder(data).Decode(&status); err == nil {
+ return &status
+ } else {
+ return nil
+ }
+}
+
+func JobsToJson(jobs []*Job) string {
+ if b, err := json.Marshal(jobs); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func JobsFromJson(data io.Reader) []*Job {
+ var jobs []*Job
+ if err := json.NewDecoder(data).Decode(&jobs); err == nil {
+ return jobs
+ } else {
+ return nil
+ }
+}
+
+func (js *Job) DataToJson() string {
+ if b, err := json.Marshal(js.Data); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+type Worker interface {
+ Run()
+ Stop()
+ JobChannel() chan<- Job
+}
+
+type Scheduler interface {
Run()
Stop()
}
diff --git a/model/job_status.go b/model/job_status.go
deleted file mode 100644
index cf490648f..000000000
--- a/model/job_status.go
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package model
-
-import (
- "encoding/json"
- "io"
-)
-
-const (
- JOB_TYPE_DATA_RETENTION = "data_retention"
- JOB_TYPE_SEARCH_INDEXING = "search_indexing"
-)
-
-type JobStatus struct {
- Id string `json:"id"`
- Type string `json:"type"`
- StartAt int64 `json:"start_at"`
- LastActivityAt int64 `json:"last_activity_at"`
- LastRunStartedAt int64 `json:"last_run_started_at"`
- LastRunCompletedAt int64 `json:"last_run_completed_at"`
- Status string `json:"status"`
- Data map[string]interface{} `json:"data"`
-}
-
-func (js *JobStatus) ToJson() string {
- if b, err := json.Marshal(js); err != nil {
- return ""
- } else {
- return string(b)
- }
-}
-
-func JobStatusFromJson(data io.Reader) *JobStatus {
- var status JobStatus
- if err := json.NewDecoder(data).Decode(&status); err == nil {
- return &status
- } else {
- return nil
- }
-}
-
-func JobStatusesToJson(statuses []*JobStatus) string {
- if b, err := json.Marshal(statuses); err != nil {
- return ""
- } else {
- return string(b)
- }
-}
-
-func JobStatusesFromJson(data io.Reader) []*JobStatus {
- var statuses []*JobStatus
- if err := json.NewDecoder(data).Decode(&statuses); err == nil {
- return statuses
- } else {
- return nil
- }
-}
diff --git a/store/layered_store.go b/store/layered_store.go
index 58c9e5ca1..ab9859c80 100644
--- a/store/layered_store.go
+++ b/store/layered_store.go
@@ -119,8 +119,8 @@ func (s *LayeredStore) Reaction() ReactionStore {
return s.DatabaseLayer.Reaction()
}
-func (s *LayeredStore) JobStatus() JobStatusStore {
- return s.DatabaseLayer.JobStatus()
+func (s *LayeredStore) Job() JobStore {
+ return s.DatabaseLayer.Job()
}
func (s *LayeredStore) MarkSystemRanUnitTests() {
diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go
deleted file mode 100644
index a87b8267b..000000000
--- a/store/sql_job_status_store.go
+++ /dev/null
@@ -1,190 +0,0 @@
-// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "database/sql"
- "net/http"
-
- "github.com/mattermost/platform/model"
-)
-
-type SqlJobStatusStore struct {
- SqlStore
-}
-
-func NewSqlJobStatusStore(sqlStore SqlStore) JobStatusStore {
- s := &SqlJobStatusStore{sqlStore}
-
- for _, db := range sqlStore.GetAllConns() {
- table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id")
- table.ColMap("Id").SetMaxSize(26)
- table.ColMap("Type").SetMaxSize(32)
- table.ColMap("Status").SetMaxSize(32)
- table.ColMap("Data").SetMaxSize(1024)
- }
-
- return s
-}
-
-func (jss SqlJobStatusStore) CreateIndexesIfNotExists() {
- jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type")
-}
-
-func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if err := jss.GetReplica().SelectOne(&model.JobStatus{},
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil {
- if _, err := jss.GetMaster().Update(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else if err == sql.ErrNoRows {
- if err := jss.GetMaster().Insert(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
-
- if result.Err == nil {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Get(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var status *model.JobStatus
-
- if err := jss.GetReplica().SelectOne(&status,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- if err == sql.ErrNoRows {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
- } else {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
- }
- } else {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType",
- "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type
- ORDER BY
- StartAt ASC
- LIMIT
- :Limit
- OFFSET
- :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage",
- "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Delete(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if _, err := jss.GetReplica().Exec(
- `DELETE FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType",
- "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error())
- } else {
- result.Data = id
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go
deleted file mode 100644
index 18c29e522..000000000
--- a/store/sql_job_status_store_test.go
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "testing"
-
- "github.com/mattermost/platform/model"
-)
-
-func TestJobStatusSaveGetUpdate(t *testing.T) {
- Setup()
-
- status := &model.JobStatus{
- Id: model.NewId(),
- Type: model.NewId(),
- Status: model.NewId(),
- Data: map[string]interface{}{
- "Processed": 0,
- "Total": 12345,
- "LastProcessed": "abcd",
- },
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- defer func() {
- <-store.JobStatus().Delete(status.Id)
- }()
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id {
- t.Fatal("received incorrect status after save")
- }
-
- status.Status = model.NewId()
- status.Data = map[string]interface{}{
- "Processed": 12345,
- "Total": 12345,
- "LastProcessed": "abcd",
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status {
- t.Fatal("received incorrect status after update")
- }
-}
-
-func TestJobStatusGetAllByType(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: model.NewId(),
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id {
- t.Fatal("should've received first status")
- } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id {
- t.Fatal("should've received second status")
- }
-}
-
-func TestJobStatusGetAllByTypePage(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1000,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 999,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1001,
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[1].Id {
- t.Fatal("should've received newest job first")
- } else if received[1].Id != statuses[0].Id {
- t.Fatal("should've received second newest job second")
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 1 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[2].Id {
- t.Fatal("should've received oldest job last")
- }
-}
-
-func TestJobStatusDelete(t *testing.T) {
- Setup()
-
- status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{
- Id: model.NewId(),
- })).(*model.JobStatus)
-
- if result := <-store.JobStatus().Delete(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- }
-}
diff --git a/store/sql_job_store.go b/store/sql_job_store.go
new file mode 100644
index 000000000..c00e37d86
--- /dev/null
+++ b/store/sql_job_store.go
@@ -0,0 +1,327 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "database/sql"
+ "net/http"
+
+ "github.com/mattermost/gorp"
+ "github.com/mattermost/platform/model"
+)
+
+type SqlJobStore struct {
+ SqlStore
+}
+
+func NewSqlJobStore(sqlStore SqlStore) JobStore {
+ s := &SqlJobStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.Job{}, "Jobs").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(32)
+ table.ColMap("Status").SetMaxSize(32)
+ table.ColMap("Data").SetMaxSize(1024)
+ }
+
+ return s
+}
+
+func (jss SqlJobStore) CreateIndexesIfNotExists() {
+ jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type")
+}
+
+func (jss SqlJobStore) Save(job *model.Job) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ if err := jss.GetMaster().Insert(job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.Save",
+ "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET
+ LastActivityAt = :LastActivityAt,
+ Status = :Status,
+ Progress = :Progress,
+ Data = :Data
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`,
+ map[string]interface{}{
+ "Id": job.Id,
+ "OldStatus": currentStatus,
+ "LastActivityAt": model.GetMillis(),
+ "Status": job.Status,
+ "Data": job.DataToJson(),
+ "Progress": job.Progress,
+ }); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateOptimistically",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatus(id string, status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ job := &model.Job{
+ Id: id,
+ Status: status,
+ LastActivityAt: model.GetMillis(),
+ }
+
+ if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool {
+ return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt"
+ }, job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ }
+
+ if result.Err == nil {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var startAtClause string
+ if newStatus == model.JOB_STATUS_IN_PROGRESS {
+ startAtClause = `StartAt = :StartAt,`
+ }
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET `+startAtClause+`
+ Status = :NewStatus,
+ LastActivityAt = :LastActivityAt
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Get(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var status *model.Job
+
+ if err := jss.GetReplica().SelectOne(&status,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ if err == sql.ErrNoRows {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
+ } else {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
+ }
+ } else {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByType",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type
+ ORDER BY
+ StartAt ASC
+ LIMIT
+ :Limit
+ OFFSET
+ :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByTypePage",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByStatus(status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Status = :Status
+ ORDER BY
+ CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByStatus",
+ "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Delete(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := jss.GetMaster().Exec(
+ `DELETE FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.DeleteByType",
+ "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ result.Data = id
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go
new file mode 100644
index 000000000..edf09a4c0
--- /dev/null
+++ b/store/sql_job_store_test.go
@@ -0,0 +1,341 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "github.com/mattermost/platform/model"
+ "time"
+)
+
+func TestJobSaveGet(t *testing.T) {
+ Setup()
+
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.NewId(),
+ Data: map[string]interface{}{
+ "Processed": 0,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ },
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-store.Job().Delete(job.Id)
+ }()
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.Job); received.Id != job.Id {
+ t.Fatal("received incorrect job after save")
+ }
+}
+
+func TestJobGetAllByType(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByType(jobType); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobGetAllByTypePage(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != jobs[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 1 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
+
+func TestJobGetAllByStatus(t *testing.T) {
+ jobType := model.NewId()
+ status := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1000,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 999,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1001,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1002,
+ Status: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByStatus(status); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 3 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobUpdateOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+ defer store.Job().Delete(job.Id)
+
+ job.LastActivityAt = model.GetMillis()
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = 50
+ job.Data = map[string]interface{}{
+ "Foo": "Bar",
+ }
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ if result.Data.(bool) {
+ t.Fatal("should have failed due to incorrect old status")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("Should have successfully updated")
+ }
+
+ var updatedJob *model.Job
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ updatedJob = result.Data.(*model.Job)
+ }
+
+ if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] {
+ t.Fatal("Some update property was not as expected")
+ }
+ }
+
+}
+
+func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_SUCCESS,
+ }
+
+ var lastUpdateAt int64
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ lastUpdateAt = result.Data.(*model.Job).LastActivityAt
+ }
+
+ defer store.Job().Delete(job.Id)
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("status wasn't updated")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if result.Data.(bool) {
+ t.Fatal("should be false due to incorrect original status")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("should still be pending")
+ }
+ if received.LastActivityAt != lastUpdateAt {
+ t.Fatal("last activity at shouldn't have changed")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ var startAtSet int64
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_IN_PROGRESS {
+ t.Fatal("should be in progress")
+ }
+ if received.StartAt == 0 {
+ t.Fatal("received should have start at set")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ startAtSet = received.StartAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_SUCCESS {
+ t.Fatal("should be success status")
+ }
+ if received.StartAt != startAtSet {
+ t.Fatal("startAt should not have changed")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+}
+
+func TestJobDelete(t *testing.T) {
+ Setup()
+
+ status := Must(store.Job().Save(&model.Job{
+ Id: model.NewId(),
+ })).(*model.Job)
+
+ if result := <-store.Job().Delete(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index dc3b51d0c..a039401f3 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -79,4 +79,5 @@ type SqlStore interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
+ Job() JobStore
}
diff --git a/store/sql_supplier.go b/store/sql_supplier.go
index 6f51cbd09..0f4ab8380 100644
--- a/store/sql_supplier.go
+++ b/store/sql_supplier.go
@@ -82,7 +82,7 @@ type SqlSupplierOldStores struct {
status StatusStore
fileInfo FileInfoStore
reaction ReactionStore
- jobStatus JobStatusStore
+ job JobStore
}
type SqlSupplier struct {
@@ -121,7 +121,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status = NewSqlStatusStore(supplier)
supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier)
supplier.oldStores.reaction = NewSqlReactionStore(supplier)
- supplier.oldStores.jobStatus = NewSqlJobStatusStore(supplier)
+ supplier.oldStores.job = NewSqlJobStore(supplier)
err := supplier.GetMaster().CreateTablesIfNotExists()
if err != nil {
@@ -150,7 +150,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists()
supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists()
supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists()
- supplier.oldStores.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists()
+ supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists()
supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures()
@@ -752,8 +752,8 @@ func (ss *SqlSupplier) Reaction() ReactionStore {
return ss.oldStores.reaction
}
-func (ss *SqlSupplier) JobStatus() JobStatusStore {
- return ss.oldStores.jobStatus
+func (ss *SqlSupplier) Job() JobStore {
+ return ss.oldStores.job
}
func (ss *SqlSupplier) DropAllTables() {
diff --git a/store/sql_upgrade.go b/store/sql_upgrade.go
index 5a6ed0ab5..a7b72124e 100644
--- a/store/sql_upgrade.go
+++ b/store/sql_upgrade.go
@@ -280,8 +280,9 @@ func UpgradeDatabaseToVersion40(sqlStore SqlStore) {
}
func UpgradeDatabaseToVersion41(sqlStore SqlStore) {
- // TODO: Uncomment following condition when version 4.0.0 is released
+ // TODO: Uncomment following condition when version 4.1.0 is released
// if shouldPerformUpgrade(sqlStore, VERSION_4_0_0, VERSION_4_1_0) {
+ sqlStore.RemoveTableIfExists("JobStatuses")
// saveSchemaVersion(sqlStore, VERSION_4_1_0)
// }
}
diff --git a/store/store.go b/store/store.go
index 0007f495e..95496b609 100644
--- a/store/store.go
+++ b/store/store.go
@@ -48,7 +48,7 @@ type Store interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
- JobStatus() JobStatusStore
+ Job() JobStore
MarkSystemRanUnitTests()
Close()
DropAllTables()
@@ -384,10 +384,14 @@ type ReactionStore interface {
DeleteAllWithEmojiName(emojiName string) StoreChannel
}
-type JobStatusStore interface {
- SaveOrUpdate(status *model.JobStatus) StoreChannel
+type JobStore interface {
+ Save(job *model.Job) StoreChannel
+ UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel
+ UpdateStatus(id string, status string) StoreChannel
+ UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel
Get(id string) StoreChannel
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
+ GetAllByStatus(status string) StoreChannel
Delete(id string) StoreChannel
}