From a06830b2f88a8d374c326a1191870cbc7cf7dac2 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Thu, 28 Sep 2017 17:11:13 +0100 Subject: PLT-7644: Improve job scheduler architecture. (#7532) --- store/sqlstore/job_store.go | 58 ++++++++++++++++++++++ store/sqlstore/job_store_test.go | 103 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) (limited to 'store/sqlstore') diff --git a/store/sqlstore/job_store.go b/store/sqlstore/job_store.go index c56f526af..0ae5a6a07 100644 --- a/store/sqlstore/job_store.go +++ b/store/sqlstore/job_store.go @@ -325,6 +325,64 @@ func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel { return storeChannel } +func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + var job *model.Job + + if err := jss.GetReplica().SelectOne(&job, + `SELECT + * + FROM + Jobs + WHERE + Status = :Status + AND + Type = :Type + ORDER BY + CreateAt DESC + LIMIT 1`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel { + storeChannel := make(store.StoreChannel, 1) + + go func() { + result := store.StoreResult{} + + if count, err := jss.GetReplica().SelectInt(`SELECT + COUNT(*) + FROM + Jobs + WHERE + Status = :Status + AND + Type = :Type`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil { + result.Err = model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } else { + result.Data = count + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + func (jss SqlJobStore) Delete(id string) store.StoreChannel { storeChannel := make(store.StoreChannel, 1) diff --git a/store/sqlstore/job_store_test.go b/store/sqlstore/job_store_test.go index bbeca6c3a..148d8b92d 100644 --- a/store/sqlstore/job_store_test.go +++ b/store/sqlstore/job_store_test.go @@ -10,6 +10,7 @@ import ( "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/assert" ) func TestJobSaveGet(t *testing.T) { @@ -231,6 +232,108 @@ func TestJobGetAllByStatus(t *testing.T) { } } +func TestJobStoreGetNewestJobByStatusAndType(t *testing.T) { + ss := Setup() + + jobType1 := model.NewId() + jobType2 := model.NewId() + status1 := model.NewId() + status2 := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1001, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1000, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType2, + CreateAt: 1003, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1004, + Status: status2, + }, + } + + for _, job := range jobs { + store.Must(ss.Job().Save(job)) + defer ss.Job().Delete(job.Id) + } + + result := <-ss.Job().GetNewestJobByStatusAndType(status1, jobType1) + assert.Nil(t, result.Err) + assert.EqualValues(t, jobs[0].Id, result.Data.(*model.Job).Id) +} + +func TestJobStoreGetCountByStatusAndType(t *testing.T) { + ss := Setup() + + jobType1 := model.NewId() + jobType2 := model.NewId() + status1 := model.NewId() + status2 := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1000, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 999, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType2, + CreateAt: 1001, + Status: status1, + }, + { + Id: model.NewId(), + Type: jobType1, + CreateAt: 1002, + Status: status2, + }, + } + + for _, job := range jobs { + store.Must(ss.Job().Save(job)) + defer ss.Job().Delete(job.Id) + } + + result := <-ss.Job().GetCountByStatusAndType(status1, jobType1) + assert.Nil(t, result.Err) + assert.EqualValues(t, 2, result.Data.(int64)) + + result = <-ss.Job().GetCountByStatusAndType(status2, jobType2) + assert.Nil(t, result.Err) + assert.EqualValues(t, 0, result.Data.(int64)) + + result = <-ss.Job().GetCountByStatusAndType(status1, jobType2) + assert.Nil(t, result.Err) + assert.EqualValues(t, 1, result.Data.(int64)) + + result = <-ss.Job().GetCountByStatusAndType(status2, jobType1) + assert.Nil(t, result.Err) + assert.EqualValues(t, 1, result.Data.(int64)) +} + func TestJobUpdateOptimistically(t *testing.T) { ss := Setup() -- cgit v1.2.3-1-g7c22