From 51bd710ecdca6628461c9fa2679737073e4d5059 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Mon, 14 May 2018 15:59:04 +0100 Subject: MM-9728: Online migration for advanced permissions phase 2 (#8744) * MM-9728: Online migration for advanced permissions phase 2 * Add unit tests for new store functions. * Move migration specific code to own file. * Add migration state function test. * Style fixes. * Add i18n strings. * Fix mocks. * Add TestMain to migrations package tests. * Fix typo. * Fix review comments. * Fix up the "Check if migration is done" check to actually work. --- jobs/interfaces/migrations_interface.go | 11 +++++++++++ jobs/jobs.go | 7 +++++++ jobs/jobs_watcher.go | 7 +++++++ jobs/schedulers.go | 4 ++++ jobs/server.go | 2 ++ jobs/workers.go | 13 +++++++++++++ 6 files changed, 44 insertions(+) create mode 100644 jobs/interfaces/migrations_interface.go (limited to 'jobs') diff --git a/jobs/interfaces/migrations_interface.go b/jobs/interfaces/migrations_interface.go new file mode 100644 index 000000000..48dc9f579 --- /dev/null +++ b/jobs/interfaces/migrations_interface.go @@ -0,0 +1,11 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package interfaces + +import "github.com/mattermost/mattermost-server/model" + +type MigrationsJobInterface interface { + MakeWorker() model.Worker + MakeScheduler() model.Scheduler +} diff --git a/jobs/jobs.go b/jobs/jobs.go index 850491403..ddbc4489b 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -106,6 +106,13 @@ func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError { return result.Err } +func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError { + job.Status = model.JOB_STATUS_IN_PROGRESS + job.LastActivityAt = model.GetMillis() + result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS) + return result.Err +} + func (srv *JobServer) RequestCancellation(jobId string) *model.AppError { if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { return result.Err diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 07979442d..01d0a8d0f 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -107,6 +107,13 @@ func (watcher *Watcher) PollAndNotify() { default: } } + } else if job.Type == model.JOB_TYPE_MIGRATIONS { + if watcher.workers.Migrations != nil { + select { + case watcher.workers.Migrations.JobChannel() <- *job: + default: + } + } } } } diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 2823036df..96aa2b635 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -50,6 +50,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers { schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler()) } + if migrationsInterface := srv.Migrations; migrationsInterface != nil { + schedulers.schedulers = append(schedulers.schedulers, migrationsInterface.MakeScheduler()) + } + schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers)) return schedulers } diff --git a/jobs/server.go b/jobs/server.go index 01cf821dc..10ea9a46f 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -5,6 +5,7 @@ package jobs import ( ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs" + tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" ) @@ -34,6 +35,7 @@ type JobServer struct { ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface LdapSync ejobs.LdapSyncInterface + Migrations tjobs.MigrationsJobInterface } func NewJobServer(configService ConfigService, store store.Store) *JobServer { diff --git a/jobs/workers.go b/jobs/workers.go index 57a255013..67ab43241 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -20,6 +20,7 @@ type Workers struct { ElasticsearchIndexing model.Worker ElasticsearchAggregation model.Worker LdapSync model.Worker + Migrations model.Worker listenerId string } @@ -50,6 +51,10 @@ func (srv *JobServer) InitWorkers() *Workers { workers.LdapSync = ldapSyncInterface.MakeWorker() } + if migrationsInterface := srv.Migrations; migrationsInterface != nil { + workers.Migrations = migrationsInterface.MakeWorker() + } + return workers } @@ -77,6 +82,10 @@ func (workers *Workers) Start() *Workers { go workers.LdapSync.Run() } + if workers.Migrations != nil { + go workers.Migrations.Run() + } + go workers.Watcher.Start() }) @@ -152,6 +161,10 @@ func (workers *Workers) Stop() *Workers { workers.LdapSync.Stop() } + if workers.Migrations != nil { + workers.Migrations.Stop() + } + mlog.Info("Stopped workers") return workers -- cgit v1.2.3-1-g7c22