diff options
Diffstat (limited to 'store')
-rw-r--r-- | store/layered_store.go | 4 | ||||
-rw-r--r-- | store/sql_channel_store.go | 2 | ||||
-rw-r--r-- | store/sql_job_status_store.go | 190 | ||||
-rw-r--r-- | store/sql_job_status_store_test.go | 151 | ||||
-rw-r--r-- | store/sql_job_store.go | 327 | ||||
-rw-r--r-- | store/sql_job_store_test.go | 341 | ||||
-rw-r--r-- | store/sql_post_store.go | 44 | ||||
-rw-r--r-- | store/sql_post_store_test.go | 69 | ||||
-rw-r--r-- | store/sql_store.go | 1 | ||||
-rw-r--r-- | store/sql_supplier.go | 10 | ||||
-rw-r--r-- | store/sql_upgrade.go | 10 | ||||
-rw-r--r-- | store/store.go | 11 |
12 files changed, 806 insertions, 354 deletions
diff --git a/store/layered_store.go b/store/layered_store.go index 58c9e5ca1..ab9859c80 100644 --- a/store/layered_store.go +++ b/store/layered_store.go @@ -119,8 +119,8 @@ func (s *LayeredStore) Reaction() ReactionStore { return s.DatabaseLayer.Reaction() } -func (s *LayeredStore) JobStatus() JobStatusStore { - return s.DatabaseLayer.JobStatus() +func (s *LayeredStore) Job() JobStore { + return s.DatabaseLayer.Job() } func (s *LayeredStore) MarkSystemRanUnitTests() { diff --git a/store/sql_channel_store.go b/store/sql_channel_store.go index c009d64d3..db9c2c1f4 100644 --- a/store/sql_channel_store.go +++ b/store/sql_channel_store.go @@ -361,7 +361,7 @@ func (s SqlChannelStore) GetPinnedPosts(channelId string) StoreChannel { go func() { result := StoreResult{} - pl := &model.PostList{} + pl := model.NewPostList() var posts []*model.Post if _, err := s.GetReplica().Select(&posts, "SELECT * FROM Posts WHERE IsPinned = true AND ChannelId = :ChannelId AND DeleteAt = 0 ORDER BY CreateAt ASC", map[string]interface{}{"ChannelId": channelId}); err != nil { diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go deleted file mode 100644 index a87b8267b..000000000 --- a/store/sql_job_status_store.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package store - -import ( - "database/sql" - "net/http" - - "github.com/mattermost/platform/model" -) - -type SqlJobStatusStore struct { - SqlStore -} - -func NewSqlJobStatusStore(sqlStore SqlStore) JobStatusStore { - s := &SqlJobStatusStore{sqlStore} - - for _, db := range sqlStore.GetAllConns() { - table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id") - table.ColMap("Id").SetMaxSize(26) - table.ColMap("Type").SetMaxSize(32) - table.ColMap("Status").SetMaxSize(32) - table.ColMap("Data").SetMaxSize(1024) - } - - return s -} - -func (jss SqlJobStatusStore) CreateIndexesIfNotExists() { - jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type") -} - -func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - if err := jss.GetReplica().SelectOne(&model.JobStatus{}, - `SELECT - * - FROM - JobStatuses - WHERE - Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil { - if _, err := jss.GetMaster().Update(status); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", - "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error()) - } - } else if err == sql.ErrNoRows { - if err := jss.GetMaster().Insert(status); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", - "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error()) - } - } else { - result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", - "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error()) - } - - if result.Err == nil { - result.Data = status - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) Get(id string) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - var status *model.JobStatus - - if err := jss.GetReplica().SelectOne(&status, - `SELECT - * - FROM - JobStatuses - WHERE - Id = :Id`, map[string]interface{}{"Id": id}); err != nil { - if err == sql.ErrNoRows { - result.Err = model.NewAppError("SqlJobStatusStore.Get", - "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) - } else { - result.Err = model.NewAppError("SqlJobStatusStore.Get", - "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) - } - } else { - result.Data = status - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - var statuses []*model.JobStatus - - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - JobStatuses - WHERE - Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType", - "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error()) - } else { - result.Data = statuses - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - var statuses []*model.JobStatus - - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - JobStatuses - WHERE - Type = :Type - ORDER BY - StartAt ASC - LIMIT - :Limit - OFFSET - :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage", - "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error()) - } else { - result.Data = statuses - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (jss SqlJobStatusStore) Delete(id string) StoreChannel { - storeChannel := make(StoreChannel, 1) - - go func() { - result := StoreResult{} - - if _, err := jss.GetReplica().Exec( - `DELETE FROM - JobStatuses - WHERE - Id = :Id`, map[string]interface{}{"Id": id}); err != nil { - result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType", - "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error()) - } else { - result.Data = id - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go deleted file mode 100644 index 18c29e522..000000000 --- a/store/sql_job_status_store_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package store - -import ( - "testing" - - "github.com/mattermost/platform/model" -) - -func TestJobStatusSaveGetUpdate(t *testing.T) { - Setup() - - status := &model.JobStatus{ - Id: model.NewId(), - Type: model.NewId(), - Status: model.NewId(), - Data: map[string]interface{}{ - "Processed": 0, - "Total": 12345, - "LastProcessed": "abcd", - }, - } - - if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { - t.Fatal(result.Err) - } - - defer func() { - <-store.JobStatus().Delete(status.Id) - }() - - if result := <-store.JobStatus().Get(status.Id); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.(*model.JobStatus); received.Id != status.Id { - t.Fatal("received incorrect status after save") - } - - status.Status = model.NewId() - status.Data = map[string]interface{}{ - "Processed": 12345, - "Total": 12345, - "LastProcessed": "abcd", - } - - if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { - t.Fatal(result.Err) - } - - if result := <-store.JobStatus().Get(status.Id); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status { - t.Fatal("received incorrect status after update") - } -} - -func TestJobStatusGetAllByType(t *testing.T) { - Setup() - - jobType := model.NewId() - - statuses := []*model.JobStatus{ - { - Id: model.NewId(), - Type: jobType, - }, - { - Id: model.NewId(), - Type: jobType, - }, - { - Id: model.NewId(), - Type: model.NewId(), - }, - } - - for _, status := range statuses { - Must(store.JobStatus().SaveOrUpdate(status)) - defer store.JobStatus().Delete(status.Id) - } - - if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id { - t.Fatal("should've received first status") - } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id { - t.Fatal("should've received second status") - } -} - -func TestJobStatusGetAllByTypePage(t *testing.T) { - Setup() - - jobType := model.NewId() - - statuses := []*model.JobStatus{ - { - Id: model.NewId(), - Type: jobType, - StartAt: 1000, - }, - { - Id: model.NewId(), - Type: jobType, - StartAt: 999, - }, - { - Id: model.NewId(), - Type: jobType, - StartAt: 1001, - }, - } - - for _, status := range statuses { - Must(store.JobStatus().SaveOrUpdate(status)) - defer store.JobStatus().Delete(status.Id) - } - - if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[1].Id { - t.Fatal("should've received newest job first") - } else if received[1].Id != statuses[0].Id { - t.Fatal("should've received second newest job second") - } - - if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.JobStatus); len(received) != 1 { - t.Fatal("received wrong number of statuses") - } else if received[0].Id != statuses[2].Id { - t.Fatal("should've received oldest job last") - } -} - -func TestJobStatusDelete(t *testing.T) { - Setup() - - status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{ - Id: model.NewId(), - })).(*model.JobStatus) - - if result := <-store.JobStatus().Delete(status.Id); result.Err != nil { - t.Fatal(result.Err) - } -} diff --git a/store/sql_job_store.go b/store/sql_job_store.go new file mode 100644 index 000000000..c00e37d86 --- /dev/null +++ b/store/sql_job_store.go @@ -0,0 +1,327 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "database/sql" + "net/http" + + "github.com/mattermost/gorp" + "github.com/mattermost/platform/model" +) + +type SqlJobStore struct { + SqlStore +} + +func NewSqlJobStore(sqlStore SqlStore) JobStore { + s := &SqlJobStore{sqlStore} + + for _, db := range sqlStore.GetAllConns() { + table := db.AddTableWithName(model.Job{}, "Jobs").SetKeys(false, "Id") + table.ColMap("Id").SetMaxSize(26) + table.ColMap("Type").SetMaxSize(32) + table.ColMap("Status").SetMaxSize(32) + table.ColMap("Data").SetMaxSize(1024) + } + + return s +} + +func (jss SqlJobStore) CreateIndexesIfNotExists() { + jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type") +} + +func (jss SqlJobStore) Save(job *model.Job) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + if err := jss.GetMaster().Insert(job); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.Save", + "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error()) + } else { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if sqlResult, err := jss.GetMaster().Exec( + `UPDATE + Jobs + SET + LastActivityAt = :LastActivityAt, + Status = :Status, + Progress = :Progress, + Data = :Data + WHERE + Id = :Id + AND + Status = :OldStatus`, + map[string]interface{}{ + "Id": job.Id, + "OldStatus": currentStatus, + "LastActivityAt": model.GetMillis(), + "Status": job.Status, + "Data": job.DataToJson(), + "Progress": job.Progress, + }); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateOptimistically", + "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error()) + } else { + rows, err := sqlResult.RowsAffected() + + if err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error()) + } else { + if rows == 1 { + result.Data = true + } else { + result.Data = false + } + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateStatus(id string, status string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + job := &model.Job{ + Id: id, + Status: status, + LastActivityAt: model.GetMillis(), + } + + if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool { + return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt" + }, job); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error()) + } + + if result.Err == nil { + result.Data = job + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var startAtClause string + if newStatus == model.JOB_STATUS_IN_PROGRESS { + startAtClause = `StartAt = :StartAt,` + } + + if sqlResult, err := jss.GetMaster().Exec( + `UPDATE + Jobs + SET `+startAtClause+` + Status = :NewStatus, + LastActivityAt = :LastActivityAt + WHERE + Id = :Id + AND + Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error()) + } else { + rows, err := sqlResult.RowsAffected() + + if err != nil { + result.Err = model.NewLocAppError("SqlJobStore.UpdateStatus", + "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error()) + } else { + if rows == 1 { + result.Data = true + } else { + result.Data = false + } + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) Get(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var status *model.Job + + if err := jss.GetReplica().SelectOne(&status, + `SELECT + * + FROM + Jobs + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + if err == sql.ErrNoRows { + result.Err = model.NewAppError("SqlJobStore.Get", + "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) + } else { + result.Err = model.NewAppError("SqlJobStore.Get", + "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) + } + } else { + result.Data = status + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.GetAllByType", + "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Type = :Type + ORDER BY + StartAt ASC + LIMIT + :Limit + OFFSET + :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.GetAllByTypePage", + "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) GetAllByStatus(status string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.Job + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + Jobs + WHERE + Status = :Status + ORDER BY + CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.GetAllByStatus", + "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStore) Delete(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if _, err := jss.GetMaster().Exec( + `DELETE FROM + Jobs + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + result.Err = model.NewLocAppError("SqlJobStore.DeleteByType", + "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error()) + } else { + result.Data = id + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go new file mode 100644 index 000000000..edf09a4c0 --- /dev/null +++ b/store/sql_job_store_test.go @@ -0,0 +1,341 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "testing" + + "github.com/mattermost/platform/model" + "time" +) + +func TestJobSaveGet(t *testing.T) { + Setup() + + job := &model.Job{ + Id: model.NewId(), + Type: model.NewId(), + Status: model.NewId(), + Data: map[string]interface{}{ + "Processed": 0, + "Total": 12345, + "LastProcessed": "abcd", + }, + } + + if result := <-store.Job().Save(job); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-store.Job().Delete(job.Id) + }() + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.(*model.Job); received.Id != job.Id { + t.Fatal("received incorrect job after save") + } +} + +func TestJobGetAllByType(t *testing.T) { + Setup() + + jobType := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: model.NewId(), + }, + } + + for _, job := range jobs { + Must(store.Job().Save(job)) + defer store.Job().Delete(job.Id) + } + + if result := <-store.Job().GetAllByType(jobType); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 2 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id { + t.Fatal("should've received first jobs") + } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id { + t.Fatal("should've received second jobs") + } +} + +func TestJobGetAllByTypePage(t *testing.T) { + Setup() + + jobType := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + StartAt: 1000, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 999, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 1001, + }, + } + + for _, job := range jobs { + Must(store.Job().Save(job)) + defer store.Job().Delete(job.Id) + } + + if result := <-store.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 2 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[1].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != jobs[0].Id { + t.Fatal("should've received second newest job second") + } + + if result := <-store.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 1 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[2].Id { + t.Fatal("should've received oldest job last") + } +} + +func TestJobGetAllByStatus(t *testing.T) { + jobType := model.NewId() + status := model.NewId() + + jobs := []*model.Job{ + { + Id: model.NewId(), + Type: jobType, + CreateAt: 1000, + Status: status, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: 999, + Status: status, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: 1001, + Status: status, + }, + { + Id: model.NewId(), + Type: jobType, + CreateAt: 1002, + Status: model.NewId(), + }, + } + + for _, job := range jobs { + Must(store.Job().Save(job)) + defer store.Job().Delete(job.Id) + } + + if result := <-store.Job().GetAllByStatus(status); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.Job); len(received) != 3 { + t.Fatal("received wrong number of jobs") + } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id { + t.Fatal("should've received first jobs") + } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id { + t.Fatal("should've received second jobs") + } +} + +func TestJobUpdateOptimistically(t *testing.T) { + job := &model.Job{ + Id: model.NewId(), + Type: model.JOB_TYPE_DATA_RETENTION, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_PENDING, + } + + if result := <-store.Job().Save(job); result.Err != nil { + t.Fatal(result.Err) + } + defer store.Job().Delete(job.Id) + + job.LastActivityAt = model.GetMillis() + job.Status = model.JOB_STATUS_IN_PROGRESS + job.Progress = 50 + job.Data = map[string]interface{}{ + "Foo": "Bar", + } + + if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil { + if result.Data.(bool) { + t.Fatal("should have failed due to incorrect old status") + } + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil { + t.Fatal(result.Err) + } else { + if !result.Data.(bool) { + t.Fatal("Should have successfully updated") + } + + var updatedJob *model.Job + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + updatedJob = result.Data.(*model.Job) + } + + if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] { + t.Fatal("Some update property was not as expected") + } + } + +} + +func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) { + job := &model.Job{ + Id: model.NewId(), + Type: model.JOB_TYPE_DATA_RETENTION, + CreateAt: model.GetMillis(), + Status: model.JOB_STATUS_SUCCESS, + } + + var lastUpdateAt int64 + if result := <-store.Job().Save(job); result.Err != nil { + t.Fatal(result.Err) + } else { + lastUpdateAt = result.Data.(*model.Job).LastActivityAt + } + + defer store.Job().Delete(job.Id) + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_PENDING { + t.Fatal("status wasn't updated") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil { + t.Fatal(result.Err) + } else { + if result.Data.(bool) { + t.Fatal("should be false due to incorrect original status") + } + } + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_PENDING { + t.Fatal("should still be pending") + } + if received.LastActivityAt != lastUpdateAt { + t.Fatal("last activity at shouldn't have changed") + } + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { + t.Fatal(result.Err) + } else { + if !result.Data.(bool) { + t.Fatal("should have succeeded") + } + } + + var startAtSet int64 + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_IN_PROGRESS { + t.Fatal("should be in progress") + } + if received.StartAt == 0 { + t.Fatal("received should have start at set") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + startAtSet = received.StartAt + } + + time.Sleep(2 * time.Millisecond) + + if result := <-store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil { + t.Fatal(result.Err) + } else { + if !result.Data.(bool) { + t.Fatal("should have succeeded") + } + } + + if result := <-store.Job().Get(job.Id); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.Job) + if received.Status != model.JOB_STATUS_SUCCESS { + t.Fatal("should be success status") + } + if received.StartAt != startAtSet { + t.Fatal("startAt should not have changed") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + } +} + +func TestJobDelete(t *testing.T) { + Setup() + + status := Must(store.Job().Save(&model.Job{ + Id: model.NewId(), + })).(*model.Job) + + if result := <-store.Job().Delete(status.Id); result.Err != nil { + t.Fatal(result.Err) + } +} diff --git a/store/sql_post_store.go b/store/sql_post_store.go index 6db2d5992..e89b5e042 100644 --- a/store/sql_post_store.go +++ b/store/sql_post_store.go @@ -910,8 +910,7 @@ func (s SqlPostStore) Search(teamId string, userId string, params *model.SearchP result := StoreResult{} if !*utils.Cfg.ServiceSettings.EnablePostSearch { - list := &model.PostList{} - list.MakeNonNil() + list := model.NewPostList() result.Data = list result.Err = model.NewLocAppError("SqlPostStore.Search", "store.sql_post.search.disabled", nil, fmt.Sprintf("teamId=%v userId=%v params=%v", teamId, userId, params.ToJson())) @@ -1315,3 +1314,44 @@ func (s SqlPostStore) GetPostsByIds(postIds []string) StoreChannel { return storeChannel } + +func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var posts []*model.PostForIndexing + _, err1 := s.GetSearchReplica().Select(&posts, + `(SELECT + Posts.*, + Channels.TeamId, + ParentPosts.CreateAt ParentCreateAt + FROM + Posts + LEFT JOIN + Channels + ON + Posts.ChannelId = Channels.Id + LEFT JOIN + Posts ParentPosts + ON + Posts.RootId = ParentPosts.Id + WHERE + Posts.CreateAt >= :StartTime + ORDER BY CreateAt ASC + LIMIT :NumPosts)`, + map[string]interface{}{"StartTime": startTime, "NumPosts": limit}) + + if err1 != nil { + result.Err = model.NewLocAppError("SqlPostStore.GetPostContext", "store.sql_post.get_posts_batch_for_indexing.get.app_error", nil, err1.Error()) + } else { + result.Data = posts + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_post_store_test.go b/store/sql_post_store_test.go index 00d4185b4..27e816996 100644 --- a/store/sql_post_store_test.go +++ b/store/sql_post_store_test.go @@ -1592,3 +1592,72 @@ func TestPostStoreGetPostsByIds(t *testing.T) { t.Fatalf("Expected 2 posts in results. Got %v", len(ro5)) } } + +func TestPostStoreGetPostsBatchForIndexing(t *testing.T) { + Setup() + + c1 := &model.Channel{} + c1.TeamId = model.NewId() + c1.DisplayName = "Channel1" + c1.Name = "zz" + model.NewId() + "b" + c1.Type = model.CHANNEL_OPEN + c1 = (<-store.Channel().Save(c1)).Data.(*model.Channel) + + c2 := &model.Channel{} + c2.TeamId = model.NewId() + c2.DisplayName = "Channel2" + c2.Name = "zz" + model.NewId() + "b" + c2.Type = model.CHANNEL_OPEN + c2 = (<-store.Channel().Save(c2)).Data.(*model.Channel) + + o1 := &model.Post{} + o1.ChannelId = c1.Id + o1.UserId = model.NewId() + o1.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o1 = (<-store.Post().Save(o1)).Data.(*model.Post) + + o2 := &model.Post{} + o2.ChannelId = c2.Id + o2.UserId = model.NewId() + o2.Message = "zz" + model.NewId() + "CCCCCCCCC" + o2 = (<-store.Post().Save(o2)).Data.(*model.Post) + + o3 := &model.Post{} + o3.ChannelId = c1.Id + o3.UserId = model.NewId() + o3.ParentId = o1.Id + o3.RootId = o1.Id + o3.Message = "zz" + model.NewId() + "QQQQQQQQQQ" + o3 = (<-store.Post().Save(o3)).Data.(*model.Post) + + if r := Must(store.Post().GetPostsBatchForIndexing(o1.CreateAt, 100)).([]*model.PostForIndexing); len(r) != 3 { + t.Fatalf("Expected 3 posts in results. Got %v", len(r)) + } else { + for _, p := range r { + if p.Id == o1.Id { + if p.TeamId != c1.TeamId { + t.Fatalf("Unexpected team ID") + } + if p.ParentCreateAt != nil { + t.Fatalf("Unexpected parent create at") + } + } else if p.Id == o2.Id { + if p.TeamId != c2.TeamId { + t.Fatalf("Unexpected team ID") + } + if p.ParentCreateAt != nil { + t.Fatalf("Unexpected parent create at") + } + } else if p.Id == o3.Id { + if p.TeamId != c1.TeamId { + t.Fatalf("Unexpected team ID") + } + if *p.ParentCreateAt != o1.CreateAt { + t.Fatalf("Unexpected parent create at") + } + } else { + t.Fatalf("unexpected post returned") + } + } + } +} diff --git a/store/sql_store.go b/store/sql_store.go index dc3b51d0c..a039401f3 100644 --- a/store/sql_store.go +++ b/store/sql_store.go @@ -79,4 +79,5 @@ type SqlStore interface { Status() StatusStore FileInfo() FileInfoStore Reaction() ReactionStore + Job() JobStore } diff --git a/store/sql_supplier.go b/store/sql_supplier.go index 6f51cbd09..0f4ab8380 100644 --- a/store/sql_supplier.go +++ b/store/sql_supplier.go @@ -82,7 +82,7 @@ type SqlSupplierOldStores struct { status StatusStore fileInfo FileInfoStore reaction ReactionStore - jobStatus JobStatusStore + job JobStore } type SqlSupplier struct { @@ -121,7 +121,7 @@ func NewSqlSupplier() *SqlSupplier { supplier.oldStores.status = NewSqlStatusStore(supplier) supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier) supplier.oldStores.reaction = NewSqlReactionStore(supplier) - supplier.oldStores.jobStatus = NewSqlJobStatusStore(supplier) + supplier.oldStores.job = NewSqlJobStore(supplier) err := supplier.GetMaster().CreateTablesIfNotExists() if err != nil { @@ -150,7 +150,7 @@ func NewSqlSupplier() *SqlSupplier { supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists() supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists() supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists() - supplier.oldStores.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists() + supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists() supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures() @@ -752,8 +752,8 @@ func (ss *SqlSupplier) Reaction() ReactionStore { return ss.oldStores.reaction } -func (ss *SqlSupplier) JobStatus() JobStatusStore { - return ss.oldStores.jobStatus +func (ss *SqlSupplier) Job() JobStore { + return ss.oldStores.job } func (ss *SqlSupplier) DropAllTables() { diff --git a/store/sql_upgrade.go b/store/sql_upgrade.go index 463415851..a7b72124e 100644 --- a/store/sql_upgrade.go +++ b/store/sql_upgrade.go @@ -15,6 +15,7 @@ import ( ) const ( + VERSION_4_1_0 = "4.1.0" VERSION_4_0_0 = "4.0.0" VERSION_3_10_0 = "3.10.0" VERSION_3_9_0 = "3.9.0" @@ -49,6 +50,7 @@ func UpgradeDatabase(sqlStore SqlStore) { UpgradeDatabaseToVersion39(sqlStore) UpgradeDatabaseToVersion310(sqlStore) UpgradeDatabaseToVersion40(sqlStore) + UpgradeDatabaseToVersion41(sqlStore) // If the SchemaVersion is empty this this is the first time it has ran // so lets set it to the current version. @@ -276,3 +278,11 @@ func UpgradeDatabaseToVersion40(sqlStore SqlStore) { saveSchemaVersion(sqlStore, VERSION_4_0_0) } } + +func UpgradeDatabaseToVersion41(sqlStore SqlStore) { + // TODO: Uncomment following condition when version 4.1.0 is released + // if shouldPerformUpgrade(sqlStore, VERSION_4_0_0, VERSION_4_1_0) { + sqlStore.RemoveTableIfExists("JobStatuses") + // saveSchemaVersion(sqlStore, VERSION_4_1_0) + // } +} diff --git a/store/store.go b/store/store.go index 0007f495e..062ed0fbd 100644 --- a/store/store.go +++ b/store/store.go @@ -48,7 +48,7 @@ type Store interface { Status() StatusStore FileInfo() FileInfoStore Reaction() ReactionStore - JobStatus() JobStatusStore + Job() JobStore MarkSystemRanUnitTests() Close() DropAllTables() @@ -168,6 +168,7 @@ type PostStore interface { GetPostsCreatedAt(channelId string, time int64) StoreChannel Overwrite(post *model.Post) StoreChannel GetPostsByIds(postIds []string) StoreChannel + GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel } type UserStore interface { @@ -384,10 +385,14 @@ type ReactionStore interface { DeleteAllWithEmojiName(emojiName string) StoreChannel } -type JobStatusStore interface { - SaveOrUpdate(status *model.JobStatus) StoreChannel +type JobStore interface { + Save(job *model.Job) StoreChannel + UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel + UpdateStatus(id string, status string) StoreChannel + UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel Get(id string) StoreChannel GetAllByType(jobType string) StoreChannel GetAllByTypePage(jobType string, offset int, limit int) StoreChannel + GetAllByStatus(status string) StoreChannel Delete(id string) StoreChannel } |