summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config/default.json3
-rw-r--r--i18n/en.json8
-rw-r--r--model/config.go53
-rw-r--r--store/sqlstore/post_store.go48
-rw-r--r--store/store.go3
-rw-r--r--store/storetest/post_store.go31
6 files changed, 108 insertions, 38 deletions
diff --git a/config/default.json b/config/default.json
index f4eec7ec5..350b4789f 100644
--- a/config/default.json
+++ b/config/default.json
@@ -317,7 +317,8 @@
"AggregatePostsAfterDays": 365,
"PostsAggregatorJobStartTime": "03:00",
"IndexPrefix": "",
- "LiveIndexingBatchSize": 1
+ "LiveIndexingBatchSize": 1,
+ "BulkIndexingTimeWindowSeconds": 3600
},
"DataRetentionSettings": {
"EnableMessageDeletion": false,
diff --git a/i18n/en.json b/i18n/en.json
index 4cf406411..712aa0b23 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3856,6 +3856,14 @@
"translation": "The Elasticsearch Server URL or Username has changed. Please re-enter the Elasticsearch password to test connection."
},
{
+ "id": "ent.elasticsearch.indexer.do_job.get_oldest_post.error",
+ "translation": "The oldest post could not be retrieved from the database."
+ },
+ {
+ "id": "model.config.is_valid.elastic_search.bulk_indexing_time_window_seconds.app_error",
+ "translation": "Elasticsearch Bulk Indexing Time Window must be at least 1 second."
+ },
+ {
"id": "ent.emoji.licence_disable.app_error",
"translation": "Custom emoji restrictions disabled by current license. Please contact your system administrator about upgrading your enterprise license."
},
diff --git a/model/config.go b/model/config.go
index a93defa7f..4eee71737 100644
--- a/model/config.go
+++ b/model/config.go
@@ -132,15 +132,16 @@ const (
TEAM_SETTINGS_DEFAULT_TEAM_TEXT = "default"
- ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = ""
- ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = ""
- ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = ""
- ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1
- ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1
- ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS = 365
- ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME = "03:00"
- ELASTICSEARCH_SETTINGS_DEFAULT_INDEX_PREFIX = ""
- ELASTICSEARCH_SETTINGS_DEFAULT_LIVE_INDEXING_BATCH_SIZE = 1
+ ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = ""
+ ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = ""
+ ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = ""
+ ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1
+ ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1
+ ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS = 365
+ ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME = "03:00"
+ ELASTICSEARCH_SETTINGS_DEFAULT_INDEX_PREFIX = ""
+ ELASTICSEARCH_SETTINGS_DEFAULT_LIVE_INDEXING_BATCH_SIZE = 1
+ ELASTICSEARCH_SETTINGS_DEFAULT_BULK_INDEXING_TIME_WINDOW_SECONDS = 3600
DATA_RETENTION_SETTINGS_DEFAULT_MESSAGE_RETENTION_DAYS = 365
DATA_RETENTION_SETTINGS_DEFAULT_FILE_RETENTION_DAYS = 365
@@ -477,18 +478,19 @@ type WebrtcSettings struct {
}
type ElasticsearchSettings struct {
- ConnectionUrl *string
- Username *string
- Password *string
- EnableIndexing *bool
- EnableSearching *bool
- Sniff *bool
- PostIndexReplicas *int
- PostIndexShards *int
- AggregatePostsAfterDays *int
- PostsAggregatorJobStartTime *string
- IndexPrefix *string
- LiveIndexingBatchSize *int
+ ConnectionUrl *string
+ Username *string
+ Password *string
+ EnableIndexing *bool
+ EnableSearching *bool
+ Sniff *bool
+ PostIndexReplicas *int
+ PostIndexShards *int
+ AggregatePostsAfterDays *int
+ PostsAggregatorJobStartTime *string
+ IndexPrefix *string
+ LiveIndexingBatchSize *int
+ BulkIndexingTimeWindowSeconds *int
}
type DataRetentionSettings struct {
@@ -1423,6 +1425,11 @@ func (o *Config) SetDefaults() {
o.ElasticsearchSettings.LiveIndexingBatchSize = NewInt(ELASTICSEARCH_SETTINGS_DEFAULT_LIVE_INDEXING_BATCH_SIZE)
}
+ if o.ElasticsearchSettings.BulkIndexingTimeWindowSeconds == nil {
+ o.ElasticsearchSettings.BulkIndexingTimeWindowSeconds = new(int)
+ *o.ElasticsearchSettings.BulkIndexingTimeWindowSeconds = ELASTICSEARCH_SETTINGS_DEFAULT_BULK_INDEXING_TIME_WINDOW_SECONDS
+ }
+
if o.DataRetentionSettings.EnableMessageDeletion == nil {
o.DataRetentionSettings.EnableMessageDeletion = NewBool(false)
}
@@ -1808,6 +1815,10 @@ func (ess *ElasticsearchSettings) isValid() *AppError {
return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.live_indexing_batch_size.app_error", nil, "", http.StatusBadRequest)
}
+ if *ess.BulkIndexingTimeWindowSeconds < 1 {
+ return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.bulk_indexing_time_window_seconds.app_error", nil, "", http.StatusBadRequest)
+ }
+
return nil
}
diff --git a/store/sqlstore/post_store.go b/store/sqlstore/post_store.go
index a1b25b5c5..d8f93d2bc 100644
--- a/store/sqlstore/post_store.go
+++ b/store/sqlstore/post_store.go
@@ -1067,29 +1067,37 @@ func (s SqlPostStore) GetPostsByIds(postIds []string) store.StoreChannel {
})
}
-func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) store.StoreChannel {
+func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, endTime int64, limit int) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var posts []*model.PostForIndexing
_, err1 := s.GetSearchReplica().Select(&posts,
- `(SELECT
- Posts.*,
- Channels.TeamId,
- ParentPosts.CreateAt ParentCreateAt
- FROM
- Posts
+ `SELECT
+ PostsQuery.*, Channels.TeamId, ParentPosts.CreateAt ParentCreateAt
+ FROM (
+ SELECT
+ *
+ FROM
+ Posts
+ WHERE
+ Posts.CreateAt >= :StartTime
+ AND
+ Posts.CreateAt < :EndTime
+ ORDER BY
+ CreateAt ASC
+ LIMIT
+ 1000
+ )
+ AS
+ PostsQuery
LEFT JOIN
Channels
ON
- Posts.ChannelId = Channels.Id
+ PostsQuery.ChannelId = Channels.Id
LEFT JOIN
Posts ParentPosts
ON
- Posts.RootId = ParentPosts.Id
- WHERE
- Posts.CreateAt >= :StartTime
- ORDER BY CreateAt ASC
- LIMIT :NumPosts)`,
- map[string]interface{}{"StartTime": startTime, "NumPosts": limit})
+ PostsQuery.RootId = ParentPosts.Id`,
+ map[string]interface{}{"StartTime": startTime, "EndTime": endTime, "NumPosts": limit})
if err1 != nil {
result.Err = model.NewAppError("SqlPostStore.GetPostContext", "store.sql_post.get_posts_batch_for_indexing.get.app_error", nil, err1.Error(), http.StatusInternalServerError)
@@ -1122,3 +1130,15 @@ func (s SqlPostStore) PermanentDeleteBatch(endTime int64, limit int64) store.Sto
}
})
}
+
+func (s SqlPostStore) GetOldest() store.StoreChannel {
+ return store.Do(func(result *store.StoreResult) {
+ var post model.Post
+ err := s.GetReplica().SelectOne(&post, "SELECT * FROM Posts ORDER BY CreateAt LIMIT 1")
+ if err != nil {
+ result.Err = model.NewAppError("SqlPostStore.GetOldest", "store.sql_post.get.app_error", nil, err.Error(), http.StatusNotFound)
+ }
+
+ result.Data = &post
+ })
+}
diff --git a/store/store.go b/store/store.go
index 5674a05d5..eada8f395 100644
--- a/store/store.go
+++ b/store/store.go
@@ -182,8 +182,9 @@ type PostStore interface {
GetPostsCreatedAt(channelId string, time int64) StoreChannel
Overwrite(post *model.Post) StoreChannel
GetPostsByIds(postIds []string) StoreChannel
- GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel
+ GetPostsBatchForIndexing(startTime int64, endTime int64, limit int) StoreChannel
PermanentDeleteBatch(endTime int64, limit int64) StoreChannel
+ GetOldest() StoreChannel
}
type UserStore interface {
diff --git a/store/storetest/post_store.go b/store/storetest/post_store.go
index 3460c4d05..b288cde34 100644
--- a/store/storetest/post_store.go
+++ b/store/storetest/post_store.go
@@ -12,6 +12,7 @@ import (
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/utils"
+ "github.com/stretchr/testify/assert"
)
func TestPostStore(t *testing.T, ss store.Store) {
@@ -1614,7 +1615,7 @@ func testPostStoreGetPostsBatchForIndexing(t *testing.T, ss store.Store) {
o3.Message = "zz" + model.NewId() + "QQQQQQQQQQ"
o3 = (<-ss.Post().Save(o3)).Data.(*model.Post)
- if r := store.Must(ss.Post().GetPostsBatchForIndexing(o1.CreateAt, 100)).([]*model.PostForIndexing); len(r) != 3 {
+ if r := store.Must(ss.Post().GetPostsBatchForIndexing(o1.CreateAt, model.GetMillis()+100000, 100)).([]*model.PostForIndexing); len(r) != 3 {
t.Fatalf("Expected 3 posts in results. Got %v", len(r))
} else {
for _, p := range r {
@@ -1682,3 +1683,31 @@ func testPostStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
t.Fatalf("Should have found post 3 after purge")
}
}
+
+func testPostStoreGetOldest(t *testing.T, ss store.Store) {
+ o0 := &model.Post{}
+ o0.ChannelId = model.NewId()
+ o0.UserId = model.NewId()
+ o0.Message = "zz" + model.NewId() + "b"
+ o0.CreateAt = 3
+ o0 = (<-ss.Post().Save(o0)).Data.(*model.Post)
+
+ o1 := &model.Post{}
+ o1.ChannelId = o0.Id
+ o1.UserId = model.NewId()
+ o1.Message = "zz" + model.NewId() + "b"
+ o1.CreateAt = 2
+ o1 = (<-ss.Post().Save(o1)).Data.(*model.Post)
+
+ o2 := &model.Post{}
+ o2.Id = model.NewId()
+ o2.ChannelId = o1.ChannelId
+ o2.UserId = model.NewId()
+ o2.Message = "zz" + model.NewId() + "b"
+ o2.CreateAt = 1
+ o2 = (<-ss.Post().Save(o2)).Data.(*model.Post)
+
+ r1 := (<-ss.Post().GetOldest()).Data.(*model.Post)
+
+ assert.EqualValues(t, o2.Id, r1.Id)
+}