summaryrefslogtreecommitdiffstats
path: root/store
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2017-09-28 17:11:13 +0100
committerCorey Hulen <corey@hulen.com>2017-09-28 09:11:13 -0700
commita06830b2f88a8d374c326a1191870cbc7cf7dac2 (patch)
tree4879ce49de061fba894fe01b54db701c639f0e94 /store
parentf263d2b9510fb557fe075dee5097cb32e2b1e5e2 (diff)
downloadchat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.tar.gz
chat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.tar.bz2
chat-a06830b2f88a8d374c326a1191870cbc7cf7dac2.zip
PLT-7644: Improve job scheduler architecture. (#7532)
Diffstat (limited to 'store')
-rw-r--r--store/sqlstore/job_store.go58
-rw-r--r--store/sqlstore/job_store_test.go103
-rw-r--r--store/store.go2
3 files changed, 163 insertions, 0 deletions
diff --git a/store/sqlstore/job_store.go b/store/sqlstore/job_store.go
index c56f526af..0ae5a6a07 100644
--- a/store/sqlstore/job_store.go
+++ b/store/sqlstore/job_store.go
@@ -325,6 +325,64 @@ func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel {
return storeChannel
}
+func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel {
+ storeChannel := make(store.StoreChannel, 1)
+
+ go func() {
+ result := store.StoreResult{}
+
+ var job *model.Job
+
+ if err := jss.GetReplica().SelectOne(&job,
+ `SELECT
+ *
+ FROM
+ Jobs
+ WHERE
+ Status = :Status
+ AND
+ Type = :Type
+ ORDER BY
+ CreateAt DESC
+ LIMIT 1`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil {
+ result.Err = model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
+ } else {
+ result.Data = job
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel {
+ storeChannel := make(store.StoreChannel, 1)
+
+ go func() {
+ result := store.StoreResult{}
+
+ if count, err := jss.GetReplica().SelectInt(`SELECT
+ COUNT(*)
+ FROM
+ Jobs
+ WHERE
+ Status = :Status
+ AND
+ Type = :Type`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil {
+ result.Err = model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
+ } else {
+ result.Data = count
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
func (jss SqlJobStore) Delete(id string) store.StoreChannel {
storeChannel := make(store.StoreChannel, 1)
diff --git a/store/sqlstore/job_store_test.go b/store/sqlstore/job_store_test.go
index bbeca6c3a..148d8b92d 100644
--- a/store/sqlstore/job_store_test.go
+++ b/store/sqlstore/job_store_test.go
@@ -10,6 +10,7 @@ import (
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
+ "github.com/stretchr/testify/assert"
)
func TestJobSaveGet(t *testing.T) {
@@ -231,6 +232,108 @@ func TestJobGetAllByStatus(t *testing.T) {
}
}
+func TestJobStoreGetNewestJobByStatusAndType(t *testing.T) {
+ ss := Setup()
+
+ jobType1 := model.NewId()
+ jobType2 := model.NewId()
+ status1 := model.NewId()
+ status2 := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1001,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1000,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType2,
+ CreateAt: 1003,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1004,
+ Status: status2,
+ },
+ }
+
+ for _, job := range jobs {
+ store.Must(ss.Job().Save(job))
+ defer ss.Job().Delete(job.Id)
+ }
+
+ result := <-ss.Job().GetNewestJobByStatusAndType(status1, jobType1)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, jobs[0].Id, result.Data.(*model.Job).Id)
+}
+
+func TestJobStoreGetCountByStatusAndType(t *testing.T) {
+ ss := Setup()
+
+ jobType1 := model.NewId()
+ jobType2 := model.NewId()
+ status1 := model.NewId()
+ status2 := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1000,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 999,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType2,
+ CreateAt: 1001,
+ Status: status1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType1,
+ CreateAt: 1002,
+ Status: status2,
+ },
+ }
+
+ for _, job := range jobs {
+ store.Must(ss.Job().Save(job))
+ defer ss.Job().Delete(job.Id)
+ }
+
+ result := <-ss.Job().GetCountByStatusAndType(status1, jobType1)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 2, result.Data.(int64))
+
+ result = <-ss.Job().GetCountByStatusAndType(status2, jobType2)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 0, result.Data.(int64))
+
+ result = <-ss.Job().GetCountByStatusAndType(status1, jobType2)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 1, result.Data.(int64))
+
+ result = <-ss.Job().GetCountByStatusAndType(status2, jobType1)
+ assert.Nil(t, result.Err)
+ assert.EqualValues(t, 1, result.Data.(int64))
+}
+
func TestJobUpdateOptimistically(t *testing.T) {
ss := Setup()
diff --git a/store/store.go b/store/store.go
index d2c30318f..9694921c8 100644
--- a/store/store.go
+++ b/store/store.go
@@ -411,6 +411,8 @@ type JobStore interface {
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
GetAllByStatus(status string) StoreChannel
+ GetNewestJobByStatusAndType(status string, jobType string) StoreChannel
+ GetCountByStatusAndType(status string, jobType string) StoreChannel
Delete(id string) StoreChannel
}