summaryrefslogtreecommitdiffstats
path: root/store
diff options
context:
space:
mode:
Diffstat (limited to 'store')
-rw-r--r--store/layered_store.go4
-rw-r--r--store/sql_job_status_store.go190
-rw-r--r--store/sql_job_status_store_test.go151
-rw-r--r--store/sql_job_store.go327
-rw-r--r--store/sql_job_store_test.go341
-rw-r--r--store/sql_store.go1
-rw-r--r--store/sql_supplier.go10
-rw-r--r--store/sql_upgrade.go3
-rw-r--r--store/store.go10
9 files changed, 685 insertions, 352 deletions
diff --git a/store/layered_store.go b/store/layered_store.go
index 58c9e5ca1..ab9859c80 100644
--- a/store/layered_store.go
+++ b/store/layered_store.go
@@ -119,8 +119,8 @@ func (s *LayeredStore) Reaction() ReactionStore {
return s.DatabaseLayer.Reaction()
}
-func (s *LayeredStore) JobStatus() JobStatusStore {
- return s.DatabaseLayer.JobStatus()
+func (s *LayeredStore) Job() JobStore {
+ return s.DatabaseLayer.Job()
}
func (s *LayeredStore) MarkSystemRanUnitTests() {
diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go
deleted file mode 100644
index a87b8267b..000000000
--- a/store/sql_job_status_store.go
+++ /dev/null
@@ -1,190 +0,0 @@
-// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "database/sql"
- "net/http"
-
- "github.com/mattermost/platform/model"
-)
-
-type SqlJobStatusStore struct {
- SqlStore
-}
-
-func NewSqlJobStatusStore(sqlStore SqlStore) JobStatusStore {
- s := &SqlJobStatusStore{sqlStore}
-
- for _, db := range sqlStore.GetAllConns() {
- table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id")
- table.ColMap("Id").SetMaxSize(26)
- table.ColMap("Type").SetMaxSize(32)
- table.ColMap("Status").SetMaxSize(32)
- table.ColMap("Data").SetMaxSize(1024)
- }
-
- return s
-}
-
-func (jss SqlJobStatusStore) CreateIndexesIfNotExists() {
- jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type")
-}
-
-func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if err := jss.GetReplica().SelectOne(&model.JobStatus{},
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil {
- if _, err := jss.GetMaster().Update(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else if err == sql.ErrNoRows {
- if err := jss.GetMaster().Insert(status); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error())
- }
- } else {
- result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
- "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error())
- }
-
- if result.Err == nil {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Get(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var status *model.JobStatus
-
- if err := jss.GetReplica().SelectOne(&status,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- if err == sql.ErrNoRows {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
- } else {
- result.Err = model.NewAppError("SqlJobStatusStore.Get",
- "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
- }
- } else {
- result.Data = status
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType",
- "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- var statuses []*model.JobStatus
-
- if _, err := jss.GetReplica().Select(&statuses,
- `SELECT
- *
- FROM
- JobStatuses
- WHERE
- Type = :Type
- ORDER BY
- StartAt ASC
- LIMIT
- :Limit
- OFFSET
- :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage",
- "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error())
- } else {
- result.Data = statuses
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (jss SqlJobStatusStore) Delete(id string) StoreChannel {
- storeChannel := make(StoreChannel, 1)
-
- go func() {
- result := StoreResult{}
-
- if _, err := jss.GetReplica().Exec(
- `DELETE FROM
- JobStatuses
- WHERE
- Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
- result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType",
- "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error())
- } else {
- result.Data = id
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go
deleted file mode 100644
index 18c29e522..000000000
--- a/store/sql_job_status_store_test.go
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "testing"
-
- "github.com/mattermost/platform/model"
-)
-
-func TestJobStatusSaveGetUpdate(t *testing.T) {
- Setup()
-
- status := &model.JobStatus{
- Id: model.NewId(),
- Type: model.NewId(),
- Status: model.NewId(),
- Data: map[string]interface{}{
- "Processed": 0,
- "Total": 12345,
- "LastProcessed": "abcd",
- },
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- defer func() {
- <-store.JobStatus().Delete(status.Id)
- }()
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id {
- t.Fatal("received incorrect status after save")
- }
-
- status.Status = model.NewId()
- status.Data = map[string]interface{}{
- "Processed": 12345,
- "Total": 12345,
- "LastProcessed": "abcd",
- }
-
- if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
- t.Fatal(result.Err)
- }
-
- if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status {
- t.Fatal("received incorrect status after update")
- }
-}
-
-func TestJobStatusGetAllByType(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- },
- {
- Id: model.NewId(),
- Type: model.NewId(),
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id {
- t.Fatal("should've received first status")
- } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id {
- t.Fatal("should've received second status")
- }
-}
-
-func TestJobStatusGetAllByTypePage(t *testing.T) {
- Setup()
-
- jobType := model.NewId()
-
- statuses := []*model.JobStatus{
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1000,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 999,
- },
- {
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1001,
- },
- }
-
- for _, status := range statuses {
- Must(store.JobStatus().SaveOrUpdate(status))
- defer store.JobStatus().Delete(status.Id)
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[1].Id {
- t.Fatal("should've received newest job first")
- } else if received[1].Id != statuses[0].Id {
- t.Fatal("should've received second newest job second")
- }
-
- if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
- t.Fatal(result.Err)
- } else if received := result.Data.([]*model.JobStatus); len(received) != 1 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[2].Id {
- t.Fatal("should've received oldest job last")
- }
-}
-
-func TestJobStatusDelete(t *testing.T) {
- Setup()
-
- status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{
- Id: model.NewId(),
- })).(*model.JobStatus)
-
- if result := <-store.JobStatus().Delete(status.Id); result.Err != nil {
- t.Fatal(result.Err)
- }
-}
diff --git a/store/sql_job_store.go b/store/sql_job_store.go
new file mode 100644
index 000000000..c00e37d86
--- /dev/null
+++ b/store/sql_job_store.go
@@ -0,0 +1,327 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "database/sql"
+ "net/http"
+
+ "github.com/mattermost/gorp"
+ "github.com/mattermost/platform/model"
+)
+
+type SqlJobStore struct {
+ SqlStore
+}
+
+func NewSqlJobStore(sqlStore SqlStore) JobStore {
+ s := &SqlJobStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.Job{}, "Jobs").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(32)
+ table.ColMap("Status").SetMaxSize(32)
+ table.ColMap("Data").SetMaxSize(1024)
+ }
+
+ return s
+}
+
+func (jss SqlJobStore) CreateIndexesIfNotExists() {
+ jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type")
+}
+
+func (jss SqlJobStore) Save(job *model.Job) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+ if err := jss.GetMaster().Insert(job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.Save",
+ "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET
+ LastActivityAt = :LastActivityAt,
+ Status = :Status,
+ Progress = :Progress,
+ Data = :Data
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`,
+ map[string]interface{}{
+ "Id": job.Id,
+ "OldStatus": currentStatus,
+ "LastActivityAt": model.GetMillis(),
+ "Status": job.Status,
+ "Data": job.DataToJson(),
+ "Progress": job.Progress,
+ }); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateOptimistically",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatus(id string, status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ job := &model.Job{
+ Id: id,
+ Status: status,
+ LastActivityAt: model.GetMillis(),
+ }
+
+ if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool {
+ return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt"
+ }, job); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ }
+
+ if result.Err == nil {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var startAtClause string
+ if newStatus == model.JOB_STATUS_IN_PROGRESS {
+ startAtClause = `StartAt = :StartAt,`
+ }
+
+ if sqlResult, err := jss.GetMaster().Exec(
+ `UPDATE
+ Jobs
+ SET `+startAtClause+`
+ Status = :NewStatus,
+ LastActivityAt = :LastActivityAt
+ WHERE
+ Id = :Id
+ AND
+ Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ rows, err := sqlResult.RowsAffected()
+
+ if err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus",
+ "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ if rows == 1 {
+ result.Data = true
+ } else {
+ result.Data = false
+ }
+ }
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Get(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var status *model.Job
+
+ if err := jss.GetReplica().SelectOne(&status,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ if err == sql.ErrNoRows {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
+ } else {
+ result.Err = model.NewAppError("SqlJobStore.Get",
+ "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
+ }
+ } else {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByType",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Type = :Type
+ ORDER BY
+ StartAt ASC
+ LIMIT
+ :Limit
+ OFFSET
+ :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByTypePage",
+ "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetAllByStatus(status string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Status = :Status
+ ORDER BY
+ CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllByStatus",
+ "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) Delete(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := jss.GetMaster().Exec(
+ `DELETE FROM
+ Jobs
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.DeleteByType",
+ "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ result.Data = id
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go
new file mode 100644
index 000000000..edf09a4c0
--- /dev/null
+++ b/store/sql_job_store_test.go
@@ -0,0 +1,341 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "github.com/mattermost/platform/model"
+ "time"
+)
+
+func TestJobSaveGet(t *testing.T) {
+ Setup()
+
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.NewId(),
+ Data: map[string]interface{}{
+ "Processed": 0,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ },
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-store.Job().Delete(job.Id)
+ }()
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.Job); received.Id != job.Id {
+ t.Fatal("received incorrect job after save")
+ }
+}
+
+func TestJobGetAllByType(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByType(jobType); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobGetAllByTypePage(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != jobs[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if result := <-store.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 1 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
+
+func TestJobGetAllByStatus(t *testing.T) {
+ jobType := model.NewId()
+ status := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1000,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 999,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1001,
+ Status: status,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1002,
+ Status: model.NewId(),
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllByStatus(status); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 3 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
+ t.Fatal("should've received first jobs")
+ } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
+ t.Fatal("should've received second jobs")
+ }
+}
+
+func TestJobUpdateOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ }
+
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+ defer store.Job().Delete(job.Id)
+
+ job.LastActivityAt = model.GetMillis()
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = 50
+ job.Data = map[string]interface{}{
+ "Foo": "Bar",
+ }
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ if result.Data.(bool) {
+ t.Fatal("should have failed due to incorrect old status")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("Should have successfully updated")
+ }
+
+ var updatedJob *model.Job
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ updatedJob = result.Data.(*model.Job)
+ }
+
+ if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] {
+ t.Fatal("Some update property was not as expected")
+ }
+ }
+
+}
+
+func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) {
+ job := &model.Job{
+ Id: model.NewId(),
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_SUCCESS,
+ }
+
+ var lastUpdateAt int64
+ if result := <-store.Job().Save(job); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ lastUpdateAt = result.Data.(*model.Job).LastActivityAt
+ }
+
+ defer store.Job().Delete(job.Id)
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("status wasn't updated")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if result.Data.(bool) {
+ t.Fatal("should be false due to incorrect original status")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_PENDING {
+ t.Fatal("should still be pending")
+ }
+ if received.LastActivityAt != lastUpdateAt {
+ t.Fatal("last activity at shouldn't have changed")
+ }
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ var startAtSet int64
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_IN_PROGRESS {
+ t.Fatal("should be in progress")
+ }
+ if received.StartAt == 0 {
+ t.Fatal("received should have start at set")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ startAtSet = received.StartAt
+ }
+
+ time.Sleep(2 * time.Millisecond)
+
+ if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ if !result.Data.(bool) {
+ t.Fatal("should have succeeded")
+ }
+ }
+
+ if result := <-store.Job().Get(job.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.Job)
+ if received.Status != model.JOB_STATUS_SUCCESS {
+ t.Fatal("should be success status")
+ }
+ if received.StartAt != startAtSet {
+ t.Fatal("startAt should not have changed")
+ }
+ if received.LastActivityAt <= lastUpdateAt {
+ t.Fatal("lastActivityAt wasn't updated")
+ }
+ lastUpdateAt = received.LastActivityAt
+ }
+}
+
+func TestJobDelete(t *testing.T) {
+ Setup()
+
+ status := Must(store.Job().Save(&model.Job{
+ Id: model.NewId(),
+ })).(*model.Job)
+
+ if result := <-store.Job().Delete(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index dc3b51d0c..a039401f3 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -79,4 +79,5 @@ type SqlStore interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
+ Job() JobStore
}
diff --git a/store/sql_supplier.go b/store/sql_supplier.go
index 6f51cbd09..0f4ab8380 100644
--- a/store/sql_supplier.go
+++ b/store/sql_supplier.go
@@ -82,7 +82,7 @@ type SqlSupplierOldStores struct {
status StatusStore
fileInfo FileInfoStore
reaction ReactionStore
- jobStatus JobStatusStore
+ job JobStore
}
type SqlSupplier struct {
@@ -121,7 +121,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status = NewSqlStatusStore(supplier)
supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier)
supplier.oldStores.reaction = NewSqlReactionStore(supplier)
- supplier.oldStores.jobStatus = NewSqlJobStatusStore(supplier)
+ supplier.oldStores.job = NewSqlJobStore(supplier)
err := supplier.GetMaster().CreateTablesIfNotExists()
if err != nil {
@@ -150,7 +150,7 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists()
supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists()
supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists()
- supplier.oldStores.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists()
+ supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists()
supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures()
@@ -752,8 +752,8 @@ func (ss *SqlSupplier) Reaction() ReactionStore {
return ss.oldStores.reaction
}
-func (ss *SqlSupplier) JobStatus() JobStatusStore {
- return ss.oldStores.jobStatus
+func (ss *SqlSupplier) Job() JobStore {
+ return ss.oldStores.job
}
func (ss *SqlSupplier) DropAllTables() {
diff --git a/store/sql_upgrade.go b/store/sql_upgrade.go
index 5a6ed0ab5..a7b72124e 100644
--- a/store/sql_upgrade.go
+++ b/store/sql_upgrade.go
@@ -280,8 +280,9 @@ func UpgradeDatabaseToVersion40(sqlStore SqlStore) {
}
func UpgradeDatabaseToVersion41(sqlStore SqlStore) {
- // TODO: Uncomment following condition when version 4.0.0 is released
+ // TODO: Uncomment following condition when version 4.1.0 is released
// if shouldPerformUpgrade(sqlStore, VERSION_4_0_0, VERSION_4_1_0) {
+ sqlStore.RemoveTableIfExists("JobStatuses")
// saveSchemaVersion(sqlStore, VERSION_4_1_0)
// }
}
diff --git a/store/store.go b/store/store.go
index 0007f495e..95496b609 100644
--- a/store/store.go
+++ b/store/store.go
@@ -48,7 +48,7 @@ type Store interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
- JobStatus() JobStatusStore
+ Job() JobStore
MarkSystemRanUnitTests()
Close()
DropAllTables()
@@ -384,10 +384,14 @@ type ReactionStore interface {
DeleteAllWithEmojiName(emojiName string) StoreChannel
}
-type JobStatusStore interface {
- SaveOrUpdate(status *model.JobStatus) StoreChannel
+type JobStore interface {
+ Save(job *model.Job) StoreChannel
+ UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel
+ UpdateStatus(id string, status string) StoreChannel
+ UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel
Get(id string) StoreChannel
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
+ GetAllByStatus(status string) StoreChannel
Delete(id string) StoreChannel
}