summaryrefslogtreecommitdiffstats
path: root/jobs
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2018-05-14 15:59:04 +0100
committerGitHub <noreply@github.com>2018-05-14 15:59:04 +0100
commit51bd710ecdca6628461c9fa2679737073e4d5059 (patch)
treeb2a4837ced3ed515ee505728917a6630b0553f76 /jobs
parent91557bbd978500388a11b99401783164e143a966 (diff)
downloadchat-51bd710ecdca6628461c9fa2679737073e4d5059.tar.gz
chat-51bd710ecdca6628461c9fa2679737073e4d5059.tar.bz2
chat-51bd710ecdca6628461c9fa2679737073e4d5059.zip
MM-9728: Online migration for advanced permissions phase 2 (#8744)
* MM-9728: Online migration for advanced permissions phase 2 * Add unit tests for new store functions. * Move migration specific code to own file. * Add migration state function test. * Style fixes. * Add i18n strings. * Fix mocks. * Add TestMain to migrations package tests. * Fix typo. * Fix review comments. * Fix up the "Check if migration is done" check to actually work.
Diffstat (limited to 'jobs')
-rw-r--r--jobs/interfaces/migrations_interface.go11
-rw-r--r--jobs/jobs.go7
-rw-r--r--jobs/jobs_watcher.go7
-rw-r--r--jobs/schedulers.go4
-rw-r--r--jobs/server.go2
-rw-r--r--jobs/workers.go13
6 files changed, 44 insertions, 0 deletions
diff --git a/jobs/interfaces/migrations_interface.go b/jobs/interfaces/migrations_interface.go
new file mode 100644
index 000000000..48dc9f579
--- /dev/null
+++ b/jobs/interfaces/migrations_interface.go
@@ -0,0 +1,11 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package interfaces
+
+import "github.com/mattermost/mattermost-server/model"
+
+type MigrationsJobInterface interface {
+ MakeWorker() model.Worker
+ MakeScheduler() model.Scheduler
+}
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 850491403..ddbc4489b 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -106,6 +106,13 @@ func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError {
return result.Err
}
+func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError {
+ job.Status = model.JOB_STATUS_IN_PROGRESS
+ job.LastActivityAt = model.GetMillis()
+ result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS)
+ return result.Err
+}
+
func (srv *JobServer) RequestCancellation(jobId string) *model.AppError {
if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
return result.Err
diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go
index 07979442d..01d0a8d0f 100644
--- a/jobs/jobs_watcher.go
+++ b/jobs/jobs_watcher.go
@@ -107,6 +107,13 @@ func (watcher *Watcher) PollAndNotify() {
default:
}
}
+ } else if job.Type == model.JOB_TYPE_MIGRATIONS {
+ if watcher.workers.Migrations != nil {
+ select {
+ case watcher.workers.Migrations.JobChannel() <- *job:
+ default:
+ }
+ }
}
}
}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index 2823036df..96aa2b635 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -50,6 +50,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers {
schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
}
+ if migrationsInterface := srv.Migrations; migrationsInterface != nil {
+ schedulers.schedulers = append(schedulers.schedulers, migrationsInterface.MakeScheduler())
+ }
+
schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
return schedulers
}
diff --git a/jobs/server.go b/jobs/server.go
index 01cf821dc..10ea9a46f 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -5,6 +5,7 @@ package jobs
import (
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
+ tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
)
@@ -34,6 +35,7 @@ type JobServer struct {
ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface
ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface
LdapSync ejobs.LdapSyncInterface
+ Migrations tjobs.MigrationsJobInterface
}
func NewJobServer(configService ConfigService, store store.Store) *JobServer {
diff --git a/jobs/workers.go b/jobs/workers.go
index 57a255013..67ab43241 100644
--- a/jobs/workers.go
+++ b/jobs/workers.go
@@ -20,6 +20,7 @@ type Workers struct {
ElasticsearchIndexing model.Worker
ElasticsearchAggregation model.Worker
LdapSync model.Worker
+ Migrations model.Worker
listenerId string
}
@@ -50,6 +51,10 @@ func (srv *JobServer) InitWorkers() *Workers {
workers.LdapSync = ldapSyncInterface.MakeWorker()
}
+ if migrationsInterface := srv.Migrations; migrationsInterface != nil {
+ workers.Migrations = migrationsInterface.MakeWorker()
+ }
+
return workers
}
@@ -77,6 +82,10 @@ func (workers *Workers) Start() *Workers {
go workers.LdapSync.Run()
}
+ if workers.Migrations != nil {
+ go workers.Migrations.Run()
+ }
+
go workers.Watcher.Start()
})
@@ -152,6 +161,10 @@ func (workers *Workers) Stop() *Workers {
workers.LdapSync.Stop()
}
+ if workers.Migrations != nil {
+ workers.Migrations.Stop()
+ }
+
mlog.Info("Stopped workers")
return workers