summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--api4/job_test.go2
-rw-r--r--config/config.json4
-rw-r--r--einterfaces/jobs/elasticsearch.go15
-rw-r--r--i18n/en.json32
-rw-r--r--jobs/jobs.go6
-rw-r--r--jobs/jobs_watcher.go24
-rw-r--r--jobs/schedulers.go23
-rw-r--r--jobs/workers.go29
-rw-r--r--model/config.go49
-rw-r--r--model/job.go24
-rw-r--r--store/sql_job_store_test.go21
-rw-r--r--store/sql_supplier.go12
12 files changed, 187 insertions, 54 deletions
diff --git a/api4/job_test.go b/api4/job_test.go
index 3dcdbe58b..511386810 100644
--- a/api4/job_test.go
+++ b/api4/job_test.go
@@ -18,7 +18,7 @@ func TestCreateJob(t *testing.T) {
job := &model.Job{
Type: model.JOB_TYPE_DATA_RETENTION,
- Data: map[string]interface{}{
+ Data: map[string]string{
"thing": "stuff",
},
}
diff --git a/config/config.json b/config/config.json
index 5acd7d177..b8657d8d2 100644
--- a/config/config.json
+++ b/config/config.json
@@ -296,7 +296,9 @@
"EnableSearching": false,
"Sniff": true,
"PostIndexReplicas": 1,
- "PostIndexShards": 1
+ "PostIndexShards": 1,
+ "AggregatePostsAfterDays": 365,
+ "PostsAggregatorJobStartTime": "03:00"
},
"DataRetentionSettings": {
"Enable": false
diff --git a/einterfaces/jobs/elasticsearch.go b/einterfaces/jobs/elasticsearch.go
index 6d6dbe893..ca05b2ef3 100644
--- a/einterfaces/jobs/elasticsearch.go
+++ b/einterfaces/jobs/elasticsearch.go
@@ -20,3 +20,18 @@ func RegisterElasticsearchIndexerInterface(newInterface ElasticsearchIndexerInte
func GetElasticsearchIndexerInterface() ElasticsearchIndexerInterface {
return theElasticsearchIndexerInterface
}
+
+type ElasticsearchAggregatorInterface interface {
+ MakeWorker() model.Worker
+ MakeScheduler() model.Scheduler
+}
+
+var theElasticsearchAggregatorInterface ElasticsearchAggregatorInterface
+
+func RegisterElasticsearchAggregatorInterface(newInterface ElasticsearchAggregatorInterface) {
+ theElasticsearchAggregatorInterface = newInterface
+}
+
+func GetElasticsearchAggregatorInterface() ElasticsearchAggregatorInterface {
+ return theElasticsearchAggregatorInterface
+}
diff --git a/i18n/en.json b/i18n/en.json
index 8a2d0d770..49f5c1310 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3788,6 +3788,30 @@
"translation": "Compliance export started for job '{{.JobName}}' at '{{.FilePath}}'"
},
{
+ "id": "ent.elasticsearch.aggregator_worker.get_indexes.error",
+ "translation": "Elasticsearch aggregator worker failed to get indexes"
+ },
+ {
+ "id": "ent.elasticsearch.aggregator_worker.create_index_job.error",
+ "translation": "Elasticsearch aggregator worker failed to create the indexing job"
+ },
+ {
+ "id": "ent.elasticsearch.aggregator_worker.delete_indexes.error",
+ "translation": "Elasticsearch aggregator worker failed to delete the indexes"
+ },
+ {
+ "id": "ent.elasticsearch.aggregator_worker.index_job_failed.error",
+ "translation": "Elasticsearch aggregator worker failed due to the indexing job failing"
+ },
+ {
+ "id": "ent.elasticsearch.indexer.do_job.parse_start_time.error",
+ "translation": "Elasticsearch indexing worker failed to parse the start time"
+ },
+ {
+ "id": "ent.elasticsearch.indexer.do_job.parse_end_time.error",
+ "translation": "Elasticsearch indexing worker failed to parse the end time"
+ },
+ {
"id": "ent.elasticsearch.create_client.connect_failed",
"translation": "Setting up Elasticsearch Client Failed"
},
@@ -4436,6 +4460,14 @@
"translation": "Elastic Search Username setting must be provided when Elastic Search indexing is enabled."
},
{
+ "id": "model.config.is_valid.elastic_search.aggregate_posts_after_days.app_error",
+ "translation": "Elasticsearch AggregatePostsAfterDays setting must be a number greater than or equal to 1."
+ },
+ {
+ "id": "model.config.is_valid.elastic_search.posts_aggregator_job_start_time.app_error",
+ "translation": "Elasticsearch PostsAggregatorJobStartTime setting must be a time in the format \"hh:mm\"."
+ },
+ {
"id": "model.config.is_valid.email_batching_buffer_size.app_error",
"translation": "Invalid email batching buffer size for email settings. Must be zero or a positive number."
},
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 1986b22b6..e478c5a19 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -16,7 +16,7 @@ const (
CANCEL_WATCHER_POLLING_INTERVAL = 5000
)
-func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) {
+func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
job := model.Job{
Id: model.NewId(),
Type: jobType,
@@ -70,9 +70,9 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
job.Status = model.JOB_STATUS_ERROR
job.Progress = -1
if job.Data == nil {
- job.Data = make(map[string]interface{})
+ job.Data = make(map[string]string)
}
- job.Data["error"] = jobError
+ job.Data["error"] = jobError.Error()
if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return result.Err
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
index 9ba68e85e..83d4249eb 100644
--- a/jobs/jobs_watcher.go
+++ b/jobs/jobs_watcher.go
@@ -64,25 +64,27 @@ func (watcher *Watcher) PollAndNotify() {
if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
} else {
- jobStatuses := result.Data.([]*model.Job)
+ jobs := result.Data.([]*model.Job)
- for _, js := range jobStatuses {
- j := model.Job{
- Type: js.Type,
- Id: js.Id,
- }
-
- if js.Type == model.JOB_TYPE_DATA_RETENTION {
+ for _, job := range jobs {
+ if job.Type == model.JOB_TYPE_DATA_RETENTION {
if watcher.workers.DataRetention != nil {
select {
- case watcher.workers.DataRetention.JobChannel() <- j:
+ case watcher.workers.DataRetention.JobChannel() <- *job:
default:
}
}
- } else if js.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING {
+ } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING {
if watcher.workers.ElasticsearchIndexing != nil {
select {
- case watcher.workers.ElasticsearchIndexing.JobChannel() <- j:
+ case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job:
+ default:
+ }
+ }
+ } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION {
+ if watcher.workers.ElasticsearchAggregation != nil {
+ select {
+ case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job:
default:
}
}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index 73ec6661a..2f4e18001 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -16,7 +16,8 @@ import (
type Schedulers struct {
startOnce sync.Once
- DataRetention model.Scheduler
+ DataRetention model.Scheduler
+ ElasticsearchAggregation model.Scheduler
listenerId string
}
@@ -28,6 +29,10 @@ func InitSchedulers() *Schedulers {
schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
}
+ if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
+ schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler()
+ }
+
return schedulers
}
@@ -38,6 +43,10 @@ func (schedulers *Schedulers) Start() *Schedulers {
if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
go schedulers.DataRetention.Run()
}
+
+ if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
+ go schedulers.ElasticsearchAggregation.Run()
+ }
})
schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
@@ -53,6 +62,14 @@ func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newCon
schedulers.DataRetention.Stop()
}
}
+
+ if schedulers.ElasticsearchAggregation != nil {
+ if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
+ go schedulers.ElasticsearchAggregation.Run()
+ } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
+ schedulers.ElasticsearchAggregation.Stop()
+ }
+ }
}
func (schedulers *Schedulers) Stop() *Schedulers {
@@ -62,6 +79,10 @@ func (schedulers *Schedulers) Stop() *Schedulers {
schedulers.DataRetention.Stop()
}
+ if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
+ schedulers.ElasticsearchAggregation.Stop()
+ }
+
l4g.Info("Stopped schedulers")
return schedulers
diff --git a/jobs/workers.go b/jobs/workers.go
index 592c001fb..fe38641e7 100644
--- a/jobs/workers.go
+++ b/jobs/workers.go
@@ -16,16 +16,15 @@ type Workers struct {
startOnce sync.Once
watcher *Watcher
- DataRetention model.Worker
- ElasticsearchIndexing model.Worker
+ DataRetention model.Worker
+ ElasticsearchIndexing model.Worker
+ ElasticsearchAggregation model.Worker
listenerId string
}
func InitWorkers() *Workers {
- workers := &Workers{
- // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
- }
+ workers := &Workers{}
workers.watcher = MakeWatcher(workers)
if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
@@ -36,6 +35,10 @@ func InitWorkers() *Workers {
workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker()
}
+ if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
+ workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker()
+ }
+
return workers
}
@@ -51,6 +54,10 @@ func (workers *Workers) Start() *Workers {
go workers.ElasticsearchIndexing.Run()
}
+ if workers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
+ go workers.ElasticsearchAggregation.Run()
+ }
+
go workers.watcher.Start()
})
@@ -75,6 +82,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m
workers.ElasticsearchIndexing.Stop()
}
}
+
+ if workers.ElasticsearchAggregation != nil {
+ if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
+ go workers.ElasticsearchAggregation.Run()
+ } else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
+ workers.ElasticsearchAggregation.Stop()
+ }
+ }
}
func (workers *Workers) Stop() *Workers {
@@ -90,6 +105,10 @@ func (workers *Workers) Stop() *Workers {
workers.ElasticsearchIndexing.Stop()
}
+ if workers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
+ workers.ElasticsearchAggregation.Stop()
+ }
+
l4g.Info("Stopped workers")
return workers
diff --git a/model/config.go b/model/config.go
index 933c643f2..9906723e7 100644
--- a/model/config.go
+++ b/model/config.go
@@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
+ "time"
)
const (
@@ -123,11 +124,13 @@ const (
ANNOUNCEMENT_SETTINGS_DEFAULT_BANNER_COLOR = "#f2a93b"
ANNOUNCEMENT_SETTINGS_DEFAULT_BANNER_TEXT_COLOR = "#333333"
- ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = ""
- ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = ""
- ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = ""
- ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1
- ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1
+ ELASTICSEARCH_SETTINGS_DEFAULT_CONNECTION_URL = ""
+ ELASTICSEARCH_SETTINGS_DEFAULT_USERNAME = ""
+ ELASTICSEARCH_SETTINGS_DEFAULT_PASSWORD = ""
+ ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_REPLICAS = 1
+ ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS = 1
+ ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS = 365
+ ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME = "03:00"
)
type ServiceSettings struct {
@@ -441,14 +444,16 @@ type WebrtcSettings struct {
}
type ElasticsearchSettings struct {
- ConnectionUrl *string
- Username *string
- Password *string
- EnableIndexing *bool
- EnableSearching *bool
- Sniff *bool
- PostIndexReplicas *int
- PostIndexShards *int
+ ConnectionUrl *string
+ Username *string
+ Password *string
+ EnableIndexing *bool
+ EnableSearching *bool
+ Sniff *bool
+ PostIndexReplicas *int
+ PostIndexShards *int
+ AggregatePostsAfterDays *int
+ PostsAggregatorJobStartTime *string
}
type DataRetentionSettings struct {
@@ -1452,6 +1457,16 @@ func (o *Config) SetDefaults() {
*o.ElasticsearchSettings.PostIndexShards = ELASTICSEARCH_SETTINGS_DEFAULT_POST_INDEX_SHARDS
}
+ if o.ElasticsearchSettings.AggregatePostsAfterDays == nil {
+ o.ElasticsearchSettings.AggregatePostsAfterDays = new(int)
+ *o.ElasticsearchSettings.AggregatePostsAfterDays = ELASTICSEARCH_SETTINGS_DEFAULT_AGGREGATE_POSTS_AFTER_DAYS
+ }
+
+ if o.ElasticsearchSettings.PostsAggregatorJobStartTime == nil {
+ o.ElasticsearchSettings.PostsAggregatorJobStartTime = new(string)
+ *o.ElasticsearchSettings.PostsAggregatorJobStartTime = ELASTICSEARCH_SETTINGS_DEFAULT_POSTS_AGGREGATOR_JOB_START_TIME
+ }
+
if o.DataRetentionSettings.Enable == nil {
o.DataRetentionSettings.Enable = new(bool)
*o.DataRetentionSettings.Enable = false
@@ -1700,6 +1715,14 @@ func (o *Config) IsValid() *AppError {
return NewLocAppError("Config.IsValid", "model.config.is_valid.elastic_search.enable_searching.app_error", nil, "")
}
+ if *o.ElasticsearchSettings.AggregatePostsAfterDays < 1 {
+ return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.aggregate_posts_after_days.app_error", nil, "", http.StatusBadRequest)
+ }
+
+ if _, err := time.Parse("15:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil {
+ return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.posts_aggregator_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest)
+ }
+
return nil
}
diff --git a/model/job.go b/model/job.go
index 004331a1f..258fa2bd3 100644
--- a/model/job.go
+++ b/model/job.go
@@ -10,8 +10,9 @@ import (
)
const (
- JOB_TYPE_DATA_RETENTION = "data_retention"
- JOB_TYPE_ELASTICSEARCH_POST_INDEXING = "elasticsearch_post_indexing"
+ JOB_TYPE_DATA_RETENTION = "data_retention"
+ JOB_TYPE_ELASTICSEARCH_POST_INDEXING = "elasticsearch_post_indexing"
+ JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION = "elasticsearch_post_aggregation"
JOB_STATUS_PENDING = "pending"
JOB_STATUS_IN_PROGRESS = "in_progress"
@@ -22,15 +23,15 @@ const (
)
type Job struct {
- Id string `json:"id"`
- Type string `json:"type"`
- Priority int64 `json:"priority"`
- CreateAt int64 `json:"create_at"`
- StartAt int64 `json:"start_at"`
- LastActivityAt int64 `json:"last_activity_at"`
- Status string `json:"status"`
- Progress int64 `json:"progress"`
- Data map[string]interface{} `json:"data"`
+ Id string `json:"id"`
+ Type string `json:"type"`
+ Priority int64 `json:"priority"`
+ CreateAt int64 `json:"create_at"`
+ StartAt int64 `json:"start_at"`
+ LastActivityAt int64 `json:"last_activity_at"`
+ Status string `json:"status"`
+ Progress int64 `json:"progress"`
+ Data map[string]string `json:"data"`
}
func (j *Job) IsValid() *AppError {
@@ -45,6 +46,7 @@ func (j *Job) IsValid() *AppError {
switch j.Type {
case JOB_TYPE_DATA_RETENTION:
case JOB_TYPE_ELASTICSEARCH_POST_INDEXING:
+ case JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION:
default:
return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest)
}
diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go
index 97e95ab92..394a09192 100644
--- a/store/sql_job_store_test.go
+++ b/store/sql_job_store_test.go
@@ -17,9 +17,9 @@ func TestJobSaveGet(t *testing.T) {
Id: model.NewId(),
Type: model.NewId(),
Status: model.NewId(),
- Data: map[string]interface{}{
- "Processed": 0,
- "Total": 12345,
+ Data: map[string]string{
+ "Processed": "0",
+ "Total": "12345",
"LastProcessed": "abcd",
},
}
@@ -36,6 +36,8 @@ func TestJobSaveGet(t *testing.T) {
t.Fatal(result.Err)
} else if received := result.Data.(*model.Job); received.Id != job.Id {
t.Fatal("received incorrect job after save")
+ } else if received.Data["Total"] != "12345" {
+ t.Fatal("data field was not retrieved successfully:", received.Data)
}
}
@@ -184,6 +186,9 @@ func TestJobGetAllByStatus(t *testing.T) {
Type: jobType,
CreateAt: 1000,
Status: status,
+ Data: map[string]string{
+ "test": "data",
+ },
},
{
Id: model.NewId(),
@@ -214,10 +219,10 @@ func TestJobGetAllByStatus(t *testing.T) {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 3 {
t.Fatal("received wrong number of jobs")
- } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
- t.Fatal("should've received first jobs")
- } else if received[0].Id != jobs[1].Id && received[1].Id != jobs[1].Id {
- t.Fatal("should've received second jobs")
+ } else if received[0].Id != jobs[1].Id || received[1].Id != jobs[0].Id || received[2].Id != jobs[2].Id {
+ t.Fatal("should've received jobs ordered by CreateAt time")
+ } else if received[1].Data["test"] != "data" {
+ t.Fatal("should've received job data field back as saved")
}
}
@@ -237,7 +242,7 @@ func TestJobUpdateOptimistically(t *testing.T) {
job.LastActivityAt = model.GetMillis()
job.Status = model.JOB_STATUS_IN_PROGRESS
job.Progress = 50
- job.Data = map[string]interface{}{
+ job.Data = map[string]string{
"Foo": "Bar",
}
diff --git a/store/sql_supplier.go b/store/sql_supplier.go
index 5b9c268bb..f56a9f448 100644
--- a/store/sql_supplier.go
+++ b/store/sql_supplier.go
@@ -786,6 +786,8 @@ func (me mattermConverter) ToDb(val interface{}) (interface{}, error) {
switch t := val.(type) {
case model.StringMap:
return model.MapToJson(t), nil
+ case map[string]string:
+ return model.MapToJson(model.StringMap(t)), nil
case model.StringArray:
return model.ArrayToJson(t), nil
case model.StringInterface:
@@ -809,6 +811,16 @@ func (me mattermConverter) FromDb(target interface{}) (gorp.CustomScanner, bool)
return json.Unmarshal(b, target)
}
return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true
+ case *map[string]string:
+ binder := func(holder, target interface{}) error {
+ s, ok := holder.(*string)
+ if !ok {
+ return errors.New(utils.T("store.sql.convert_string_map"))
+ }
+ b := []byte(*s)
+ return json.Unmarshal(b, target)
+ }
+ return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true
case *model.StringArray:
binder := func(holder, target interface{}) error {
s, ok := holder.(*string)