summaryrefslogtreecommitdiffstats
path: root/store
diff options
context:
space:
mode:
authorHarrison Healey <harrisonmhealey@gmail.com>2017-05-18 15:05:57 -0400
committerGitHub <noreply@github.com>2017-05-18 15:05:57 -0400
commit577ed27f1bb060080d311342047e31943a02ccbb (patch)
treead57fa69b1daf143e914ea2480a475e5450cc236 /store
parent920bc0d8712a50691b1f698779f60132536eb214 (diff)
downloadchat-577ed27f1bb060080d311342047e31943a02ccbb.tar.gz
chat-577ed27f1bb060080d311342047e31943a02ccbb.tar.bz2
chat-577ed27f1bb060080d311342047e31943a02ccbb.zip
PLT-6408 Framework for job server (#6404)
* Added initial job server * Added job server to be ran as part of platform * Added test job to the enterprise repo * Fixed job server not loading license * Renamed job package to jobs * Fixed TE not being buildable * Added JobStatus table to database * Changed fields used by JobStatus * Added APIs to query job status * Added config change listener to server * Added option to run job server from Makefile * Added ability to enable/disable jobs from config * Commented out placeholder for search indexing job * Fixed govet * Removed debug messages and fixed job api init message
Diffstat (limited to 'store')
-rw-r--r--store/sql_job_status_store.go190
-rw-r--r--store/sql_job_status_store_test.go151
-rw-r--r--store/sql_store.go19
-rw-r--r--store/store.go9
4 files changed, 369 insertions, 0 deletions
diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go
new file mode 100644
index 000000000..ef039d99a
--- /dev/null
+++ b/store/sql_job_status_store.go
@@ -0,0 +1,190 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "database/sql"
+ "net/http"
+
+ "github.com/mattermost/platform/model"
+)
+
+type SqlJobStatusStore struct {
+ *SqlStore
+}
+
+func NewSqlJobStatusStore(sqlStore *SqlStore) JobStatusStore {
+ s := &SqlJobStatusStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(32)
+ table.ColMap("Status").SetMaxSize(32)
+ table.ColMap("Data").SetMaxSize(1024)
+ }
+
+ return s
+}
+
+func (jss SqlJobStatusStore) CreateIndexesIfNotExists() {
+ jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type")
+}
+
+func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if err := jss.GetReplica().SelectOne(&model.JobStatus{},
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil {
+ if _, err := jss.GetMaster().Update(status); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
+ "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error())
+ }
+ } else if err == sql.ErrNoRows {
+ if err := jss.GetMaster().Insert(status); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
+ "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error())
+ }
+ } else {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
+ "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error())
+ }
+
+ if result.Err == nil {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) Get(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var status *model.JobStatus
+
+ if err := jss.GetReplica().SelectOne(&status,
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ if err == sql.ErrNoRows {
+ result.Err = model.NewAppError("SqlJobStatusStore.Get",
+ "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
+ } else {
+ result.Err = model.NewAppError("SqlJobStatusStore.Get",
+ "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
+ }
+ } else {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.JobStatus
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType",
+ "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.JobStatus
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Type = :Type
+ ORDER BY
+ StartAt ASC
+ LIMIT
+ :Limit
+ OFFSET
+ :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage",
+ "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) Delete(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := jss.GetReplica().Exec(
+ `DELETE FROM
+ JobStatuses
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType",
+ "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ result.Data = id
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go
new file mode 100644
index 000000000..18c29e522
--- /dev/null
+++ b/store/sql_job_status_store_test.go
@@ -0,0 +1,151 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "github.com/mattermost/platform/model"
+)
+
+func TestJobStatusSaveGetUpdate(t *testing.T) {
+ Setup()
+
+ status := &model.JobStatus{
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.NewId(),
+ Data: map[string]interface{}{
+ "Processed": 0,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ },
+ }
+
+ if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-store.JobStatus().Delete(status.Id)
+ }()
+
+ if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.JobStatus); received.Id != status.Id {
+ t.Fatal("received incorrect status after save")
+ }
+
+ status.Status = model.NewId()
+ status.Data = map[string]interface{}{
+ "Processed": 12345,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ }
+
+ if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status {
+ t.Fatal("received incorrect status after update")
+ }
+}
+
+func TestJobStatusGetAllByType(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ statuses := []*model.JobStatus{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ },
+ }
+
+ for _, status := range statuses {
+ Must(store.JobStatus().SaveOrUpdate(status))
+ defer store.JobStatus().Delete(status.Id)
+ }
+
+ if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id {
+ t.Fatal("should've received first status")
+ } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id {
+ t.Fatal("should've received second status")
+ }
+}
+
+func TestJobStatusGetAllByTypePage(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ statuses := []*model.JobStatus{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, status := range statuses {
+ Must(store.JobStatus().SaveOrUpdate(status))
+ defer store.JobStatus().Delete(status.Id)
+ }
+
+ if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != statuses[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.JobStatus); len(received) != 1 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
+
+func TestJobStatusDelete(t *testing.T) {
+ Setup()
+
+ status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{
+ Id: model.NewId(),
+ })).(*model.JobStatus)
+
+ if result := <-store.JobStatus().Delete(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index f13fe2ec0..4261c849a 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -87,6 +87,7 @@ type SqlStore struct {
status StatusStore
fileInfo FileInfoStore
reaction ReactionStore
+ jobStatus JobStatusStore
SchemaVersion string
rrCounter int64
srCounter int64
@@ -151,6 +152,7 @@ func NewSqlStore() Store {
sqlStore.status = NewSqlStatusStore(sqlStore)
sqlStore.fileInfo = NewSqlFileInfoStore(sqlStore)
sqlStore.reaction = NewSqlReactionStore(sqlStore)
+ sqlStore.jobStatus = NewSqlJobStatusStore(sqlStore)
err := sqlStore.master.CreateTablesIfNotExists()
if err != nil {
@@ -179,6 +181,7 @@ func NewSqlStore() Store {
sqlStore.status.(*SqlStatusStore).CreateIndexesIfNotExists()
sqlStore.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists()
sqlStore.reaction.(*SqlReactionStore).CreateIndexesIfNotExists()
+ sqlStore.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists()
sqlStore.preference.(*SqlPreferenceStore).DeleteUnusedFeatures()
@@ -735,6 +738,10 @@ func (ss *SqlStore) Reaction() ReactionStore {
return ss.reaction
}
+func (ss *SqlStore) JobStatus() JobStatusStore {
+ return ss.jobStatus
+}
+
func (ss *SqlStore) DropAllTables() {
ss.master.TruncateTables()
}
@@ -752,6 +759,8 @@ func (me mattermConverter) ToDb(val interface{}) (interface{}, error) {
return encrypt([]byte(utils.Cfg.SqlSettings.AtRestEncryptKey), model.MapToJson(t))
case model.StringInterface:
return model.StringInterfaceToJson(t), nil
+ case map[string]interface{}:
+ return model.StringInterfaceToJson(model.StringInterface(t)), nil
}
return val, nil
@@ -805,6 +814,16 @@ func (me mattermConverter) FromDb(target interface{}) (gorp.CustomScanner, bool)
return json.Unmarshal(b, target)
}
return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true
+ case *map[string]interface{}:
+ binder := func(holder, target interface{}) error {
+ s, ok := holder.(*string)
+ if !ok {
+ return errors.New(utils.T("store.sql.convert_string_interface"))
+ }
+ b := []byte(*s)
+ return json.Unmarshal(b, target)
+ }
+ return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true
}
return gorp.CustomScanner{}, false
diff --git a/store/store.go b/store/store.go
index acbeafdd6..cd7792ce1 100644
--- a/store/store.go
+++ b/store/store.go
@@ -47,6 +47,7 @@ type Store interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
+ JobStatus() JobStatusStore
MarkSystemRanUnitTests()
Close()
DropAllTables()
@@ -371,3 +372,11 @@ type ReactionStore interface {
GetForPost(postId string, allowFromCache bool) StoreChannel
DeleteAllWithEmojiName(emojiName string) StoreChannel
}
+
+type JobStatusStore interface {
+ SaveOrUpdate(status *model.JobStatus) StoreChannel
+ Get(id string) StoreChannel
+ GetAllByType(jobType string) StoreChannel
+ GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
+ Delete(id string) StoreChannel
+}