From 8766690c81fcefdbe0c9d85590de1eea07a908d7 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Tue, 31 Jul 2018 15:40:23 +0100 Subject: MM-10502: Only cluster master should run job schedulers. (#9174) * MM-10502: Only cluster master should run job schedulers. * Use sync.Map for thread safety. * Fix tests. --- app/app.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) (limited to 'app/app.go') diff --git a/app/app.go b/app/app.go index 6da16c28c..5cedca2ad 100644 --- a/app/app.go +++ b/app/app.go @@ -64,10 +64,11 @@ type App struct { Mfa einterfaces.MfaInterface Saml einterfaces.SamlInterface - config atomic.Value - envConfig map[string]interface{} - configFile string - configListeners map[string]func(*model.Config, *model.Config) + config atomic.Value + envConfig map[string]interface{} + configFile string + configListeners map[string]func(*model.Config, *model.Config) + clusterLeaderListeners sync.Map licenseValue atomic.Value clientLicenseValue atomic.Value @@ -79,14 +80,15 @@ type App struct { newStore func() store.Store - htmlTemplateWatcher *utils.HTMLTemplateWatcher - sessionCache *utils.Cache - configListenerId string - licenseListenerId string - logListenerId string - disableConfigWatch bool - configWatcher *utils.ConfigWatcher - asymmetricSigningKey *ecdsa.PrivateKey + htmlTemplateWatcher *utils.HTMLTemplateWatcher + sessionCache *utils.Cache + configListenerId string + licenseListenerId string + logListenerId string + clusterLeaderListenerId string + disableConfigWatch bool + configWatcher *utils.ConfigWatcher + asymmetricSigningKey *ecdsa.PrivateKey pluginCommands []*PluginCommand pluginCommandsLock sync.RWMutex @@ -218,6 +220,10 @@ func New(options ...Option) (outApp *App, outErr error) { app.initJobs() }) + app.clusterLeaderListenerId = app.AddClusterLeaderChangedListener(func() { + app.Jobs.Schedulers.HandleClusterLeaderChange(app.IsLeader()) + }) + subpath, err := utils.GetSubpathFromConfig(app.Config()) if err != nil { return nil, errors.Wrap(err, "failed to parse SiteURL subpath") @@ -270,6 +276,7 @@ func (a *App) Shutdown() { a.RemoveConfigListener(a.configListenerId) a.RemoveLicenseListener(a.licenseListenerId) a.RemoveConfigListener(a.logListenerId) + a.RemoveClusterLeaderChangedListener(a.clusterLeaderListenerId) mlog.Info("Server stopped") a.DisableConfigWatch() @@ -432,6 +439,8 @@ func (a *App) initJobs() { if jobsMigrationsInterface != nil { a.Jobs.Migrations = jobsMigrationsInterface(a) } + a.Jobs.Workers = a.Jobs.InitWorkers() + a.Jobs.Schedulers = a.Jobs.InitSchedulers() } func (a *App) DiagnosticId() string { -- cgit v1.2.3-1-g7c22