summaryrefslogtreecommitdiffstats
path: root/store
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2017-07-07 15:21:02 +0100
committerGitHub <noreply@github.com>2017-07-07 15:21:02 +0100
commit0495a519499d6cefa289982a94d8f42de541c1f0 (patch)
tree94b6145daa41ca4d1d4a172f030071076852a09a /store
parent6e0f5f096986dad11ef182ddb51d4bfb0e558860 (diff)
downloadchat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.gz
chat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.bz2
chat-0495a519499d6cefa289982a94d8f42de541c1f0.zip
PLT-6916: Redesign the jobs package and Jobserver. (#6733)
This commit redesigns the jobserver to be based around an architecture of "workers", which carry out jobs of a particular type, and "jobs" which are a unit of work carried by a particular worker. It also introduces "schedulers" which are responsible for scheduling jobs of a particular type automatically (jobs can also be scheduled manually when apropriate). Workers may be run many times, either in instances of the platform binary, or the standalone jobserver binary. In any mattermost cluster, only one instance of platform OR jobserver must run the schedulers. At the moment this is controlled by a config variable, but in future will be controlled through the cluster leader election process.
Diffstat (limited to 'store')
-rw-r--r--store/layered_store.go4
-rw-r--r--store/sql_job_status_store.go190
-rw-r--r--store/sql_job_status_store_test.go151
-rw-r--r--store/sql_job_store.go327
-rw-r--r--store/sql_job_store_test.go341
-rw-r--r--store/sql_store.go1
-rw-r--r--store/sql_supplier.go10
-rw-r--r--store/sql_upgrade.go3
-rw-r--r--store/store.go10
9 files changed, 685 insertions, 352 deletions
diff --git a/store/layered_store.go b/store/layered_store.go
index 58c9e5ca1..ab9859c80 100644
--- a/store/layered_store.go
+++ b/store/layered_store.go
@@ -119,8 +119,8 @@ func (s *LayeredStore) Reaction() ReactionStore {
return s.DatabaseLayer.Reaction()
}
-func (s *LayeredStore) JobStatus() JobStatusStore {
- return s.DatabaseLayer.JobStatus()
+func (s *LayeredStore) Job() JobStore {
+ return s.DatabaseLayer.Job()
}
func (s *LayeredStore) MarkSystemRanUnitTests() {
diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go
deleted file mode 100644
index a87b8267b..000000000
--- a/store/sql_job_status_store.go
+++ /dev/null
@@ -1,190 +0,0 @@
-// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "database/sql"
- "net/http"
-
- "github.com/mattermost/platform/model"
-)
-
-type SqlJobStatusStore struct {
- SqlStore
-}
-
-func NewSqlJobStatusStore(sqlStore SqlStore) JobStatusStore {
- s := &SqlJobStatusStore{sqlStore}
-
- for _, db := range sqlStore.GetAllConns() {
- table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id")
- table.ColMap("Id").SetMaxSize(26)
- table.ColMap("Type").SetMaxSize(32)
- table.ColMap("Status").SetMaxSize(32)
- table.ColMap("Data").SetMaxSize(1024)
- }
-
- return s
-}
-
-func (jss SqlJobStatusStore) CreateIndexesIfNotExists() {
- jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type")
-}
-
-func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if err := jss.GetReplica().SelectOne(&model.JobStatus{},
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil {
- if _, err := jss.GetMaster().Update(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else if err == sql.ErrNoRows {
- if err := jss.GetMaster().Insert(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
-
- if result.Err == nil {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Get(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var status *model.JobStatus
-
- if err := jss.GetReplica().SelectOne(&status,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- if err == sql.ErrNoRows {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
- } else {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
- }
- } else {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType",
- "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type
- ORDER BY
- StartAt ASC
- LIMIT
- :Limit
- OFFSET
- :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage",
- "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Delete(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if _, err := jss.GetReplica().Exec(
- `DELETE FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType",
- "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error())
- } else {
- result.Data = id
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go
deleted file mode 100644
index 18c29e522..000000000
--- a/store/sql_job_status_store_test.go
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "testing"
-
- "github.com/mattermost/platform/model"
-)
-
-func TestJobStatusSaveGetUpdate(t *testing.T) {
- Setup()
-
- status := &model.JobStatus{
- Id: model.NewId(),
- Type: model.NewId(),
- Status: model.NewId(),
- Data: map[string]interface{}{
- "Processed": 0,
- "Total": 12345,
- "LastProcessed": "abcd",
- },
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- defer func() {
- <-store.JobStatus().Delete(status.Id)
- }()
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id {
- t.Fatal("received incorrect status after save")
- }
-
- status.Status = model.NewId()
- status.Data = map[string]interface{}{
- "Processed": 12345,
- "Total": 12345,
- "LastProcessed": "abcd",
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status {
- t.Fatal("received incorrect status after update")
- }
-}
-
-func TestJobStatusGetAllByType(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: model.NewId(),
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id {
- t.Fatal("should've received first status")
- } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id {
- t.Fatal("should've received second status")
- }
-}
-
-func TestJobStatusGetAllByTypePage(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1000,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 999,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1001,
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[1].Id {
- t.Fatal("should've received newest job first")
- } else if received[1].Id != statuses[0].Id {
- t.Fatal("should've received second newest job second")
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 1 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[2].Id {
- t.Fatal("should've received oldest job last")
- }
-}
-
-func TestJobStatusDelete(t *testing.T) {
- Setup()
-
- status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{
- Id: model.NewId(),
- })).(*model.JobStatus)
-
- if result := <-store.JobStatus().Delete(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- }
-}
diff --git a/store/sql_job_store.go b/store/sql_job_store.go
new file mode 100644
index 000000000..c00e37d86
--- /dev/null
+++ b/store/sql_job_store.go
@@ -0,0 +1,327 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "database/sql"
+ "net/http"
+
+ "github.com/mattermost/gorp"
+ "github.com/mattermost/platform/model"
+)
+
+type SqlJobStore struct {
+ SqlStore
+}
+
+func NewSqlJobStore(sqlStore SqlStore) JobStore {
+ s := &SqlJobStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.Job{}, "Jobs").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(32)
+ table.ColMap("Status").SetMaxSize(32)
+ table.ColMap("Data").SetMaxSize(1024)
+ }
+
+ return s
+}
+
+func (jss SqlJobStore) CreateIndexesIfNotExists() {
+ jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type")
+}
+
+func (jss SqlJobStore) Save(job *model.Job) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ if err := jss.GetMaster().Insert(job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.Save",
+ "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET
+ LastActivityAt = :LastActivityAt,
+ Status = :Status,
+ Progress = :Progress,
+ Data = :Data
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`,
+ map[string]interface{}{
+ "Id": job.Id,
+ "OldStatus": currentStatus,
+ "LastActivityAt": model.GetMillis(),
+ "Status": job.Status,
+ "Data": job.DataToJson(),
+ "Progress": job.Progress,
+ }); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateOptimistically",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatus(id string, status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ job := &model.Job{
+ Id: id,
+ Status: status,
+ LastActivityAt: model.GetMillis(),
+ }
+
+ if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool {
+ return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt"
+ }, job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ }
+
+ if result.Err == nil {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var startAtClause string
+ if newStatus == model.JOB_STATUS_IN_PROGRESS {
+ startAtClause = `StartAt = :StartAt,`
+ }
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET `+startAtClause+`
+ Status = :NewStatus,
+ LastActivityAt = :LastActivityAt
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Get(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var status *model.Job
+
+ if err := jss.GetReplica().SelectOne(&status,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ if err == sql.ErrNoRows {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
+ } else {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
+ }
+ } else {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByType",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type
+ ORDER BY
+ StartAt ASC
+ LIMIT
+ :Limit
+ OFFSET
+ :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByTypePage",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByStatus(status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Status = :Status
+ ORDER BY
+ CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByStatus",
+ "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Delete(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := jss.GetMaster().Exec(
+ `DELETE FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.DeleteByType",
+ "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ result.Data = id
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go
new file mode 100644
index 000000000..edf09a4c0
--- /dev/null
+++ b/store/sql_job_store_test.go
@@ -0,0 +1,341 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "github.com/mattermost/platform/model"
+ "time"
+)
+
+func TestJobSaveGet(t *testing.T) {
+ Setup()
+
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.NewId(),
+ Data: map[string]interface{}{
+ "Processed": 0,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ },
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-store.Job().Delete(job.Id)
+ }()
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.Job); received.Id != job.Id {
+ t.Fatal("received incorrect job after save")
+ }
+}
+
+func TestJobGetAllByType(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByType(jobType); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobGetAllByTypePage(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != jobs[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 1 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
+
+func TestJobGetAllByStatus(t *testing.T) {
+ jobType := model.NewId()
+ status := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1000,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 999,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1001,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1002,
+ Status: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByStatus(status); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 3 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobUpdateOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+ defer store.Job().Delete(job.Id)
+
+ job.LastActivityAt = model.GetMillis()
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = 50
+ job.Data = map[string]interface{}{
+ "Foo": "Bar",
+ }
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ if result.Data.(bool) {
+ t.Fatal("should have failed due to incorrect old status")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("Should have successfully updated")
+ }
+
+ var updatedJob *model.Job
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ updatedJob = result.Data.(*model.Job)
+ }
+
+ if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] {
+ t.Fatal("Some update property was not as expected")
+ }
+ }
+
+}
+
+func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_SUCCESS,
+ }
+
+ var lastUpdateAt int64
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ lastUpdateAt = result.Data.(*model.Job).LastActivityAt
+ }
+
+ defer store.Job().Delete(job.Id)
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("status wasn't updated")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if result.Data.(bool) {
+ t.Fatal("should be false due to incorrect original status")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("should still be pending")
+ }
+ if received.LastActivityAt != lastUpdateAt {
+ t.Fatal("last activity at shouldn't have changed")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ var startAtSet int64
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_IN_PROGRESS {
+ t.Fatal("should be in progress")
+ }
+ if received.StartAt == 0 {
+ t.Fatal("received should have start at set")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ startAtSet = received.StartAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_SUCCESS {
+ t.Fatal("should be success status")
+ }
+ if received.StartAt != startAtSet {
+ t.Fatal("startAt should not have changed")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+}
+
+func TestJobDelete(t *testing.T) {
+ Setup()
+
+ status := Must(store.Job().Save(&model.Job{
+ Id: model.NewId(),
+ })).(*model.Job)
+
+ if result := <-store.Job().Delete(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index dc3b51d0c..a039401f3 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -79,4 +79,5 @@ type SqlStore interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
+ Job() JobStore
}
diff --git a/store/sql_supplier.go b/store/sql_supplier.go
index 6f51cbd09..0f4ab8380 100644
--- a/store/sql_supplier.go
+++ b/store/sql_supplier.go
@@ -82,7 +82,7 @@ type SqlSupplierOldStores struct {
status StatusStore
fileInfo FileInfoStore
reaction ReactionStore
- jobStatus JobStatusStore
+ job JobStore
}
type SqlSupplier struct {
@@ -121,7 +121,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status = NewSqlStatusStore(supplier)
supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier)
supplier.oldStores.reaction = NewSqlReactionStore(supplier)
- supplier.oldStores.jobStatus = NewSqlJobStatusStore(supplier)
+ supplier.oldStores.job = NewSqlJobStore(supplier)
err := supplier.GetMaster().CreateTablesIfNotExists()
if err != nil {
@@ -150,7 +150,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists()
supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists()
supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists()
- supplier.oldStores.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists()
+ supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists()
supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures()
@@ -752,8 +752,8 @@ func (ss *SqlSupplier) Reaction() ReactionStore {
return ss.oldStores.reaction
}
-func (ss *SqlSupplier) JobStatus() JobStatusStore {
- return ss.oldStores.jobStatus
+func (ss *SqlSupplier) Job() JobStore {
+ return ss.oldStores.job
}
func (ss *SqlSupplier) DropAllTables() {
diff --git a/store/sql_upgrade.go b/store/sql_upgrade.go
index 5a6ed0ab5..a7b72124e 100644
--- a/store/sql_upgrade.go
+++ b/store/sql_upgrade.go
@@ -280,8 +280,9 @@ func UpgradeDatabaseToVersion40(sqlStore SqlStore) {
}
func UpgradeDatabaseToVersion41(sqlStore SqlStore) {
- // TODO: Uncomment following condition when version 4.0.0 is released
+ // TODO: Uncomment following condition when version 4.1.0 is released
// if shouldPerformUpgrade(sqlStore, VERSION_4_0_0, VERSION_4_1_0) {
+ sqlStore.RemoveTableIfExists("JobStatuses")
// saveSchemaVersion(sqlStore, VERSION_4_1_0)
// }
}
diff --git a/store/store.go b/store/store.go
index 0007f495e..95496b609 100644
--- a/store/store.go
+++ b/store/store.go
@@ -48,7 +48,7 @@ type Store interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
- JobStatus() JobStatusStore
+ Job() JobStore
MarkSystemRanUnitTests()
Close()
DropAllTables()
@@ -384,10 +384,14 @@ type ReactionStore interface {
DeleteAllWithEmojiName(emojiName string) StoreChannel
}
-type JobStatusStore interface {
- SaveOrUpdate(status *model.JobStatus) StoreChannel
+type JobStore interface {
+ Save(job *model.Job) StoreChannel
+ UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel
+ UpdateStatus(id string, status string) StoreChannel
+ UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel
Get(id string) StoreChannel
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
+ GetAllByStatus(status string) StoreChannel
Delete(id string) StoreChannel
}