summaryrefslogtreecommitdiffstats
path: root/migrations/worker.go
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2018-05-14 15:59:04 +0100
committerGitHub <noreply@github.com>2018-05-14 15:59:04 +0100
commit51bd710ecdca6628461c9fa2679737073e4d5059 (patch)
treeb2a4837ced3ed515ee505728917a6630b0553f76 /migrations/worker.go
parent91557bbd978500388a11b99401783164e143a966 (diff)
downloadchat-51bd710ecdca6628461c9fa2679737073e4d5059.tar.gz
chat-51bd710ecdca6628461c9fa2679737073e4d5059.tar.bz2
chat-51bd710ecdca6628461c9fa2679737073e4d5059.zip
MM-9728: Online migration for advanced permissions phase 2 (#8744)
* MM-9728: Online migration for advanced permissions phase 2 * Add unit tests for new store functions. * Move migration specific code to own file. * Add migration state function test. * Style fixes. * Add i18n strings. * Fix mocks. * Add TestMain to migrations package tests. * Fix typo. * Fix review comments. * Fix up the "Check if migration is done" check to actually work.
Diffstat (limited to 'migrations/worker.go')
-rw-r--r--migrations/worker.go166
1 files changed, 166 insertions, 0 deletions
diff --git a/migrations/worker.go b/migrations/worker.go
new file mode 100644
index 000000000..7a64dd609
--- /dev/null
+++ b/migrations/worker.go
@@ -0,0 +1,166 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/jobs"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+)
+
+const (
+ TIME_BETWEEN_BATCHES = 100
+)
+
+type Worker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+ jobServer *jobs.JobServer
+ app *app.App
+}
+
+func (m *MigrationsJobInterfaceImpl) MakeWorker() model.Worker {
+ worker := Worker{
+ name: "Migrations",
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ jobServer: m.App.Jobs,
+ app: m.App,
+ }
+
+ return &worker
+}
+
+func (worker *Worker) Run() {
+ mlog.Debug("Worker started", mlog.String("worker", worker.name))
+
+ defer func() {
+ mlog.Debug("Worker finished", mlog.String("worker", worker.name))
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name))
+ return
+ case job := <-worker.jobs:
+ mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name))
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *Worker) Stop() {
+ mlog.Debug("Worker stopping", mlog.String("worker", worker.name))
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *Worker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
+
+func (worker *Worker) DoJob(job *model.Job) {
+ if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
+ mlog.Info("Worker experienced an error while trying to claim job",
+ mlog.String("worker", worker.name),
+ mlog.String("job_id", job.Id),
+ mlog.String("error", err.Error()))
+ return
+ } else if !claimed {
+ return
+ }
+
+ cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
+ cancelWatcherChan := make(chan interface{}, 1)
+ go worker.app.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+
+ defer cancelCancelWatcher()
+
+ for {
+ select {
+ case <-cancelWatcherChan:
+ mlog.Debug("Worker: Job has been canceled via CancellationWatcher", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobCanceled(job)
+ return
+
+ case <-worker.stop:
+ mlog.Debug("Worker: Job has been canceled via Worker Stop", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobCanceled(job)
+ return
+
+ case <-time.After(TIME_BETWEEN_BATCHES * time.Millisecond):
+ done, progress, err := worker.runMigration(job.Data[JOB_DATA_KEY_MIGRATION], job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE])
+ if err != nil {
+ mlog.Error("Worker: Failed to run migration", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ return
+ } else if done {
+ mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobSuccess(job)
+ return
+ } else {
+ job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE] = progress
+ if err := worker.app.Jobs.UpdateInProgressJobData(job); err != nil {
+ mlog.Error("Worker: Failed to update migration status data for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ return
+ }
+ }
+ }
+ }
+}
+
+func (worker *Worker) setJobSuccess(job *model.Job) {
+ if err := worker.app.Jobs.SetJobSuccess(job); err != nil {
+ mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ }
+}
+
+func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) {
+ if err := worker.app.Jobs.SetJobError(job, appError); err != nil {
+ mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ }
+}
+
+func (worker *Worker) setJobCanceled(job *model.Job) {
+ if err := worker.app.Jobs.SetJobCanceled(job); err != nil {
+ mlog.Error("Worker: Failed to mark job as canceled", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ }
+}
+
+// Return parameters:
+// - whether the migration is completed on this run (true) or still incomplete (false).
+// - the updated lastDone string for the migration.
+// - any error which may have occurred while running the migration.
+func (worker *Worker) runMigration(key string, lastDone string) (bool, string, *model.AppError) {
+ var done bool
+ var progress string
+ var err *model.AppError
+
+ switch key {
+ case model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2:
+ done, progress, err = worker.runAdvancedPermissionsPhase2Migration(lastDone)
+ default:
+ return false, "", model.NewAppError("MigrationsWorker.runMigration", "migrations.worker.run_migration.unknown_key", map[string]interface{}{"key": key}, "", http.StatusInternalServerError)
+ }
+
+ if done {
+ if result := <-worker.app.Srv.Store.System().Save(&model.System{Name: key, Value: "true"}); result.Err != nil {
+ return false, "", result.Err
+ }
+ }
+
+ return done, progress, err
+}