summaryrefslogtreecommitdiffstats
path: root/migrations
diff options
context:
space:
mode:
authorGeorge Goldberg <george@gberg.me>2018-05-14 15:59:04 +0100
committerGitHub <noreply@github.com>2018-05-14 15:59:04 +0100
commit51bd710ecdca6628461c9fa2679737073e4d5059 (patch)
treeb2a4837ced3ed515ee505728917a6630b0553f76 /migrations
parent91557bbd978500388a11b99401783164e143a966 (diff)
downloadchat-51bd710ecdca6628461c9fa2679737073e4d5059.tar.gz
chat-51bd710ecdca6628461c9fa2679737073e4d5059.tar.bz2
chat-51bd710ecdca6628461c9fa2679737073e4d5059.zip
MM-9728: Online migration for advanced permissions phase 2 (#8744)
* MM-9728: Online migration for advanced permissions phase 2 * Add unit tests for new store functions. * Move migration specific code to own file. * Add migration state function test. * Style fixes. * Add i18n strings. * Fix mocks. * Add TestMain to migrations package tests. * Fix typo. * Fix review comments. * Fix up the "Check if migration is done" check to actually work.
Diffstat (limited to 'migrations')
-rw-r--r--migrations/advanced_permissions_phase_2.go106
-rw-r--r--migrations/migrations.go63
-rw-r--r--migrations/migrations_test.go140
-rw-r--r--migrations/migrationstestlib.go419
-rw-r--r--migrations/scheduler.go110
-rw-r--r--migrations/worker.go166
6 files changed, 1004 insertions, 0 deletions
diff --git a/migrations/advanced_permissions_phase_2.go b/migrations/advanced_permissions_phase_2.go
new file mode 100644
index 000000000..55b1876c4
--- /dev/null
+++ b/migrations/advanced_permissions_phase_2.go
@@ -0,0 +1,106 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "strings"
+
+ "github.com/mattermost/mattermost-server/model"
+)
+
+type AdvancedPermissionsPhase2Progress struct {
+ CurrentTable string `json:"current_table"`
+ LastTeamId string `json:"last_team_id"`
+ LastChannelId string `json:"last_channel_id"`
+ LastUserId string `json:"last_user"`
+}
+
+func (p *AdvancedPermissionsPhase2Progress) ToJson() string {
+ b, _ := json.Marshal(p)
+ return string(b)
+}
+
+func AdvancedPermissionsPhase2ProgressFromJson(data io.Reader) *AdvancedPermissionsPhase2Progress {
+ var o *AdvancedPermissionsPhase2Progress
+ json.NewDecoder(data).Decode(&o)
+ return o
+}
+
+func (p *AdvancedPermissionsPhase2Progress) IsValid() bool {
+ if len(p.LastChannelId) != 26 {
+ return false
+ }
+
+ if len(p.LastTeamId) != 26 {
+ return false
+ }
+
+ if len(p.LastUserId) != 26 {
+ return false
+ }
+
+ switch p.CurrentTable {
+ case "TeamMembers":
+ case "ChannelMembers":
+ default:
+ return false
+ }
+
+ return true
+}
+
+func (worker *Worker) runAdvancedPermissionsPhase2Migration(lastDone string) (bool, string, *model.AppError) {
+ var progress *AdvancedPermissionsPhase2Progress
+ if len(lastDone) == 0 {
+ // Haven't started the migration yet.
+ progress = new(AdvancedPermissionsPhase2Progress)
+ progress.CurrentTable = "TeamMembers"
+ progress.LastChannelId = strings.Repeat("0", 26)
+ progress.LastTeamId = strings.Repeat("0", 26)
+ progress.LastUserId = strings.Repeat("0", 26)
+ } else {
+ progress = AdvancedPermissionsPhase2ProgressFromJson(strings.NewReader(lastDone))
+ if !progress.IsValid() {
+ return false, "", model.NewAppError("MigrationsWorker.runAdvancedPermissionsPhase2Migration", "migrations.worker.run_advanced_permissions_phase_2_migration.invalid_progress", map[string]interface{}{"progress": progress.ToJson()}, "", http.StatusInternalServerError)
+ }
+ }
+
+ if progress.CurrentTable == "TeamMembers" {
+ // Run a TeamMembers migration batch.
+ if result := <-worker.app.Srv.Store.Team().MigrateTeamMembers(progress.LastTeamId, progress.LastUserId); result.Err != nil {
+ return false, progress.ToJson(), result.Err
+ } else {
+ if result.Data == nil {
+ // We haven't progressed. That means that we've reached the end of this stage of the migration, and should now advance to the next stage.
+ progress.LastUserId = strings.Repeat("0", 26)
+ progress.CurrentTable = "ChannelMembers"
+ return false, progress.ToJson(), nil
+ }
+
+ data := result.Data.(map[string]string)
+ progress.LastTeamId = data["TeamId"]
+ progress.LastUserId = data["UserId"]
+ }
+ } else if progress.CurrentTable == "ChannelMembers" {
+ // Run a ChannelMembers migration batch.
+ if result := <-worker.app.Srv.Store.Channel().MigrateChannelMembers(progress.LastChannelId, progress.LastUserId); result.Err != nil {
+ return false, progress.ToJson(), result.Err
+ } else {
+ if result.Data == nil {
+ // We haven't progressed. That means we've reached the end of this final stage of the migration.
+
+ return true, progress.ToJson(), nil
+ }
+
+ data := result.Data.(map[string]string)
+ progress.LastChannelId = data["ChannelId"]
+ progress.LastUserId = data["UserId"]
+ }
+ }
+
+ return false, progress.ToJson(), nil
+}
diff --git a/migrations/migrations.go b/migrations/migrations.go
new file mode 100644
index 000000000..940992839
--- /dev/null
+++ b/migrations/migrations.go
@@ -0,0 +1,63 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+ "github.com/mattermost/mattermost-server/app"
+ tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
+ "github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/store"
+)
+
+const (
+ MIGRATION_STATE_UNSCHEDULED = "unscheduled"
+ MIGRATION_STATE_IN_PROGRESS = "in_progress"
+ MIGRATION_STATE_COMPLETED = "completed"
+
+ JOB_DATA_KEY_MIGRATION = "migration_key"
+ JOB_DATA_KEY_MIGRATION_LAST_DONE = "last_done"
+)
+
+type MigrationsJobInterfaceImpl struct {
+ App *app.App
+}
+
+func init() {
+ app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface {
+ return &MigrationsJobInterfaceImpl{a}
+ })
+}
+
+func MakeMigrationsList() []string {
+ return []string{
+ model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2,
+ }
+}
+
+func GetMigrationState(migration string, store store.Store) (string, *model.Job, *model.AppError) {
+ if result := <-store.System().GetByName(migration); result.Err == nil {
+ return MIGRATION_STATE_COMPLETED, nil, nil
+ }
+
+ if result := <-store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); result.Err != nil {
+ return "", nil, result.Err
+ } else {
+ for _, job := range result.Data.([]*model.Job) {
+ if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok {
+ if key != migration {
+ continue
+ }
+
+ switch job.Status {
+ case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING:
+ return MIGRATION_STATE_IN_PROGRESS, job, nil
+ default:
+ return MIGRATION_STATE_UNSCHEDULED, job, nil
+ }
+ }
+ }
+ }
+
+ return MIGRATION_STATE_UNSCHEDULED, nil, nil
+}
diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go
new file mode 100644
index 000000000..308319430
--- /dev/null
+++ b/migrations/migrations_test.go
@@ -0,0 +1,140 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+ "flag"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/store/storetest"
+ "github.com/mattermost/mattermost-server/utils"
+)
+
+func TestMain(m *testing.M) {
+ flag.Parse()
+
+ // Setup a global logger to catch tests logging outside of app context
+ // The global logger will be stomped by apps initalizing but that's fine for testing. Ideally this won't happen.
+ mlog.InitGlobalLogger(mlog.NewLogger(&mlog.LoggerConfiguration{
+ EnableConsole: true,
+ ConsoleJson: true,
+ ConsoleLevel: "error",
+ EnableFile: false,
+ }))
+
+ utils.TranslationsPreInit()
+
+ // In the case where a dev just wants to run a single test, it's faster to just use the default
+ // store.
+ if filter := flag.Lookup("test.run").Value.String(); filter != "" && filter != "." {
+ mlog.Info("-test.run used, not creating temporary containers")
+ os.Exit(m.Run())
+ }
+
+ status := 0
+
+ container, settings, err := storetest.NewMySQLContainer()
+ if err != nil {
+ panic(err)
+ }
+
+ UseTestStore(container, settings)
+
+ defer func() {
+ StopTestStore()
+ os.Exit(status)
+ }()
+
+ status = m.Run()
+}
+
+func TestGetMigrationState(t *testing.T) {
+ th := Setup()
+ defer th.TearDown()
+
+ migrationKey := model.NewId()
+
+ th.DeleteAllJobsByTypeAndMigrationKey(model.JOB_TYPE_MIGRATIONS, migrationKey)
+
+ // Test with no job yet.
+ state, job, err := GetMigrationState(migrationKey, th.App.Srv.Store)
+ assert.Nil(t, err)
+ assert.Nil(t, job)
+ assert.Equal(t, "unscheduled", state)
+
+ // Test with the system table showing the migration as done.
+ system := model.System{
+ Name: migrationKey,
+ Value: "true",
+ }
+ res1 := <-th.App.Srv.Store.System().Save(&system)
+ assert.Nil(t, res1.Err)
+
+ state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+ assert.Nil(t, err)
+ assert.Nil(t, job)
+ assert.Equal(t, "completed", state)
+
+ res2 := <-th.App.Srv.Store.System().PermanentDeleteByName(migrationKey)
+ assert.Nil(t, res2.Err)
+
+ // Test with a job scheduled in "pending" state.
+ j1 := &model.Job{
+ Id: model.NewId(),
+ CreateAt: model.GetMillis(),
+ Data: map[string]string{
+ JOB_DATA_KEY_MIGRATION: migrationKey,
+ },
+ Status: model.JOB_STATUS_PENDING,
+ Type: model.JOB_TYPE_MIGRATIONS,
+ }
+
+ j1 = (<-th.App.Srv.Store.Job().Save(j1)).Data.(*model.Job)
+
+ state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+ assert.Nil(t, err)
+ assert.Equal(t, j1.Id, job.Id)
+ assert.Equal(t, "in_progress", state)
+
+ // Test with a job scheduled in "in progress" state.
+ j2 := &model.Job{
+ Id: model.NewId(),
+ CreateAt: j1.CreateAt + 1,
+ Data: map[string]string{
+ JOB_DATA_KEY_MIGRATION: migrationKey,
+ },
+ Status: model.JOB_STATUS_IN_PROGRESS,
+ Type: model.JOB_TYPE_MIGRATIONS,
+ }
+
+ j2 = (<-th.App.Srv.Store.Job().Save(j2)).Data.(*model.Job)
+
+ state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+ assert.Nil(t, err)
+ assert.Equal(t, j2.Id, job.Id)
+ assert.Equal(t, "in_progress", state)
+
+ // Test with a job scheduled in "error" state.
+ j3 := &model.Job{
+ Id: model.NewId(),
+ CreateAt: j2.CreateAt + 1,
+ Data: map[string]string{
+ JOB_DATA_KEY_MIGRATION: migrationKey,
+ },
+ Status: model.JOB_STATUS_ERROR,
+ Type: model.JOB_TYPE_MIGRATIONS,
+ }
+
+ j3 = (<-th.App.Srv.Store.Job().Save(j3)).Data.(*model.Job)
+
+ state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+ assert.Nil(t, err)
+ assert.Equal(t, j3.Id, job.Id)
+ assert.Equal(t, "unscheduled", state)
+}
diff --git a/migrations/migrationstestlib.go b/migrations/migrationstestlib.go
new file mode 100644
index 000000000..b52f7af79
--- /dev/null
+++ b/migrations/migrationstestlib.go
@@ -0,0 +1,419 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+ "encoding/json"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/einterfaces"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/plugin"
+ "github.com/mattermost/mattermost-server/plugin/pluginenv"
+ "github.com/mattermost/mattermost-server/store"
+ "github.com/mattermost/mattermost-server/store/sqlstore"
+ "github.com/mattermost/mattermost-server/store/storetest"
+ "github.com/mattermost/mattermost-server/utils"
+)
+
+type TestHelper struct {
+ App *app.App
+ BasicTeam *model.Team
+ BasicUser *model.User
+ BasicUser2 *model.User
+ BasicChannel *model.Channel
+ BasicPost *model.Post
+
+ SystemAdminUser *model.User
+
+ tempConfigPath string
+ tempWorkspace string
+ pluginHooks map[string]plugin.Hooks
+}
+
+type persistentTestStore struct {
+ store.Store
+}
+
+func (*persistentTestStore) Close() {}
+
+var testStoreContainer *storetest.RunningContainer
+var testStore *persistentTestStore
+var testStoreSqlSupplier *sqlstore.SqlSupplier
+var testClusterInterface *FakeClusterInterface
+
+// UseTestStore sets the container and corresponding settings to use for tests. Once the tests are
+// complete (e.g. at the end of your TestMain implementation), you should call StopTestStore.
+func UseTestStore(container *storetest.RunningContainer, settings *model.SqlSettings) {
+ testClusterInterface = &FakeClusterInterface{}
+ testStoreContainer = container
+ testStoreSqlSupplier = sqlstore.NewSqlSupplier(*settings, nil)
+ testStore = &persistentTestStore{store.NewLayeredStore(testStoreSqlSupplier, nil, testClusterInterface)}
+}
+
+func StopTestStore() {
+ if testStoreContainer != nil {
+ testStoreContainer.Stop()
+ testStoreContainer = nil
+ }
+}
+
+func setupTestHelper(enterprise bool) *TestHelper {
+ permConfig, err := os.Open(utils.FindConfigFile("config.json"))
+ if err != nil {
+ panic(err)
+ }
+ defer permConfig.Close()
+ tempConfig, err := ioutil.TempFile("", "")
+ if err != nil {
+ panic(err)
+ }
+ _, err = io.Copy(tempConfig, permConfig)
+ tempConfig.Close()
+ if err != nil {
+ panic(err)
+ }
+
+ options := []app.Option{app.ConfigFile(tempConfig.Name()), app.DisableConfigWatch}
+ if testStore != nil {
+ options = append(options, app.StoreOverride(testStore))
+ }
+
+ a, err := app.New(options...)
+ if err != nil {
+ panic(err)
+ }
+
+ th := &TestHelper{
+ App: a,
+ pluginHooks: make(map[string]plugin.Hooks),
+ tempConfigPath: tempConfig.Name(),
+ }
+
+ th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.MaxUsersPerTeam = 50 })
+ th.App.UpdateConfig(func(cfg *model.Config) { *cfg.RateLimitSettings.Enable = false })
+ prevListenAddress := *th.App.Config().ServiceSettings.ListenAddress
+ if testStore != nil {
+ th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = ":0" })
+ }
+ serverErr := th.App.StartServer()
+ if serverErr != nil {
+ panic(serverErr)
+ }
+
+ th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = prevListenAddress })
+
+ th.App.DoAdvancedPermissionsMigration()
+
+ th.App.Srv.Store.MarkSystemRanUnitTests()
+
+ th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.EnableOpenServer = true })
+
+ if enterprise {
+ th.App.SetLicense(model.NewTestLicense())
+ } else {
+ th.App.SetLicense(nil)
+ }
+
+ return th
+}
+
+func SetupEnterprise() *TestHelper {
+ return setupTestHelper(true)
+}
+
+func Setup() *TestHelper {
+ return setupTestHelper(false)
+}
+
+func (me *TestHelper) InitBasic() *TestHelper {
+ me.BasicTeam = me.CreateTeam()
+ me.BasicUser = me.CreateUser()
+ me.LinkUserToTeam(me.BasicUser, me.BasicTeam)
+ me.BasicUser2 = me.CreateUser()
+ me.LinkUserToTeam(me.BasicUser2, me.BasicTeam)
+ me.BasicChannel = me.CreateChannel(me.BasicTeam)
+ me.BasicPost = me.CreatePost(me.BasicChannel)
+
+ return me
+}
+
+func (me *TestHelper) InitSystemAdmin() *TestHelper {
+ me.SystemAdminUser = me.CreateUser()
+ me.App.UpdateUserRoles(me.SystemAdminUser.Id, model.SYSTEM_USER_ROLE_ID+" "+model.SYSTEM_ADMIN_ROLE_ID, false)
+ me.SystemAdminUser, _ = me.App.GetUser(me.SystemAdminUser.Id)
+
+ return me
+}
+
+func (me *TestHelper) MakeEmail() string {
+ return "success_" + model.NewId() + "@simulator.amazonses.com"
+}
+
+func (me *TestHelper) CreateTeam() *model.Team {
+ id := model.NewId()
+ team := &model.Team{
+ DisplayName: "dn_" + id,
+ Name: "name" + id,
+ Email: "success+" + id + "@simulator.amazonses.com",
+ Type: model.TEAM_OPEN,
+ }
+
+ utils.DisableDebugLogForTest()
+ var err *model.AppError
+ if team, err = me.App.CreateTeam(team); err != nil {
+ mlog.Error(err.Error())
+
+ time.Sleep(time.Second)
+ panic(err)
+ }
+ utils.EnableDebugLogForTest()
+ return team
+}
+
+func (me *TestHelper) CreateUser() *model.User {
+ id := model.NewId()
+
+ user := &model.User{
+ Email: "success+" + id + "@simulator.amazonses.com",
+ Username: "un_" + id,
+ Nickname: "nn_" + id,
+ Password: "Password1",
+ EmailVerified: true,
+ }
+
+ utils.DisableDebugLogForTest()
+ var err *model.AppError
+ if user, err = me.App.CreateUser(user); err != nil {
+ mlog.Error(err.Error())
+
+ time.Sleep(time.Second)
+ panic(err)
+ }
+ utils.EnableDebugLogForTest()
+ return user
+}
+
+func (me *TestHelper) CreateChannel(team *model.Team) *model.Channel {
+ return me.createChannel(team, model.CHANNEL_OPEN)
+}
+
+func (me *TestHelper) createChannel(team *model.Team, channelType string) *model.Channel {
+ id := model.NewId()
+
+ channel := &model.Channel{
+ DisplayName: "dn_" + id,
+ Name: "name_" + id,
+ Type: channelType,
+ TeamId: team.Id,
+ CreatorId: me.BasicUser.Id,
+ }
+
+ utils.DisableDebugLogForTest()
+ var err *model.AppError
+ if channel, err = me.App.CreateChannel(channel, true); err != nil {
+ mlog.Error(err.Error())
+
+ time.Sleep(time.Second)
+ panic(err)
+ }
+ utils.EnableDebugLogForTest()
+ return channel
+}
+
+func (me *TestHelper) CreateDmChannel(user *model.User) *model.Channel {
+ utils.DisableDebugLogForTest()
+ var err *model.AppError
+ var channel *model.Channel
+ if channel, err = me.App.CreateDirectChannel(me.BasicUser.Id, user.Id); err != nil {
+ mlog.Error(err.Error())
+
+ time.Sleep(time.Second)
+ panic(err)
+ }
+ utils.EnableDebugLogForTest()
+ return channel
+}
+
+func (me *TestHelper) CreatePost(channel *model.Channel) *model.Post {
+ id := model.NewId()
+
+ post := &model.Post{
+ UserId: me.BasicUser.Id,
+ ChannelId: channel.Id,
+ Message: "message_" + id,
+ CreateAt: model.GetMillis() - 10000,
+ }
+
+ utils.DisableDebugLogForTest()
+ var err *model.AppError
+ if post, err = me.App.CreatePost(post, channel, false); err != nil {
+ mlog.Error(err.Error())
+
+ time.Sleep(time.Second)
+ panic(err)
+ }
+ utils.EnableDebugLogForTest()
+ return post
+}
+
+func (me *TestHelper) LinkUserToTeam(user *model.User, team *model.Team) {
+ utils.DisableDebugLogForTest()
+
+ err := me.App.JoinUserToTeam(team, user, "")
+ if err != nil {
+ mlog.Error(err.Error())
+
+ time.Sleep(time.Second)
+ panic(err)
+ }
+
+ utils.EnableDebugLogForTest()
+}
+
+func (me *TestHelper) AddUserToChannel(user *model.User, channel *model.Channel) *model.ChannelMember {
+ utils.DisableDebugLogForTest()
+
+ member, err := me.App.AddUserToChannel(user, channel)
+ if err != nil {
+ mlog.Error(err.Error())
+
+ time.Sleep(time.Second)
+ panic(err)
+ }
+
+ utils.EnableDebugLogForTest()
+
+ return member
+}
+
+func (me *TestHelper) TearDown() {
+ me.App.Shutdown()
+ os.Remove(me.tempConfigPath)
+ if err := recover(); err != nil {
+ StopTestStore()
+ panic(err)
+ }
+ if me.tempWorkspace != "" {
+ os.RemoveAll(me.tempWorkspace)
+ }
+}
+
+type mockPluginSupervisor struct {
+ hooks plugin.Hooks
+}
+
+func (s *mockPluginSupervisor) Start(api plugin.API) error {
+ return s.hooks.OnActivate(api)
+}
+
+func (s *mockPluginSupervisor) Stop() error {
+ return nil
+}
+
+func (s *mockPluginSupervisor) Hooks() plugin.Hooks {
+ return s.hooks
+}
+
+func (me *TestHelper) InstallPlugin(manifest *model.Manifest, hooks plugin.Hooks) {
+ if me.tempWorkspace == "" {
+ dir, err := ioutil.TempDir("", "apptest")
+ if err != nil {
+ panic(err)
+ }
+ me.tempWorkspace = dir
+ }
+
+ pluginDir := filepath.Join(me.tempWorkspace, "plugins")
+ webappDir := filepath.Join(me.tempWorkspace, "webapp")
+ me.App.InitPlugins(pluginDir, webappDir, func(bundle *model.BundleInfo) (plugin.Supervisor, error) {
+ if hooks, ok := me.pluginHooks[bundle.Manifest.Id]; ok {
+ return &mockPluginSupervisor{hooks}, nil
+ }
+ return pluginenv.DefaultSupervisorProvider(bundle)
+ })
+
+ me.pluginHooks[manifest.Id] = hooks
+
+ manifestCopy := *manifest
+ if manifestCopy.Backend == nil {
+ manifestCopy.Backend = &model.ManifestBackend{}
+ }
+ manifestBytes, err := json.Marshal(&manifestCopy)
+ if err != nil {
+ panic(err)
+ }
+
+ if err := os.MkdirAll(filepath.Join(pluginDir, manifest.Id), 0700); err != nil {
+ panic(err)
+ }
+
+ if err := ioutil.WriteFile(filepath.Join(pluginDir, manifest.Id, "plugin.json"), manifestBytes, 0600); err != nil {
+ panic(err)
+ }
+}
+
+func (me *TestHelper) ResetRoleMigration() {
+ if _, err := testStoreSqlSupplier.GetMaster().Exec("DELETE from Roles"); err != nil {
+ panic(err)
+ }
+
+ testClusterInterface.sendClearRoleCacheMessage()
+
+ if _, err := testStoreSqlSupplier.GetMaster().Exec("DELETE from Systems where Name = :Name", map[string]interface{}{"Name": app.ADVANCED_PERMISSIONS_MIGRATION_KEY}); err != nil {
+ panic(err)
+ }
+}
+
+func (me *TestHelper) DeleteAllJobsByTypeAndMigrationKey(jobType string, migrationKey string) {
+ if res := <-me.App.Srv.Store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); res.Err != nil {
+ panic(res.Err)
+ } else {
+ jobs := res.Data.([]*model.Job)
+
+ for _, job := range jobs {
+ if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok && key == migrationKey {
+ if res := <-me.App.Srv.Store.Job().Delete(job.Id); res.Err != nil {
+ panic(res.Err)
+ }
+ }
+ }
+ }
+}
+
+type FakeClusterInterface struct {
+ clusterMessageHandler einterfaces.ClusterMessageHandler
+}
+
+func (me *FakeClusterInterface) StartInterNodeCommunication() {}
+func (me *FakeClusterInterface) StopInterNodeCommunication() {}
+func (me *FakeClusterInterface) RegisterClusterMessageHandler(event string, crm einterfaces.ClusterMessageHandler) {
+ me.clusterMessageHandler = crm
+}
+func (me *FakeClusterInterface) GetClusterId() string { return "" }
+func (me *FakeClusterInterface) IsLeader() bool { return false }
+func (me *FakeClusterInterface) GetMyClusterInfo() *model.ClusterInfo { return nil }
+func (me *FakeClusterInterface) GetClusterInfos() []*model.ClusterInfo { return nil }
+func (me *FakeClusterInterface) SendClusterMessage(cluster *model.ClusterMessage) {}
+func (me *FakeClusterInterface) NotifyMsg(buf []byte) {}
+func (me *FakeClusterInterface) GetClusterStats() ([]*model.ClusterStats, *model.AppError) {
+ return nil, nil
+}
+func (me *FakeClusterInterface) GetLogs(page, perPage int) ([]string, *model.AppError) {
+ return []string{}, nil
+}
+func (me *FakeClusterInterface) ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError {
+ return nil
+}
+func (me *FakeClusterInterface) sendClearRoleCacheMessage() {
+ me.clusterMessageHandler(&model.ClusterMessage{
+ Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_ROLES,
+ })
+}
diff --git a/migrations/scheduler.go b/migrations/scheduler.go
new file mode 100644
index 000000000..8a7ac30d0
--- /dev/null
+++ b/migrations/scheduler.go
@@ -0,0 +1,110 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+ "time"
+
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/store"
+)
+
+const (
+ MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS = 3600000 // 1 hour
+)
+
+type Scheduler struct {
+ App *app.App
+ allMigrationsCompleted bool
+}
+
+func (m *MigrationsJobInterfaceImpl) MakeScheduler() model.Scheduler {
+ return &Scheduler{m.App, false}
+}
+
+func (scheduler *Scheduler) Name() string {
+ return "MigrationsScheduler"
+}
+
+func (scheduler *Scheduler) JobType() string {
+ return model.JOB_TYPE_MIGRATIONS
+}
+
+func (scheduler *Scheduler) Enabled(cfg *model.Config) bool {
+ return true
+}
+
+func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
+ if scheduler.allMigrationsCompleted {
+ return nil
+ }
+
+ nextTime := time.Now().Add(60 * time.Second)
+ return &nextTime
+}
+
+func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
+ mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name()))
+
+ // Work through the list of migrations in order. Schedule the first one that isn't done (assuming it isn't in progress already).
+ for _, key := range MakeMigrationsList() {
+ state, job, err := GetMigrationState(key, scheduler.App.Srv.Store)
+ if err != nil {
+ mlog.Error("Failed to determine status of migration: ", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key), mlog.String("error", err.Error()))
+ return nil, nil
+ }
+
+ if state == MIGRATION_STATE_IN_PROGRESS {
+ // Check the migration job isn't wedged.
+ if job != nil && job.LastActivityAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS {
+ mlog.Warn("Job appears to be wedged. Rescheduling another instance.", mlog.String("scheduler", scheduler.Name()), mlog.String("wedged_job_id", job.Id), mlog.String("migration_key", key))
+ if err := scheduler.App.Jobs.SetJobError(job, nil); err != nil {
+ mlog.Error("Worker: Failed to set job error", mlog.String("scheduler", scheduler.Name()), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ }
+ return scheduler.createJob(key, job, scheduler.App.Srv.Store)
+ }
+
+ return nil, nil
+ }
+
+ if state == MIGRATION_STATE_COMPLETED {
+ // This migration is done. Continue to check the next.
+ continue
+ }
+
+ if state == MIGRATION_STATE_UNSCHEDULED {
+ mlog.Debug("Scheduling a new job for migration.", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key))
+ return scheduler.createJob(key, job, scheduler.App.Srv.Store)
+ }
+
+ mlog.Error("Unknown migration state. Not doing anything.", mlog.String("migration_state", state))
+ return nil, nil
+ }
+
+ // If we reached here, then there aren't any migrations left to run.
+ scheduler.allMigrationsCompleted = true
+ mlog.Debug("All migrations are complete.", mlog.String("scheduler", scheduler.Name()))
+
+ return nil, nil
+}
+
+func (scheduler *Scheduler) createJob(migrationKey string, lastJob *model.Job, store store.Store) (*model.Job, *model.AppError) {
+ var lastDone string
+ if lastJob != nil {
+ lastDone = lastJob.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE]
+ }
+
+ data := map[string]string{
+ JOB_DATA_KEY_MIGRATION: migrationKey,
+ JOB_DATA_KEY_MIGRATION_LAST_DONE: lastDone,
+ }
+
+ if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil {
+ return nil, err
+ } else {
+ return job, nil
+ }
+}
diff --git a/migrations/worker.go b/migrations/worker.go
new file mode 100644
index 000000000..7a64dd609
--- /dev/null
+++ b/migrations/worker.go
@@ -0,0 +1,166 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/jobs"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+)
+
+const (
+ TIME_BETWEEN_BATCHES = 100
+)
+
+type Worker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+ jobServer *jobs.JobServer
+ app *app.App
+}
+
+func (m *MigrationsJobInterfaceImpl) MakeWorker() model.Worker {
+ worker := Worker{
+ name: "Migrations",
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ jobServer: m.App.Jobs,
+ app: m.App,
+ }
+
+ return &worker
+}
+
+func (worker *Worker) Run() {
+ mlog.Debug("Worker started", mlog.String("worker", worker.name))
+
+ defer func() {
+ mlog.Debug("Worker finished", mlog.String("worker", worker.name))
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name))
+ return
+ case job := <-worker.jobs:
+ mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name))
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *Worker) Stop() {
+ mlog.Debug("Worker stopping", mlog.String("worker", worker.name))
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *Worker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
+
+func (worker *Worker) DoJob(job *model.Job) {
+ if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
+ mlog.Info("Worker experienced an error while trying to claim job",
+ mlog.String("worker", worker.name),
+ mlog.String("job_id", job.Id),
+ mlog.String("error", err.Error()))
+ return
+ } else if !claimed {
+ return
+ }
+
+ cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
+ cancelWatcherChan := make(chan interface{}, 1)
+ go worker.app.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+
+ defer cancelCancelWatcher()
+
+ for {
+ select {
+ case <-cancelWatcherChan:
+ mlog.Debug("Worker: Job has been canceled via CancellationWatcher", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobCanceled(job)
+ return
+
+ case <-worker.stop:
+ mlog.Debug("Worker: Job has been canceled via Worker Stop", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobCanceled(job)
+ return
+
+ case <-time.After(TIME_BETWEEN_BATCHES * time.Millisecond):
+ done, progress, err := worker.runMigration(job.Data[JOB_DATA_KEY_MIGRATION], job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE])
+ if err != nil {
+ mlog.Error("Worker: Failed to run migration", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ return
+ } else if done {
+ mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobSuccess(job)
+ return
+ } else {
+ job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE] = progress
+ if err := worker.app.Jobs.UpdateInProgressJobData(job); err != nil {
+ mlog.Error("Worker: Failed to update migration status data for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ return
+ }
+ }
+ }
+ }
+}
+
+func (worker *Worker) setJobSuccess(job *model.Job) {
+ if err := worker.app.Jobs.SetJobSuccess(job); err != nil {
+ mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ }
+}
+
+func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) {
+ if err := worker.app.Jobs.SetJobError(job, appError); err != nil {
+ mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ }
+}
+
+func (worker *Worker) setJobCanceled(job *model.Job) {
+ if err := worker.app.Jobs.SetJobCanceled(job); err != nil {
+ mlog.Error("Worker: Failed to mark job as canceled", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ }
+}
+
+// Return parameters:
+// - whether the migration is completed on this run (true) or still incomplete (false).
+// - the updated lastDone string for the migration.
+// - any error which may have occurred while running the migration.
+func (worker *Worker) runMigration(key string, lastDone string) (bool, string, *model.AppError) {
+ var done bool
+ var progress string
+ var err *model.AppError
+
+ switch key {
+ case model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2:
+ done, progress, err = worker.runAdvancedPermissionsPhase2Migration(lastDone)
+ default:
+ return false, "", model.NewAppError("MigrationsWorker.runMigration", "migrations.worker.run_migration.unknown_key", map[string]interface{}{"key": key}, "", http.StatusInternalServerError)
+ }
+
+ if done {
+ if result := <-worker.app.Srv.Store.System().Save(&model.System{Name: key, Value: "true"}); result.Err != nil {
+ return false, "", result.Err
+ }
+ }
+
+ return done, progress, err
+}