diff options
Diffstat (limited to 'jobs')
-rw-r--r-- | jobs/jobs_watcher.go | 7 | ||||
-rw-r--r-- | jobs/schedulers.go | 4 | ||||
-rw-r--r-- | jobs/server.go | 1 | ||||
-rw-r--r-- | jobs/workers.go | 21 |
4 files changed, 33 insertions, 0 deletions
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index b36a99051..f519e7cca 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -78,6 +78,13 @@ func (watcher *Watcher) PollAndNotify() { default: } } + } else if job.Type == model.JOB_TYPE_MESSAGE_EXPORT { + if watcher.workers.MessageExport != nil { + select { + case watcher.workers.MessageExport.JobChannel() <- *job: + default: + } + } } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { if watcher.workers.ElasticsearchIndexing != nil { select { diff --git a/jobs/schedulers.go b/jobs/schedulers.go index cbe5f1749..bec53a49b 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -39,6 +39,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers { schedulers.schedulers = append(schedulers.schedulers, srv.DataRetentionJob.MakeScheduler()) } + if srv.MessageExportJob != nil { + schedulers.schedulers = append(schedulers.schedulers, srv.MessageExportJob.MakeScheduler()) + } + if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil { schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler()) } diff --git a/jobs/server.go b/jobs/server.go index 40cfb1f64..777b02a26 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -19,6 +19,7 @@ type JobServer struct { Schedulers *Schedulers DataRetentionJob ejobs.DataRetentionJobInterface + MessageExportJob ejobs.MessageExportJobInterface ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface LdapSync ejobs.LdapSyncInterface diff --git a/jobs/workers.go b/jobs/workers.go index b1d275658..3abd7131c 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -17,6 +17,7 @@ type Workers struct { Watcher *Watcher DataRetention model.Worker + MessageExport model.Worker ElasticsearchIndexing model.Worker ElasticsearchAggregation model.Worker LdapSync model.Worker @@ -34,6 +35,10 @@ func (srv *JobServer) InitWorkers() *Workers { workers.DataRetention = srv.DataRetentionJob.MakeWorker() } + if srv.MessageExportJob != nil { + workers.MessageExport = srv.MessageExportJob.MakeWorker() + } + if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil { workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker() } @@ -57,6 +62,10 @@ func (workers *Workers) Start() *Workers { go workers.DataRetention.Run() } + if workers.MessageExport != nil && *workers.Config().MessageExportSettings.EnableExport { + go workers.MessageExport.Run() + } + if workers.ElasticsearchIndexing != nil && *workers.Config().ElasticsearchSettings.EnableIndexing { go workers.ElasticsearchIndexing.Run() } @@ -86,6 +95,14 @@ func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *m } } + if workers.MessageExport != nil { + if !*oldConfig.MessageExportSettings.EnableExport && *newConfig.MessageExportSettings.EnableExport { + go workers.MessageExport.Run() + } else if *oldConfig.MessageExportSettings.EnableExport && !*newConfig.MessageExportSettings.EnableExport { + workers.MessageExport.Stop() + } + } + if workers.ElasticsearchIndexing != nil { if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing { go workers.ElasticsearchIndexing.Run() @@ -120,6 +137,10 @@ func (workers *Workers) Stop() *Workers { workers.DataRetention.Stop() } + if workers.MessageExport != nil && *workers.Config().MessageExportSettings.EnableExport { + workers.MessageExport.Stop() + } + if workers.ElasticsearchIndexing != nil && *workers.Config().ElasticsearchSettings.EnableIndexing { workers.ElasticsearchIndexing.Stop() } |