summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--api4/apitestlib.go5
-rw-r--r--api4/job.go4
-rw-r--r--app/admin.go3
-rw-r--r--app/app.go45
-rw-r--r--app/job.go9
-rw-r--r--cmd/platform/jobserver.go15
-rw-r--r--cmd/platform/server.go11
-rw-r--r--einterfaces/jobs/data_retention.go10
-rw-r--r--einterfaces/jobs/elasticsearch.go20
-rw-r--r--einterfaces/jobs/ldap_sync.go10
-rw-r--r--jobs/jobs.go50
-rw-r--r--jobs/jobs_watcher.go6
-rw-r--r--jobs/schedulers.go20
-rw-r--r--jobs/server.go35
-rw-r--r--jobs/testworker.go16
-rw-r--r--jobs/workers.go13
16 files changed, 140 insertions, 132 deletions
diff --git a/api4/apitestlib.go b/api4/apitestlib.go
index fad066ff8..301e312da 100644
--- a/api4/apitestlib.go
+++ b/api4/apitestlib.go
@@ -23,7 +23,6 @@ import (
"github.com/mattermost/mattermost-server/utils"
"github.com/mattermost/mattermost-server/wsapi"
- "github.com/mattermost/mattermost-server/jobs"
s3 "github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/credentials"
)
@@ -76,8 +75,8 @@ func setupTestHelper(enterprise bool) *TestHelper {
utils.License().Features.SetDefaults()
}
- if jobs.Srv.Store == nil {
- jobs.Srv.Store = th.App.Srv.Store
+ if th.App.Jobs.Store == nil {
+ th.App.Jobs.Store = th.App.Srv.Store
}
th.Client = th.CreateClient()
diff --git a/api4/job.go b/api4/job.go
index 57c604361..138c76c8f 100644
--- a/api4/job.go
+++ b/api4/job.go
@@ -52,7 +52,7 @@ func createJob(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if job, err := app.CreateJob(job); err != nil {
+ if job, err := c.App.CreateJob(job); err != nil {
c.Err = err
return
} else {
@@ -109,7 +109,7 @@ func cancelJob(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if err := app.CancelJob(c.Params.JobId); err != nil {
+ if err := c.App.CancelJob(c.Params.JobId); err != nil {
c.Err = err
return
}
diff --git a/app/admin.go b/app/admin.go
index 0d02c3b49..dab7e9759 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -14,7 +14,6 @@ import (
"net/http"
l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/store/sqlstore"
@@ -190,7 +189,7 @@ func (a *App) RecycleDatabaseConnection() {
l4g.Warn(utils.T("api.admin.recycle_db_start.warn"))
a.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster)
- jobs.Srv.Store = a.Srv.Store
+ a.Jobs.Store = a.Srv.Store
time.Sleep(20 * time.Second)
oldStore.Close()
diff --git a/app/app.go b/app/app.go
index e85fa6342..26388d841 100644
--- a/app/app.go
+++ b/app/app.go
@@ -9,6 +9,8 @@ import (
"sync"
"github.com/mattermost/mattermost-server/einterfaces"
+ ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
+ "github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/plugin/pluginenv"
"github.com/mattermost/mattermost-server/utils"
@@ -25,6 +27,8 @@ type App struct {
Hubs []*Hub
HubsStopCheckingForDeadlock chan bool
+ Jobs *jobs.JobServer
+
AccountMigration einterfaces.AccountMigrationInterface
Brand einterfaces.BrandInterface
Cluster einterfaces.ClusterInterface
@@ -36,7 +40,9 @@ type App struct {
Saml einterfaces.SamlInterface
}
-var globalApp App
+var globalApp App = App{
+ Jobs: &jobs.JobServer{},
+}
var initEnterprise sync.Once
@@ -65,6 +71,30 @@ func RegisterComplianceInterface(f func(*App) einterfaces.ComplianceInterface) {
complianceInterface = f
}
+var jobsDataRetentionInterface func(*App) ejobs.DataRetentionInterface
+
+func RegisterJobsDataRetentionInterface(f func(*App) ejobs.DataRetentionInterface) {
+ jobsDataRetentionInterface = f
+}
+
+var jobsElasticsearchAggregatorInterface func(*App) ejobs.ElasticsearchAggregatorInterface
+
+func RegisterJobsElasticsearchAggregatorInterface(f func(*App) ejobs.ElasticsearchAggregatorInterface) {
+ jobsElasticsearchAggregatorInterface = f
+}
+
+var jobsElasticsearchIndexerInterface func(*App) ejobs.ElasticsearchIndexerInterface
+
+func RegisterJobsElasticsearchIndexerInterface(f func(*App) ejobs.ElasticsearchIndexerInterface) {
+ jobsElasticsearchIndexerInterface = f
+}
+
+var jobsLdapSyncInterface func(*App) ejobs.LdapSyncInterface
+
+func RegisterJobsLdapSyncInterface(f func(*App) ejobs.LdapSyncInterface) {
+ jobsLdapSyncInterface = f
+}
+
var ldapInterface func(*App) einterfaces.LdapInterface
func RegisterLdapInterface(f func(*App) einterfaces.LdapInterface) {
@@ -121,6 +151,19 @@ func (a *App) initEnterprise() {
a.Saml.ConfigureSP()
})
}
+
+ if jobsDataRetentionInterface != nil {
+ a.Jobs.DataRetention = jobsDataRetentionInterface(a)
+ }
+ if jobsElasticsearchAggregatorInterface != nil {
+ a.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a)
+ }
+ if jobsElasticsearchIndexerInterface != nil {
+ a.Jobs.ElasticsearchIndexer = jobsElasticsearchIndexerInterface(a)
+ }
+ if jobsLdapSyncInterface != nil {
+ a.Jobs.LdapSync = jobsLdapSyncInterface(a)
+ }
}
func (a *App) Config() *model.Config {
diff --git a/app/job.go b/app/job.go
index c1058880f..d80fe6262 100644
--- a/app/job.go
+++ b/app/job.go
@@ -4,7 +4,6 @@
package app
import (
- "github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/model"
)
@@ -40,10 +39,10 @@ func (a *App) GetJobsByType(jobType string, offset int, limit int) ([]*model.Job
}
}
-func CreateJob(job *model.Job) (*model.Job, *model.AppError) {
- return jobs.CreateJob(job.Type, job.Data)
+func (a *App) CreateJob(job *model.Job) (*model.Job, *model.AppError) {
+ return a.Jobs.CreateJob(job.Type, job.Data)
}
-func CancelJob(jobId string) *model.AppError {
- return jobs.RequestCancellation(jobId)
+func (a *App) CancelJob(jobId string) *model.AppError {
+ return a.Jobs.RequestCancellation(jobId)
}
diff --git a/cmd/platform/jobserver.go b/cmd/platform/jobserver.go
index 6d4e828bd..4f82a21ee 100644
--- a/cmd/platform/jobserver.go
+++ b/cmd/platform/jobserver.go
@@ -8,7 +8,6 @@ import (
"syscall"
l4g "github.com/alecthomas/log4go"
- "github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/store/sqlstore"
"github.com/spf13/cobra"
@@ -37,18 +36,18 @@ func jobserverCmdF(cmd *cobra.Command, args []string) {
}
defer l4g.Close()
- jobs.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster)
- defer jobs.Srv.Store.Close()
+ a.Jobs.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster)
+ defer a.Jobs.Store.Close()
- jobs.Srv.LoadLicense()
+ a.Jobs.LoadLicense()
// Run jobs
l4g.Info("Starting Mattermost job server")
if !noJobs {
- jobs.Srv.StartWorkers()
+ a.Jobs.StartWorkers()
}
if !noSchedule {
- jobs.Srv.StartSchedulers()
+ a.Jobs.StartSchedulers()
}
var signalChan chan os.Signal = make(chan os.Signal)
@@ -58,8 +57,8 @@ func jobserverCmdF(cmd *cobra.Command, args []string) {
// Cleanup anything that isn't handled by a defer statement
l4g.Info("Stopping Mattermost job server")
- jobs.Srv.StopSchedulers()
- jobs.Srv.StopWorkers()
+ a.Jobs.StopSchedulers()
+ a.Jobs.StopWorkers()
l4g.Info("Stopped Mattermost job server")
}
diff --git a/cmd/platform/server.go b/cmd/platform/server.go
index 7f5fbf6e8..ec753c837 100644
--- a/cmd/platform/server.go
+++ b/cmd/platform/server.go
@@ -13,7 +13,6 @@ import (
"github.com/mattermost/mattermost-server/api"
"github.com/mattermost/mattermost-server/api4"
"github.com/mattermost/mattermost-server/app"
- "github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/manualtesting"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
@@ -138,12 +137,12 @@ func runServer(configFileLocation string) {
}
}
- jobs.Srv.Store = a.Srv.Store
+ a.Jobs.Store = a.Srv.Store
if *utils.Cfg.JobSettings.RunJobs {
- jobs.Srv.StartWorkers()
+ a.Jobs.StartWorkers()
}
if *utils.Cfg.JobSettings.RunScheduler {
- jobs.Srv.StartSchedulers()
+ a.Jobs.StartSchedulers()
}
// wait for kill signal before attempting to gracefully shutdown
@@ -160,8 +159,8 @@ func runServer(configFileLocation string) {
a.Metrics.StopServer()
}
- jobs.Srv.StopSchedulers()
- jobs.Srv.StopWorkers()
+ a.Jobs.StopSchedulers()
+ a.Jobs.StopWorkers()
a.StopServer()
}
diff --git a/einterfaces/jobs/data_retention.go b/einterfaces/jobs/data_retention.go
index 887ce561b..5910d6120 100644
--- a/einterfaces/jobs/data_retention.go
+++ b/einterfaces/jobs/data_retention.go
@@ -11,13 +11,3 @@ type DataRetentionInterface interface {
MakeWorker() model.Worker
MakeScheduler() model.Scheduler
}
-
-var theDataRetentionInterface DataRetentionInterface
-
-func RegisterDataRetentionInterface(newInterface DataRetentionInterface) {
- theDataRetentionInterface = newInterface
-}
-
-func GetDataRetentionInterface() DataRetentionInterface {
- return theDataRetentionInterface
-}
diff --git a/einterfaces/jobs/elasticsearch.go b/einterfaces/jobs/elasticsearch.go
index 513a6c323..16e0d7697 100644
--- a/einterfaces/jobs/elasticsearch.go
+++ b/einterfaces/jobs/elasticsearch.go
@@ -11,27 +11,7 @@ type ElasticsearchIndexerInterface interface {
MakeWorker() model.Worker
}
-var theElasticsearchIndexerInterface ElasticsearchIndexerInterface
-
-func RegisterElasticsearchIndexerInterface(newInterface ElasticsearchIndexerInterface) {
- theElasticsearchIndexerInterface = newInterface
-}
-
-func GetElasticsearchIndexerInterface() ElasticsearchIndexerInterface {
- return theElasticsearchIndexerInterface
-}
-
type ElasticsearchAggregatorInterface interface {
MakeWorker() model.Worker
MakeScheduler() model.Scheduler
}
-
-var theElasticsearchAggregatorInterface ElasticsearchAggregatorInterface
-
-func RegisterElasticsearchAggregatorInterface(newInterface ElasticsearchAggregatorInterface) {
- theElasticsearchAggregatorInterface = newInterface
-}
-
-func GetElasticsearchAggregatorInterface() ElasticsearchAggregatorInterface {
- return theElasticsearchAggregatorInterface
-}
diff --git a/einterfaces/jobs/ldap_sync.go b/einterfaces/jobs/ldap_sync.go
index 97055bfcc..5565afe41 100644
--- a/einterfaces/jobs/ldap_sync.go
+++ b/einterfaces/jobs/ldap_sync.go
@@ -11,13 +11,3 @@ type LdapSyncInterface interface {
MakeWorker() model.Worker
MakeScheduler() model.Scheduler
}
-
-var theLdapSyncInterface LdapSyncInterface
-
-func RegisterLdapSyncInterface(newInterface LdapSyncInterface) {
- theLdapSyncInterface = newInterface
-}
-
-func GetLdapSyncInterface() LdapSyncInterface {
- return theLdapSyncInterface
-}
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 22d87e850..7e49b2f48 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -18,7 +18,7 @@ const (
CANCEL_WATCHER_POLLING_INTERVAL = 5000
)
-func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
+func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
job := model.Job{
Id: model.NewId(),
Type: jobType,
@@ -31,23 +31,23 @@ func CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.Ap
return nil, err
}
- if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
+ if result := <-srv.Store.Job().Save(&job); result.Err != nil {
return nil, result.Err
}
return &job, nil
}
-func GetJob(id string) (*model.Job, *model.AppError) {
- if result := <-Srv.Store.Job().Get(id); result.Err != nil {
+func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) {
+ if result := <-srv.Store.Job().Get(id); result.Err != nil {
return nil, result.Err
} else {
return result.Data.(*model.Job), nil
}
}
-func ClaimJob(job *model.Job) (bool, *model.AppError) {
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) {
+ if result := <-srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return false, result.Err
} else {
success := result.Data.(bool)
@@ -55,25 +55,25 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) {
}
}
-func SetJobProgress(job *model.Job, progress int64) *model.AppError {
+func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError {
job.Status = model.JOB_STATUS_IN_PROGRESS
job.Progress = progress
- if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return result.Err
} else {
return nil
}
}
-func SetJobSuccess(job *model.Job) *model.AppError {
- result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
+func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError {
+ result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
return result.Err
}
-func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
+func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
if jobError == nil {
- result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
+ result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
return result.Err
}
@@ -85,11 +85,11 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
jobError.Translate(utils.T)
job.Data["error"] = jobError.Message + " (" + jobError.DetailedError + ")"
- if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return result.Err
} else {
if !result.Data.(bool) {
- if result := <-Srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
return result.Err
} else {
if !result.Data.(bool) {
@@ -102,19 +102,19 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
return nil
}
-func SetJobCanceled(job *model.Job) *model.AppError {
- result := <-Srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
+func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError {
+ result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
return result.Err
}
-func RequestCancellation(jobId string) *model.AppError {
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+func (srv *JobServer) RequestCancellation(jobId string) *model.AppError {
+ if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
return nil
}
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
return nil
@@ -123,7 +123,7 @@ func RequestCancellation(jobId string) *model.AppError {
return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError)
}
-func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
+func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
for {
select {
case <-ctx.Done():
@@ -131,7 +131,7 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte
return
case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
l4g.Debug("CancellationWatcher for Job: %v polling.", jobId)
- if result := <-Srv.Store.Job().Get(jobId); result.Err == nil {
+ if result := <-srv.Store.Job().Get(jobId); result.Err == nil {
jobStatus := result.Data.(*model.Job)
if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
close(cancelChan)
@@ -152,16 +152,16 @@ func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Tim
return &nextTime
}
-func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
- if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil {
+func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
+ if result := <-srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil {
return false, result.Err
} else {
return result.Data.(int64) > 0, nil
}
}
-func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
- if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil {
+func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
+ if result := <-srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil {
return nil, result.Err
} else {
return result.Data.(*model.Job), nil
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
index 56cf9eb2e..f9a958fe3 100644
--- a/jobs/jobs_watcher.go
+++ b/jobs/jobs_watcher.go
@@ -16,6 +16,7 @@ const (
)
type Watcher struct {
+ srv *JobServer
workers *Workers
stop chan bool
@@ -23,12 +24,13 @@ type Watcher struct {
pollingInterval int
}
-func MakeWatcher(workers *Workers, pollingInterval int) *Watcher {
+func (srv *JobServer) MakeWatcher(workers *Workers, pollingInterval int) *Watcher {
return &Watcher{
stop: make(chan bool, 1),
stopped: make(chan bool, 1),
pollingInterval: pollingInterval,
workers: workers,
+ srv: srv,
}
}
@@ -63,7 +65,7 @@ func (watcher *Watcher) Stop() {
}
func (watcher *Watcher) PollAndNotify() {
- if result := <-Srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
+ if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
} else {
jobs := result.Data.([]*model.Job)
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index cdf8d956d..2f1ae394f 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -9,7 +9,6 @@ import (
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -20,28 +19,31 @@ type Schedulers struct {
configChanged chan *model.Config
listenerId string
startOnce sync.Once
+ jobs *JobServer
schedulers []model.Scheduler
nextRunTimes []*time.Time
}
-func InitSchedulers() *Schedulers {
+func (srv *JobServer) InitSchedulers() *Schedulers {
l4g.Debug("Initialising schedulers.")
+
schedulers := &Schedulers{
stop: make(chan bool),
stopped: make(chan bool),
configChanged: make(chan *model.Config),
+ jobs: srv,
}
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler())
}
- if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
+ if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler())
}
- if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
+ if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
}
@@ -124,7 +126,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim
scheduler := schedulers.schedulers[idx]
if !pendingJobs {
- if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil {
+ if pj, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType()); err != nil {
l4g.Error("Failed to set next job run time: " + err.Error())
schedulers.nextRunTimes[idx] = nil
return
@@ -133,7 +135,7 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim
}
}
- lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType())
+ lastSuccessfulJob, err := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType())
if err != nil {
l4g.Error("Failed to set next job run time: " + err.Error())
schedulers.nextRunTimes[idx] = nil
@@ -145,12 +147,12 @@ func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now tim
}
func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) {
- pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType())
+ pendingJobs, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType())
if err != nil {
return nil, err
}
- lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType())
+ lastSuccessfulJob, err2 := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType())
if err2 != nil {
return nil, err
}
diff --git a/jobs/server.go b/jobs/server.go
index 6c857e7dc..667d6c075 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -5,6 +5,8 @@ package jobs
import (
l4g "github.com/alecthomas/log4go"
+
+ ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/utils"
@@ -14,13 +16,16 @@ type JobServer struct {
Store store.Store
Workers *Workers
Schedulers *Schedulers
-}
-var Srv JobServer
+ DataRetention ejobs.DataRetentionInterface
+ ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface
+ ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface
+ LdapSync ejobs.LdapSyncInterface
+}
-func (server *JobServer) LoadLicense() {
+func (srv *JobServer) LoadLicense() {
licenseId := ""
- if result := <-server.Store.System().Get(); result.Err == nil {
+ if result := <-srv.Store.System().Get(); result.Err == nil {
props := result.Data.(model.StringMap)
licenseId = props[model.SYSTEM_ACTIVE_LICENSE_ID]
}
@@ -31,7 +36,7 @@ func (server *JobServer) LoadLicense() {
// Lets attempt to load the file from disk since it was missing from the DB
_, licenseBytes = utils.GetAndValidateLicenseFileFromDisk()
} else {
- if result := <-server.Store.License().Get(licenseId); result.Err == nil {
+ if result := <-srv.Store.License().Get(licenseId); result.Err == nil {
record := result.Data.(*model.LicenseRecord)
licenseBytes = []byte(record.Bytes)
l4g.Info("License key valid unlocking enterprise features.")
@@ -48,22 +53,22 @@ func (server *JobServer) LoadLicense() {
}
}
-func (server *JobServer) StartWorkers() {
- Srv.Workers = InitWorkers().Start()
+func (srv *JobServer) StartWorkers() {
+ srv.Workers = srv.InitWorkers().Start()
}
-func (server *JobServer) StartSchedulers() {
- Srv.Schedulers = InitSchedulers().Start()
+func (srv *JobServer) StartSchedulers() {
+ srv.Schedulers = srv.InitSchedulers().Start()
}
-func (server *JobServer) StopWorkers() {
- if Srv.Workers != nil {
- Srv.Workers.Stop()
+func (srv *JobServer) StopWorkers() {
+ if srv.Workers != nil {
+ srv.Workers.Stop()
}
}
-func (server *JobServer) StopSchedulers() {
- if Srv.Schedulers != nil {
- Srv.Schedulers.Stop()
+func (srv *JobServer) StopSchedulers() {
+ if srv.Schedulers != nil {
+ srv.Schedulers.Stop()
}
}
diff --git a/jobs/testworker.go b/jobs/testworker.go
index 29608e909..9cfc8614f 100644
--- a/jobs/testworker.go
+++ b/jobs/testworker.go
@@ -12,14 +12,16 @@ import (
)
type TestWorker struct {
+ srv *JobServer
name string
stop chan bool
stopped chan bool
jobs chan model.Job
}
-func MakeTestWorker(name string) *TestWorker {
+func (srv *JobServer) MakeTestWorker(name string) *TestWorker {
return &TestWorker{
+ srv: srv,
name: name,
stop: make(chan bool, 1),
stopped: make(chan bool, 1),
@@ -48,7 +50,7 @@ func (worker *TestWorker) Run() {
}
func (worker *TestWorker) DoJob(job *model.Job) {
- if claimed, err := ClaimJob(job); err != nil {
+ if claimed, err := worker.srv.ClaimJob(job); err != nil {
l4g.Error("Job: %v: Error occurred while trying to claim job: %v", job.Id, err.Error())
return
} else if !claimed {
@@ -57,7 +59,7 @@ func (worker *TestWorker) DoJob(job *model.Job) {
cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
cancelWatcherChan := make(chan interface{}, 1)
- go CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+ go worker.srv.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
defer cancelCancelWatcher()
@@ -66,13 +68,13 @@ func (worker *TestWorker) DoJob(job *model.Job) {
select {
case <-cancelWatcherChan:
l4g.Debug("Job %v: Job has been canceled via CancellationWatcher.", job.Id)
- if err := SetJobCanceled(job); err != nil {
+ if err := worker.srv.SetJobCanceled(job); err != nil {
l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
}
return
case <-worker.stop:
l4g.Debug("Job %v: Job has been canceled via Worker Stop.", job.Id)
- if err := SetJobCanceled(job); err != nil {
+ if err := worker.srv.SetJobCanceled(job); err != nil {
l4g.Error("Failed to mark job: %v as canceled. Error: %v", job.Id, err.Error())
}
return
@@ -80,12 +82,12 @@ func (worker *TestWorker) DoJob(job *model.Job) {
counter++
if counter > 10 {
l4g.Debug("Job %v: Job completed.", job.Id)
- if err := SetJobSuccess(job); err != nil {
+ if err := worker.srv.SetJobSuccess(job); err != nil {
l4g.Error("Failed to mark job: %v as succeeded. Error: %v", job.Id, err.Error())
}
return
} else {
- if err := SetJobProgress(job, int64(counter*10)); err != nil {
+ if err := worker.srv.SetJobProgress(job, int64(counter*10)); err != nil {
l4g.Error("Job: %v: an error occured while trying to set job progress: %v", job.Id, err.Error())
}
}
diff --git a/jobs/workers.go b/jobs/workers.go
index 9f85adaf5..dfa150ff6 100644
--- a/jobs/workers.go
+++ b/jobs/workers.go
@@ -7,7 +7,6 @@ import (
"sync"
l4g "github.com/alecthomas/log4go"
- ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -24,23 +23,23 @@ type Workers struct {
listenerId string
}
-func InitWorkers() *Workers {
+func (srv *JobServer) InitWorkers() *Workers {
workers := &Workers{}
- workers.Watcher = MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL)
+ workers.Watcher = srv.MakeWatcher(workers, DEFAULT_WATCHER_POLLING_INTERVAL)
- if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ if dataRetentionInterface := srv.DataRetention; dataRetentionInterface != nil {
workers.DataRetention = dataRetentionInterface.MakeWorker()
}
- if elasticsearchIndexerInterface := ejobs.GetElasticsearchIndexerInterface(); elasticsearchIndexerInterface != nil {
+ if elasticsearchIndexerInterface := srv.ElasticsearchIndexer; elasticsearchIndexerInterface != nil {
workers.ElasticsearchIndexing = elasticsearchIndexerInterface.MakeWorker()
}
- if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
+ if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil {
workers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeWorker()
}
- if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
+ if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil {
workers.LdapSync = ldapSyncInterface.MakeWorker()
}