From 5474cff0ebab8c42f1b0750ac054a8c762b43a37 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Wed, 25 Oct 2017 13:23:01 +0100 Subject: PLT-7934: Make query for bulk elasticsearch indexing more efficient. (#7664) --- config/default.json | 3 ++- i18n/en.json | 8 +++++++ model/config.go | 53 ++++++++++++++++++++++++++----------------- store/sqlstore/post_store.go | 48 +++++++++++++++++++++++++++------------ store/store.go | 3 ++- store/storetest/post_store.go | 31 ++++++++++++++++++++++++- 6 files changed, 108 insertions(+), 38 deletions(-) diff --git a/config/default.json b/config/default.json index f4eec7ec5..350b4789f 100644 --- a/config/default.json +++ b/config/default.json @@ -317,7 +317,8 @@ "AggregatePostsAfterDays": 365, "PostsAggregatorJobStartTime": "03:00", "IndexPrefix": "", - "LiveIndexingBatchSize": 1 + "LiveIndexingBatchSize": 1, + "BulkIndexingTimeWindowSeconds": 3600 }, "DataRetentionSettings": { "EnableMessageDeletion": false, diff --git a/i18n/en.json b/i18n/en.json index 4cf406411..712aa0b23 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -3855,6 +3855,14 @@ "id": "ent.elasticsearch.test_config.reenter_password", "translation": "The Elasticsearch Server URL or Username has changed. Please re-enter the Elasticsearch password to test connection." }, + { + "id": "ent.elasticsearch.indexer.do_job.get_oldest_post.error", + "translation": "The oldest post could not be retrieved from the database." + }, + { + "id": "model.config.is_valid.elastic_search.bulk_indexing_time_window_seconds.app_error", + "translation": "Elasticsearch Bulk Indexing Time Window must be at least 1 second." + }, { "id": "ent.emoji.licence_disable.app_error", "translation": "Custom emoji restrictions disabled by current license. Please contact your system administrator about upgrading your enterprise license." 
diff --git a/model/config.go b/model/config.go index a93defa7f..4eee71737 100644 --- a/model/config.go +++ b/model/config.go @@ -132,15 +132,16 @@ const ( TEAM_SETTINGS_DEFAULT_TEAM_TEXT = "default" - ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = "" - ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = "" - ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = "" - ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1 - ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1 - ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS = 365 - ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME = "03:00" - ELASTICSEARCH_SETTINGS_DEFAULT_INDEX_PREFIX = "" - ELASTICSEARCH_SETTINGS_DEFAULT_LIVE_INDEXING_BATCH_SIZE = 1 + ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = "" + ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = "" + ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = "" + ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1 + ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1 + ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS = 365 + ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME = "03:00" + ELASTICSEARCH_SETTINGS_DEFAULT_INDEX_PREFIX = "" + ELASTICSEARCH_SETTINGS_DEFAULT_LIVE_INDEXING_BATCH_SIZE = 1 + ELASTICSEARCH_SETTINGS_DEFAULT_BULK_INDEXING_TIME_WINDOW_SECONDS = 3600 DATA_RETENTION_SETTINGS_DEFAULT_MESSAGE_RETENTION_DAYS = 365 DATA_RETENTION_SETTINGS_DEFAULT_FILE_RETENTION_DAYS = 365 @@ -477,18 +478,19 @@ type WebrtcSettings struct { } type ElasticsearchSettings struct { - ConnectionUrl *string - Username *string - Password *string - EnableIndexing *bool - EnableSearching *bool - Sniff *bool - PostIndexReplicas *int - PostIndexShards *int - AggregatePostsAfterDays *int - PostsAggregatorJobStartTime *string - IndexPrefix *string - LiveIndexingBatchSize *int + ConnectionUrl *string + Username *string + Password *string + EnableIndexing *bool + EnableSearching *bool + Sniff *bool + PostIndexReplicas *int + PostIndexShards *int + AggregatePostsAfterDays *int 
+ PostsAggregatorJobStartTime *string + IndexPrefix *string + LiveIndexingBatchSize *int + BulkIndexingTimeWindowSeconds *int } type DataRetentionSettings struct { @@ -1423,6 +1425,11 @@ func (o *Config) SetDefaults() { o.ElasticsearchSettings.LiveIndexingBatchSize = NewInt(ELASTICSEARCH_SETTINGS_DEFAULT_LIVE_INDEXING_BATCH_SIZE) } + if o.ElasticsearchSettings.BulkIndexingTimeWindowSeconds == nil { + o.ElasticsearchSettings.BulkIndexingTimeWindowSeconds = new(int) + *o.ElasticsearchSettings.BulkIndexingTimeWindowSeconds = ELASTICSEARCH_SETTINGS_DEFAULT_BULK_INDEXING_TIME_WINDOW_SECONDS + } + if o.DataRetentionSettings.EnableMessageDeletion == nil { o.DataRetentionSettings.EnableMessageDeletion = NewBool(false) } @@ -1808,6 +1815,10 @@ func (ess *ElasticsearchSettings) isValid() *AppError { return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.live_indexing_batch_size.app_error", nil, "", http.StatusBadRequest) } + if *ess.BulkIndexingTimeWindowSeconds < 1 { + return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.bulk_indexing_time_window_seconds.app_error", nil, "", http.StatusBadRequest) + } + return nil } diff --git a/store/sqlstore/post_store.go b/store/sqlstore/post_store.go index a1b25b5c5..d8f93d2bc 100644 --- a/store/sqlstore/post_store.go +++ b/store/sqlstore/post_store.go @@ -1067,29 +1067,37 @@ func (s SqlPostStore) GetPostsByIds(postIds []string) store.StoreChannel { }) } -func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) store.StoreChannel { +func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, endTime int64, limit int) store.StoreChannel { return store.Do(func(result *store.StoreResult) { var posts []*model.PostForIndexing _, err1 := s.GetSearchReplica().Select(&posts, - `(SELECT - Posts.*, - Channels.TeamId, - ParentPosts.CreateAt ParentCreateAt - FROM - Posts + `SELECT + PostsQuery.*, Channels.TeamId, ParentPosts.CreateAt ParentCreateAt + FROM ( + SELECT + * + FROM + Posts 
+ WHERE + Posts.CreateAt >= :StartTime + AND + Posts.CreateAt < :EndTime + ORDER BY + CreateAt ASC + LIMIT + :NumPosts + ) + AS + PostsQuery LEFT JOIN Channels ON - Posts.ChannelId = Channels.Id + PostsQuery.ChannelId = Channels.Id LEFT JOIN Posts ParentPosts ON - Posts.RootId = ParentPosts.Id - WHERE - Posts.CreateAt >= :StartTime - ORDER BY CreateAt ASC - LIMIT :NumPosts)`, - map[string]interface{}{"StartTime": startTime, "NumPosts": limit}) + PostsQuery.RootId = ParentPosts.Id`, + map[string]interface{}{"StartTime": startTime, "EndTime": endTime, "NumPosts": limit}) if err1 != nil { result.Err = model.NewAppError("SqlPostStore.GetPostContext", "store.sql_post.get_posts_batch_for_indexing.get.app_error", nil, err1.Error(), http.StatusInternalServerError) @@ -1122,3 +1130,15 @@ func (s SqlPostStore) PermanentDeleteBatch(endTime int64, limit int64) store.Sto } }) } + +func (s SqlPostStore) GetOldest() store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + var post model.Post + err := s.GetReplica().SelectOne(&post, "SELECT * FROM Posts ORDER BY CreateAt LIMIT 1") + if err != nil { + result.Err = model.NewAppError("SqlPostStore.GetOldest", "store.sql_post.get.app_error", nil, err.Error(), http.StatusNotFound) + return + } + result.Data = &post + }) +} diff --git a/store/store.go b/store/store.go index 5674a05d5..eada8f395 100644 --- a/store/store.go +++ b/store/store.go @@ -182,8 +182,9 @@ type PostStore interface { GetPostsCreatedAt(channelId string, time int64) StoreChannel Overwrite(post *model.Post) StoreChannel GetPostsByIds(postIds []string) StoreChannel - GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel + GetPostsBatchForIndexing(startTime int64, endTime int64, limit int) StoreChannel PermanentDeleteBatch(endTime int64, limit int64) StoreChannel + GetOldest() StoreChannel } type UserStore interface { diff --git a/store/storetest/post_store.go b/store/storetest/post_store.go index 3460c4d05..b288cde34 100644 ---
a/store/storetest/post_store.go +++ b/store/storetest/post_store.go @@ -12,6 +12,7 @@ import ( "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" "github.com/mattermost/mattermost-server/utils" + "github.com/stretchr/testify/assert" ) func TestPostStore(t *testing.T, ss store.Store) { @@ -1614,7 +1615,7 @@ func testPostStoreGetPostsBatchForIndexing(t *testing.T, ss store.Store) { o3.Message = "zz" + model.NewId() + "QQQQQQQQQQ" o3 = (<-ss.Post().Save(o3)).Data.(*model.Post) - if r := store.Must(ss.Post().GetPostsBatchForIndexing(o1.CreateAt, 100)).([]*model.PostForIndexing); len(r) != 3 { + if r := store.Must(ss.Post().GetPostsBatchForIndexing(o1.CreateAt, model.GetMillis()+100000, 100)).([]*model.PostForIndexing); len(r) != 3 { t.Fatalf("Expected 3 posts in results. Got %v", len(r)) } else { for _, p := range r { @@ -1682,3 +1683,31 @@ func testPostStorePermanentDeleteBatch(t *testing.T, ss store.Store) { t.Fatalf("Should have found post 3 after purge") } } + +func testPostStoreGetOldest(t *testing.T, ss store.Store) { + o0 := &model.Post{} + o0.ChannelId = model.NewId() + o0.UserId = model.NewId() + o0.Message = "zz" + model.NewId() + "b" + o0.CreateAt = 3 + o0 = (<-ss.Post().Save(o0)).Data.(*model.Post) + + o1 := &model.Post{} + o1.ChannelId = o0.Id + o1.UserId = model.NewId() + o1.Message = "zz" + model.NewId() + "b" + o1.CreateAt = 2 + o1 = (<-ss.Post().Save(o1)).Data.(*model.Post) + + o2 := &model.Post{} + o2.Id = model.NewId() + o2.ChannelId = o1.ChannelId + o2.UserId = model.NewId() + o2.Message = "zz" + model.NewId() + "b" + o2.CreateAt = 1 + o2 = (<-ss.Post().Save(o2)).Data.(*model.Post) + + r1 := (<-ss.Post().GetOldest()).Data.(*model.Post) + + assert.EqualValues(t, o2.Id, r1.Id) +} -- cgit v1.2.3-1-g7c22