summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--einterfaces/jobs/elasticsearch.go22
-rw-r--r--i18n/en.json16
-rw-r--r--jobs/jobs.go52
-rw-r--r--jobs/jobs_watcher.go7
-rw-r--r--jobs/testworker.go2
-rw-r--r--jobs/workers.go31
-rw-r--r--model/post.go6
-rw-r--r--store/sql_post_store.go41
-rw-r--r--store/sql_post_store_test.go69
-rw-r--r--store/store.go1
10 files changed, 216 insertions, 31 deletions
diff --git a/einterfaces/jobs/elasticsearch.go b/einterfaces/jobs/elasticsearch.go
new file mode 100644
index 000000000..6d6dbe893
--- /dev/null
+++ b/einterfaces/jobs/elasticsearch.go
@@ -0,0 +1,22 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "github.com/mattermost/platform/model"
+)
+
+type ElasticsearchIndexerInterface interface {
+ MakeWorker() model.Worker
+}
+
+var theElasticsearchIndexerInterface ElasticsearchIndexerInterface
+
+func RegisterElasticsearchIndexerInterface(newInterface ElasticsearchIndexerInterface) {
+ theElasticsearchIndexerInterface = newInterface
+}
+
+func GetElasticsearchIndexerInterface() ElasticsearchIndexerInterface {
+ return theElasticsearchIndexerInterface
+}
diff --git a/i18n/en.json b/i18n/en.json
index d6eaffc7f..1225ee824 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3516,19 +3516,19 @@
"translation": "Failed to decode search results"
},
{
- "id": "ent.elasticsearch.start.connect_failed",
+ "id": "ent.elasticsearch.create_client.connect_failed",
"translation": "Setting up ElasticSearch Client Failed"
},
{
- "id": "ent.elasticsearch.start.index_create_failed",
+ "id": "ent.elasticsearch.create_index_if_not_exists.index_create_failed",
"translation": "Failed to create ElasticSearch index"
},
{
- "id": "ent.elasticsearch.start.index_exists_failed",
+ "id": "ent.elasticsearch.create_index_if_not_exists.index_exists_failed",
"translation": "Failed to establish whether ElasticSearch index exists"
},
{
- "id": "ent.elasticsearch.start.index_mapping_failed",
+ "id": "ent.elasticsearch.create_index_if_not_exists.index_mapping_failed",
"translation": "Failed to setup ElasticSearch index mapping"
},
{
@@ -3764,6 +3764,10 @@
"translation": "Page not found"
},
{
+ "id": "jobs.set_job_error.update.error",
+ "translation": "Failed to set job status to error"
+ },
+ {
"id": "jobs.request_cancellation.status.error",
"translation": "Could not request cancellation for job that is not in a cancelable state."
},
@@ -5484,6 +5488,10 @@
"translation": "We couldn't get the parent posts for the channel"
},
{
+ "id": "store.sql_post.get_posts_batch_for_indexing.get.app_error",
+ "translation": "We couldn't get the posts batch for indexing"
+ },
+ {
"id": "store.sql_post.get_posts_by_ids.app_error",
"translation": "We couldn't get the posts"
},
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 58c2f2f13..9247355d0 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -9,6 +9,7 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/mattermost/platform/model"
+ "net/http"
)
const (
@@ -40,27 +41,15 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) {
}
}
-func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) {
- var job *model.Job
-
- if result := <-Srv.Store.Job().Get(jobId); result.Err != nil {
- return false, result.Err
- } else {
- job = result.Data.(*model.Job)
- }
-
+func SetJobProgress(job *model.Job, progress int64) (*model.AppError) {
job.Status = model.JOB_STATUS_IN_PROGRESS
job.Progress = progress
if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
- return false, result.Err
+ return result.Err
} else {
- if !result.Data.(bool) {
- return false, nil
- }
+ return nil
}
-
- return true, nil
}
func SetJobSuccess(job *model.Job) *model.AppError {
@@ -68,9 +57,34 @@ func SetJobSuccess(job *model.Job) *model.AppError {
return result.Err
}
-func SetJobError(job *model.Job) *model.AppError {
- result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
- return result.Err
+func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
+ if jobError == nil {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ return result.Err
+ }
+
+ job.Status = model.JOB_STATUS_ERROR
+ job.Progress = -1
+ if job.Data == nil {
+ job.Data = make(map[string]interface{})
+ }
+ job.Data["error"] = jobError
+
+ if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return result.Err
+ } else {
+ if !result.Data.(bool) {
+ if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ return result.Err
+ } else {
+ if !result.Data.(bool) {
+ return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id=" + job.Id, http.StatusInternalServerError)
+ }
+ }
+ }
+ }
+
+ return nil
}
func SetJobCanceled(job *model.Job) *model.AppError {
@@ -91,7 +105,7 @@ func RequestCancellation(job *model.Job) *model.AppError {
return nil
}
- return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id)
+ return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id, http.StatusInternalServerError)
}
func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
index ada957ccc..5979d6207 100644
--- a/jobs/jobs_watcher.go
+++ b/jobs/jobs_watcher.go
@@ -79,6 +79,13 @@ func (watcher *Watcher) PollAndNotify() {
default:
}
}
+ } else if js.Type == model.JOB_TYPE_SEARCH_INDEXING {
+ if watcher.workers.ElasticsearchIndexing != nil {
+ select {
+ case watcher.workers.ElasticsearchIndexing.JobChannel() <- j:
+ default:
+ }
+ }
}
}
}
diff --git a/jobs/testworker.go b/jobs/testworker.go
index f1c8a07a3..385a2073b 100644
--- a/jobs/testworker.go
+++ b/jobs/testworker.go
@@ -85,7 +85,7 @@ func (worker *TestWorker) DoJob(job *model.Job) {
}
return
} else {
- if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil {
+ if err := SetJobProgress(job, int64(counter*10)); err != nil {
l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
}
}
diff --git a/jobs/workers.go b/jobs/workers.go
index a42ec4607..bb80ad79a 100644
--- a/jobs/workers.go
+++ b/jobs/workers.go
@@ -13,13 +13,13 @@ import (
)
type Workers struct {
- startOnce sync.Once
- watcher *Watcher
+ startOnce sync.Once
+ watcher *Watcher
- DataRetention model.Worker
- // SearchIndexing model.Job
+ DataRetention model.Worker
+ ElasticsearchIndexing model.Worker
- listenerId string
+ listenerId string
}
func InitWorkers() *Workers {
@@ -32,6 +32,10 @@ func InitWorkers() *Workers {
workers.DataRetention = dataRetentionInterface.MakeWorker()
}
+ if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil {
+ workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker()
+ }
+
return workers
}
@@ -43,7 +47,9 @@ func (workers *Workers) Start() *Workers {
go workers.DataRetention.Run()
}
- // go workers.SearchIndexing.Run()
+ if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing {
+ go workers.ElasticsearchIndexing.Run()
+ }
go workers.watcher.Start()
})
@@ -61,6 +67,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m
workers.DataRetention.Stop()
}
}
+
+ if workers.ElasticsearchIndexing != nil {
+ if !*oldConfig.ElasticSearchSettings.EnableIndexing && *newConfig.ElasticSearchSettings.EnableIndexing {
+ go workers.ElasticsearchIndexing.Run()
+ } else if *oldConfig.ElasticSearchSettings.EnableIndexing && !*newConfig.ElasticSearchSettings.EnableIndexing {
+ workers.ElasticsearchIndexing.Stop()
+ }
+ }
}
func (workers *Workers) Stop() *Workers {
@@ -71,7 +85,10 @@ func (workers *Workers) Stop() *Workers {
if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
workers.DataRetention.Stop()
}
- // workers.SearchIndexing.Stop()
+
+ if workers.ElasticsearchIndexing != nil && *utils.Cfg.ElasticSearchSettings.EnableIndexing {
+ workers.ElasticsearchIndexing.Stop()
+ }
l4g.Info("Stopped workers")
diff --git a/model/post.go b/model/post.go
index f5a398656..55e6f591d 100644
--- a/model/post.go
+++ b/model/post.go
@@ -62,6 +62,12 @@ type PostPatch struct {
HasReactions *bool `json:"has_reactions"`
}
+type PostForIndexing struct {
+ Post
+ TeamId string `json:"team_id"`
+ ParentCreateAt *int64 `json:"parent_create_at"`
+}
+
func (o *Post) ToJson() string {
b, err := json.Marshal(o)
if err != nil {
diff --git a/store/sql_post_store.go b/store/sql_post_store.go
index 6db2d5992..16142681c 100644
--- a/store/sql_post_store.go
+++ b/store/sql_post_store.go
@@ -1315,3 +1315,44 @@ func (s SqlPostStore) GetPostsByIds(postIds []string) StoreChannel {
return storeChannel
}
+
+func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var posts []*model.PostForIndexing
+ _, err1 := s.GetSearchReplica().Select(&posts,
+ `(SELECT
+ Posts.*,
+ Channels.TeamId,
+ ParentPosts.CreateAt ParentCreateAt
+ FROM
+ Posts
+ LEFT JOIN
+ Channels
+ ON
+ Posts.ChannelId = Channels.Id
+ LEFT JOIN
+ Posts ParentPosts
+ ON
+ Posts.RootId = ParentPosts.Id
+ WHERE
+ Posts.CreateAt >= :StartTime
+ ORDER BY CreateAt ASC
+ LIMIT :NumPosts)`,
+ map[string]interface{}{"StartTime": startTime, "NumPosts": limit})
+
+ if err1 != nil {
+ result.Err = model.NewLocAppError("SqlPostStore.GetPostContext", "store.sql_post.get_posts_batch_for_indexing.get.app_error", nil, err1.Error())
+ } else {
+ result.Data = posts
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_post_store_test.go b/store/sql_post_store_test.go
index 00d4185b4..27e816996 100644
--- a/store/sql_post_store_test.go
+++ b/store/sql_post_store_test.go
@@ -1592,3 +1592,72 @@ func TestPostStoreGetPostsByIds(t *testing.T) {
t.Fatalf("Expected 2 posts in results. Got %v", len(ro5))
}
}
+
+func TestPostStoreGetPostsBatchForIndexing(t *testing.T) {
+ Setup()
+
+ c1 := &model.Channel{}
+ c1.TeamId = model.NewId()
+ c1.DisplayName = "Channel1"
+ c1.Name = "zz" + model.NewId() + "b"
+ c1.Type = model.CHANNEL_OPEN
+ c1 = (<-store.Channel().Save(c1)).Data.(*model.Channel)
+
+ c2 := &model.Channel{}
+ c2.TeamId = model.NewId()
+ c2.DisplayName = "Channel2"
+ c2.Name = "zz" + model.NewId() + "b"
+ c2.Type = model.CHANNEL_OPEN
+ c2 = (<-store.Channel().Save(c2)).Data.(*model.Channel)
+
+ o1 := &model.Post{}
+ o1.ChannelId = c1.Id
+ o1.UserId = model.NewId()
+ o1.Message = "zz" + model.NewId() + "AAAAAAAAAAA"
+ o1 = (<-store.Post().Save(o1)).Data.(*model.Post)
+
+ o2 := &model.Post{}
+ o2.ChannelId = c2.Id
+ o2.UserId = model.NewId()
+ o2.Message = "zz" + model.NewId() + "CCCCCCCCC"
+ o2 = (<-store.Post().Save(o2)).Data.(*model.Post)
+
+ o3 := &model.Post{}
+ o3.ChannelId = c1.Id
+ o3.UserId = model.NewId()
+ o3.ParentId = o1.Id
+ o3.RootId = o1.Id
+ o3.Message = "zz" + model.NewId() + "QQQQQQQQQQ"
+ o3 = (<-store.Post().Save(o3)).Data.(*model.Post)
+
+ if r := Must(store.Post().GetPostsBatchForIndexing(o1.CreateAt, 100)).([]*model.PostForIndexing); len(r) != 3 {
+ t.Fatalf("Expected 3 posts in results. Got %v", len(r))
+ } else {
+ for _, p := range r {
+ if p.Id == o1.Id {
+ if p.TeamId != c1.TeamId {
+ t.Fatalf("Unexpected team ID")
+ }
+ if p.ParentCreateAt != nil {
+ t.Fatalf("Unexpected parent create at")
+ }
+ } else if p.Id == o2.Id {
+ if p.TeamId != c2.TeamId {
+ t.Fatalf("Unexpected team ID")
+ }
+ if p.ParentCreateAt != nil {
+ t.Fatalf("Unexpected parent create at")
+ }
+ } else if p.Id == o3.Id {
+ if p.TeamId != c1.TeamId {
+ t.Fatalf("Unexpected team ID")
+ }
+ if *p.ParentCreateAt != o1.CreateAt {
+ t.Fatalf("Unexpected parent create at")
+ }
+ } else {
+ t.Fatalf("unexpected post returned")
+ }
+ }
+ }
+}
diff --git a/store/store.go b/store/store.go
index 95496b609..062ed0fbd 100644
--- a/store/store.go
+++ b/store/store.go
@@ -168,6 +168,7 @@ type PostStore interface {
GetPostsCreatedAt(channelId string, time int64) StoreChannel
Overwrite(post *model.Post) StoreChannel
GetPostsByIds(postIds []string) StoreChannel
+ GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel
}
type UserStore interface {