summaryrefslogtreecommitdiffstats
path: root/jobs
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2017-07-07 15:21:02 +0100
committerGitHub <noreply@github.com>2017-07-07 15:21:02 +0100
commit0495a519499d6cefa289982a94d8f42de541c1f0 (patch)
tree94b6145daa41ca4d1d4a172f030071076852a09a /jobs
parent6e0f5f096986dad11ef182ddb51d4bfb0e558860 (diff)
downloadchat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.gz
chat-0495a519499d6cefa289982a94d8f42de541c1f0.tar.bz2
chat-0495a519499d6cefa289982a94d8f42de541c1f0.zip
PLT-6916: Redesign the jobs package and Jobserver. (#6733)
This commit redesigns the jobserver to be based around an architecture of "workers", which carry out jobs of a particular type, and "jobs" which are a unit of work carried by a particular worker. It also introduces "schedulers" which are responsible for scheduling jobs of a particular type automatically (jobs can also be scheduled manually when apropriate). Workers may be run many times, either in instances of the platform binary, or the standalone jobserver binary. In any mattermost cluster, only one instance of platform OR jobserver must run the schedulers. At the moment this is controlled by a config variable, but in future will be controlled through the cluster leader election process.
Diffstat (limited to 'jobs')
-rw-r--r--jobs/jobs.go122
-rw-r--r--jobs/jobs_watcher.go85
-rw-r--r--jobs/jobserver/jobserver.go15
-rw-r--r--jobs/schedulers.go68
-rw-r--r--jobs/server.go31
-rw-r--r--jobs/testjob.go54
-rw-r--r--jobs/testscheduler.go58
-rw-r--r--jobs/testworker.go104
-rw-r--r--jobs/workers.go79
9 files changed, 511 insertions, 105 deletions
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 8c84f4eea..58c2f2f13 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -4,71 +4,111 @@
package jobs
import (
- "sync"
+ "context"
+ "time"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/platform/einterfaces/jobs"
"github.com/mattermost/platform/model"
- "github.com/mattermost/platform/store"
- "github.com/mattermost/platform/utils"
)
-type Jobs struct {
- startOnce sync.Once
+const (
+ CANCEL_WATCHER_POLLING_INTERVAL = 5000
+)
- DataRetention model.Job
- // SearchIndexing model.Job
+func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *model.AppError) {
+ job := model.Job{
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis(),
+ Status: model.JOB_STATUS_PENDING,
+ Data: jobData,
+ }
- listenerId string
+ if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
+ return nil, result.Err
+ }
+
+ return &job, nil
}
-func InitJobs(s store.Store) *Jobs {
- jobs := &Jobs{
- // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+func ClaimJob(job *model.Job) (bool, *model.AppError) {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ success := result.Data.(bool)
+ return success, nil
}
+}
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
- jobs.DataRetention = dataRetentionInterface.MakeJob(s)
- }
+func SetJobProgress(jobId string, progress int64) (bool, *model.AppError) {
+ var job *model.Job
- return jobs
-}
+ if result := <-Srv.Store.Job().Get(jobId); result.Err != nil {
+ return false, result.Err
+ } else {
+ job = result.Data.(*model.Job)
+ }
-func (jobs *Jobs) Start() *Jobs {
- l4g.Info("Starting jobs")
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.Progress = progress
- jobs.startOnce.Do(func() {
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
+ if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ return false, result.Err
+ } else {
+ if !result.Data.(bool) {
+ return false, nil
}
+ }
- // go jobs.SearchIndexing.Run()
- })
+ return true, nil
+}
- jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange)
+func SetJobSuccess(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
+ return result.Err
+}
- return jobs
+func SetJobError(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ return result.Err
}
-func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
- if jobs.DataRetention != nil {
- if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
- go jobs.DataRetention.Run()
- } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
- }
- }
+func SetJobCanceled(job *model.Job) *model.AppError {
+ result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
+ return result.Err
}
-func (jobs *Jobs) Stop() *Jobs {
- utils.RemoveConfigListener(jobs.listenerId)
+func RequestCancellation(job *model.Job) *model.AppError {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
+ }
- if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
- jobs.DataRetention.Stop()
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ return result.Err
+ } else if result.Data.(bool) {
+ return nil
}
- // jobs.SearchIndexing.Stop()
- l4g.Info("Stopped jobs")
+ return model.NewLocAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id)
+}
- return jobs
+func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
+ for {
+ select {
+ case <-ctx.Done():
+ l4g.Debug("CancellationWatcher for Job: %v Aborting as job has finished.", jobId)
+ return
+ case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
+ l4g.Debug("CancellationWatcher for Job: %v polling.", jobId)
+ if result := <-Srv.Store.Job().Get(jobId); result.Err == nil {
+ jobStatus := result.Data.(*model.Job)
+ if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
+ close(cancelChan)
+ return
+ }
+ }
+ }
+ }
}
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
new file mode 100644
index 000000000..ada957ccc
--- /dev/null
+++ b/jobs/jobs_watcher.go
@@ -0,0 +1,85 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "math/rand"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+const (
+ WATCHER_POLLING_INTERVAL = 15000
+)
+
+type Watcher struct {
+ workers *Workers
+
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeWatcher(workers *Workers) *Watcher {
+ return &Watcher{
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ workers: workers,
+ }
+}
+
+func (watcher *Watcher) Start() {
+ l4g.Debug("Watcher Started")
+
+ // Delay for some random number of milliseconds before starting to ensure that multiple
+ // instances of the jobserver don't poll at a time too close to each other.
+ rand.Seed(time.Now().UTC().UnixNano())
+ _ = <-time.After(time.Duration(rand.Intn(WATCHER_POLLING_INTERVAL)) * time.Millisecond)
+
+ defer func(){
+ l4g.Debug("Watcher Finished")
+ watcher.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-watcher.stop:
+ l4g.Debug("Watcher: Received stop signal")
+ return
+ case <-time.After(WATCHER_POLLING_INTERVAL * time.Millisecond):
+ watcher.PollAndNotify()
+ }
+ }
+}
+
+func (watcher *Watcher) Stop() {
+ l4g.Debug("Watcher Stopping")
+ watcher.stop <- true
+ <-watcher.stopped
+}
+
+func (watcher *Watcher) PollAndNotify() {
+ if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
+ l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
+ } else {
+ jobStatuses := result.Data.([]*model.Job)
+
+ for _, js := range jobStatuses {
+ j := model.Job{
+ Type: js.Type,
+ Id: js.Id,
+ }
+
+ if js.Type == model.JOB_TYPE_DATA_RETENTION {
+ if watcher.workers.DataRetention != nil {
+ select {
+ case watcher.workers.DataRetention.JobChannel() <- j:
+ default:
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go
index 5f491a815..aabe5d3b2 100644
--- a/jobs/jobserver/jobserver.go
+++ b/jobs/jobserver/jobserver.go
@@ -16,22 +16,20 @@ import (
_ "github.com/mattermost/platform/imports"
)
-var Srv jobs.JobServer
-
func main() {
// Initialize
utils.InitAndLoadConfig("config.json")
defer l4g.Close()
- Srv.Store = store.NewLayeredStore()
- defer Srv.Store.Close()
+ jobs.Srv.Store = store.NewLayeredStore()
+ defer jobs.Srv.Store.Close()
- Srv.LoadLicense()
+ jobs.Srv.LoadLicense()
// Run jobs
l4g.Info("Starting Mattermost job server")
- Srv.Jobs = jobs.InitJobs(Srv.Store)
- Srv.Jobs.Start()
+ jobs.Srv.StartWorkers()
+ jobs.Srv.StartSchedulers()
var signalChan chan os.Signal = make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
@@ -40,7 +38,8 @@ func main() {
// Cleanup anything that isn't handled by a defer statement
l4g.Info("Stopping Mattermost job server")
- Srv.Jobs.Stop()
+ jobs.Srv.StopSchedulers()
+ jobs.Srv.StopWorkers()
l4g.Info("Stopped Mattermost job server")
}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
new file mode 100644
index 000000000..73ec6661a
--- /dev/null
+++ b/jobs/schedulers.go
@@ -0,0 +1,68 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Schedulers struct {
+ startOnce sync.Once
+
+ DataRetention model.Scheduler
+
+ listenerId string
+}
+
+func InitSchedulers() *Schedulers {
+ schedulers := &Schedulers{}
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
+ }
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) Start() *Schedulers {
+ l4g.Info("Starting schedulers")
+
+ schedulers.startOnce.Do(func() {
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ }
+ })
+
+ schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
+
+ return schedulers
+}
+
+func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if schedulers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go schedulers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+ }
+}
+
+func (schedulers *Schedulers) Stop() *Schedulers {
+ utils.RemoveConfigListener(schedulers.listenerId)
+
+ if schedulers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ schedulers.DataRetention.Stop()
+ }
+
+ l4g.Info("Stopped schedulers")
+
+ return schedulers
+}
diff --git a/jobs/server.go b/jobs/server.go
index dd3448842..7920cb2d5 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -11,10 +11,13 @@ import (
)
type JobServer struct {
- Store store.Store
- Jobs *Jobs
+ Store store.Store
+ Workers *Workers
+ Schedulers *Schedulers
}
+var Srv JobServer
+
func (server *JobServer) LoadLicense() {
licenseId := ""
if result := <-server.Store.System().Get(); result.Err == nil {
@@ -44,3 +47,27 @@ func (server *JobServer) LoadLicense() {
l4g.Info(utils.T("mattermost.load_license.find.warn"))
}
}
+
+func (server *JobServer) StartWorkers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Workers = InitWorkers().Start()
+ }
+}
+
+func (server *JobServer) StartSchedulers() {
+ if *utils.Cfg.JobSettings.RunJobs {
+ Srv.Schedulers = InitSchedulers().Start()
+ }
+}
+
+func (server *JobServer) StopWorkers() {
+ if Srv.Workers != nil {
+ Srv.Workers.Stop()
+ }
+}
+
+func (server *JobServer) StopSchedulers() {
+ if Srv.Schedulers != nil {
+ Srv.Schedulers.Stop()
+ }
+}
diff --git a/jobs/testjob.go b/jobs/testjob.go
deleted file mode 100644
index 59d5274e5..000000000
--- a/jobs/testjob.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package jobs
-
-import (
- "time"
-
- l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/platform/store"
-)
-
-type TestJob struct {
- store store.Store
-
- name string
- stop chan bool
- stopped chan bool
-}
-
-func MakeTestJob(s store.Store, name string) *TestJob {
- return &TestJob{
- store: s,
- name: name,
- stop: make(chan bool, 1),
- stopped: make(chan bool, 1),
- }
-}
-
-func (job *TestJob) Run() {
- l4g.Debug("Job %v: Started", job.name)
-
- running := true
- for running {
- l4g.Debug("Job %v: Tick", job.name)
-
- select {
- case <-job.stop:
- l4g.Debug("Job %v: Received stop signal", job.name)
- running = false
- case <-time.After(10 * time.Second):
- continue
- }
- }
-
- l4g.Debug("Job %v: Finished", job.name)
- job.stopped <- true
-}
-
-func (job *TestJob) Stop() {
- l4g.Debug("Job %v: Stopping", job.name)
- job.stop <- true
- <-job.stopped
-}
diff --git a/jobs/testscheduler.go b/jobs/testscheduler.go
new file mode 100644
index 000000000..31b5d144c
--- /dev/null
+++ b/jobs/testscheduler.go
@@ -0,0 +1,58 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+)
+
+type TestScheduler struct {
+ name string
+ jobType string
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeTestScheduler(name string, jobType string) *TestScheduler {
+ return &TestScheduler{
+ name: name,
+ jobType: jobType,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ }
+}
+
+func (scheduler *TestScheduler) Run() {
+ l4g.Debug("Scheduler %v: Started", scheduler.name)
+
+ defer func(){
+ l4g.Debug("Scheduler %v: Finished", scheduler.name)
+ scheduler.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-scheduler.stop:
+ l4g.Debug("Scheduler %v: Received stop signal", scheduler.name)
+ return
+ case <-time.After(86400 * time.Second):
+ l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name)
+ scheduler.AddJob()
+ }
+ }
+}
+
+func (scheduler *TestScheduler) AddJob() {
+ if _, err := CreateJob(scheduler.jobType, nil); err != nil {
+ l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err)
+ }
+}
+
+func (scheduler *TestScheduler) Stop() {
+ l4g.Debug("Scheduler %v: Stopping", scheduler.name)
+ scheduler.stop <- true
+ <-scheduler.stopped
+}
diff --git a/jobs/testworker.go b/jobs/testworker.go
new file mode 100644
index 000000000..f1c8a07a3
--- /dev/null
+++ b/jobs/testworker.go
@@ -0,0 +1,104 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "context"
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+)
+
+type TestWorker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+}
+
+func MakeTestWorker(name string) *TestWorker {
+ return &TestWorker{
+ name: name,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ }
+}
+
+func (worker *TestWorker) Run() {
+ l4g.Debug("Worker %v: Started", worker.name)
+
+ defer func() {
+ l4g.Debug("Worker %v: Finished", worker.name)
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ l4g.Debug("Worker %v: Received stop signal", worker.name)
+ return
+ case job := <-worker.jobs:
+ l4g.Debug("Worker %v: Received a new candidate job.", worker.name)
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *TestWorker) DoJob(job *model.Job) {
+ if claimed, err := ClaimJob(job); err != nil {
+ l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error())
+ return
+ } else if !claimed {
+ return
+ }
+
+ cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
+ cancelWatcherChan := make(chan interface{}, 1)
+ go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+
+ defer cancelCancelWatcher()
+
+ counter := 0
+ for {
+ select {
+ case <-cancelWatcherChan:
+ l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-worker.stop:
+ l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id)
+ if err := SetJobCanceled(job); err != nil {
+ l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
+ }
+ return
+ case <-time.After(5 * time.Second):
+ counter++
+ if counter > 10 {
+ l4g.Debug("Job %v: Job completed.", job.Id)
+ if err := SetJobSuccess(job); err != nil {
+ l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error())
+ }
+ return
+ } else {
+ if _, err := SetJobProgress(job.Id, int64(counter*10)); err != nil {
+ l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
+ }
+ }
+ }
+ }
+}
+
+func (worker *TestWorker) Stop() {
+ l4g.Debug("Worker %v: Stopping", worker.name)
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *TestWorker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
diff --git a/jobs/workers.go b/jobs/workers.go
new file mode 100644
index 000000000..a42ec4607
--- /dev/null
+++ b/jobs/workers.go
@@ -0,0 +1,79 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+type Workers struct {
+ startOnce sync.Once
+ watcher *Watcher
+
+ DataRetention model.Worker
+ // SearchIndexing model.Job
+
+ listenerId string
+}
+
+func InitWorkers() *Workers {
+ workers := &Workers{
+ // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+ }
+ workers.watcher = MakeWatcher(workers)
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ workers.DataRetention = dataRetentionInterface.MakeWorker()
+ }
+
+ return workers
+}
+
+func (workers *Workers) Start() *Workers {
+ l4g.Info("Starting workers")
+
+ workers.startOnce.Do(func() {
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ }
+
+ // go workers.SearchIndexing.Run()
+
+ go workers.watcher.Start()
+ })
+
+ workers.listenerId = utils.AddConfigListener(workers.handleConfigChange)
+
+ return workers
+}
+
+func (workers *Workers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if workers.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go workers.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ }
+}
+
+func (workers *Workers) Stop() *Workers {
+ utils.RemoveConfigListener(workers.listenerId)
+
+ workers.watcher.Stop()
+
+ if workers.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ workers.DataRetention.Stop()
+ }
+ // workers.SearchIndexing.Stop()
+
+ l4g.Info("Stopped workers")
+
+ return workers
+}