From 51bd710ecdca6628461c9fa2679737073e4d5059 Mon Sep 17 00:00:00 2001
From: George Goldberg
Date: Mon, 14 May 2018 15:59:04 +0100
Subject: MM-9728: Online migration for advanced permissions phase 2 (#8744)

* MM-9728: Online migration for advanced permissions phase 2

* Add unit tests for new store functions.

* Move migration specific code to own file.

* Add migration state function test.

* Style fixes.

* Add i18n strings.

* Fix mocks.

* Add TestMain to migrations package tests.

* Fix typo.

* Fix review comments.

* Fix up the "Check if migration is done" check to actually work.
---
 migrations/advanced_permissions_phase_2.go | 106 ++++++++
 migrations/migrations.go                   |  63 +++++
 migrations/migrations_test.go              | 140 ++++++++++
 migrations/migrationstestlib.go            | 419 +++++++++++++++++++++++++++++
 migrations/scheduler.go                    | 110 ++++++++
 migrations/worker.go                       | 166 ++++++++++++
 6 files changed, 1004 insertions(+)
 create mode 100644 migrations/advanced_permissions_phase_2.go
 create mode 100644 migrations/migrations.go
 create mode 100644 migrations/migrations_test.go
 create mode 100644 migrations/migrationstestlib.go
 create mode 100644 migrations/scheduler.go
 create mode 100644 migrations/worker.go

(limited to 'migrations')

diff --git a/migrations/advanced_permissions_phase_2.go b/migrations/advanced_permissions_phase_2.go
new file mode 100644
index 000000000..55b1876c4
--- /dev/null
+++ b/migrations/advanced_permissions_phase_2.go
@@ -0,0 +1,106 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+    "encoding/json"
+    "io"
+    "net/http"
+    "strings"
+
+    "github.com/mattermost/mattermost-server/model"
+)
+
+type AdvancedPermissionsPhase2Progress struct {
+    CurrentTable  string `json:"current_table"`
+    LastTeamId    string `json:"last_team_id"`
+    LastChannelId string `json:"last_channel_id"`
+    LastUserId    string `json:"last_user"`
+}
+
+func (p *AdvancedPermissionsPhase2Progress) ToJson() string {
+    b, _ := json.Marshal(p)
+    return string(b)
+}
+
+func AdvancedPermissionsPhase2ProgressFromJson(data io.Reader) *AdvancedPermissionsPhase2Progress {
+    var o *AdvancedPermissionsPhase2Progress
+    json.NewDecoder(data).Decode(&o)
+    return o
+}
+
+func (p *AdvancedPermissionsPhase2Progress) IsValid() bool {
+    if len(p.LastChannelId) != 26 {
+        return false
+    }
+
+    if len(p.LastTeamId) != 26 {
+        return false
+    }
+
+    if len(p.LastUserId) != 26 {
+        return false
+    }
+
+    switch p.CurrentTable {
+    case "TeamMembers":
+    case "ChannelMembers":
+    default:
+        return false
+    }
+
+    return true
+}
+
+func (worker *Worker) runAdvancedPermissionsPhase2Migration(lastDone string) (bool, string, *model.AppError) {
+    var progress *AdvancedPermissionsPhase2Progress
+    if len(lastDone) == 0 {
+        // Haven't started the migration yet.
+        progress = new(AdvancedPermissionsPhase2Progress)
+        progress.CurrentTable = "TeamMembers"
+        progress.LastChannelId = strings.Repeat("0", 26)
+        progress.LastTeamId = strings.Repeat("0", 26)
+        progress.LastUserId = strings.Repeat("0", 26)
+    } else {
+        progress = AdvancedPermissionsPhase2ProgressFromJson(strings.NewReader(lastDone))
+        if !progress.IsValid() {
+            return false, "", model.NewAppError("MigrationsWorker.runAdvancedPermissionsPhase2Migration", "migrations.worker.run_advanced_permissions_phase_2_migration.invalid_progress", map[string]interface{}{"progress": progress.ToJson()}, "", http.StatusInternalServerError)
+        }
+    }
+
+    if progress.CurrentTable == "TeamMembers" {
+        // Run a TeamMembers migration batch.
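+        // Each successful batch returns the last (TeamId, UserId) pair it processed, which
+        // becomes the new checkpoint; a nil Data result means this stage has no rows left.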
+        if result := <-worker.app.Srv.Store.Team().MigrateTeamMembers(progress.LastTeamId, progress.LastUserId); result.Err != nil {
+            return false, progress.ToJson(), result.Err
+        } else {
+            if result.Data == nil {
+                // We haven't progressed. That means that we've reached the end of this stage of the migration, and should now advance to the next stage.
+                progress.LastUserId = strings.Repeat("0", 26)
+                progress.CurrentTable = "ChannelMembers"
+                return false, progress.ToJson(), nil
+            }
+
+            data := result.Data.(map[string]string)
+            progress.LastTeamId = data["TeamId"]
+            progress.LastUserId = data["UserId"]
+        }
+    } else if progress.CurrentTable == "ChannelMembers" {
+        // Run a ChannelMembers migration batch.
+        if result := <-worker.app.Srv.Store.Channel().MigrateChannelMembers(progress.LastChannelId, progress.LastUserId); result.Err != nil {
+            return false, progress.ToJson(), result.Err
+        } else {
+            if result.Data == nil {
+                // We haven't progressed. That means we've reached the end of this final stage of the migration.
+
+                return true, progress.ToJson(), nil
+            }
+
+            data := result.Data.(map[string]string)
+            progress.LastChannelId = data["ChannelId"]
+            progress.LastUserId = data["UserId"]
+        }
+    }
+
+    return false, progress.ToJson(), nil
+}
diff --git a/migrations/migrations.go b/migrations/migrations.go
new file mode 100644
index 000000000..940992839
--- /dev/null
+++ b/migrations/migrations.go
@@ -0,0 +1,63 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+    "github.com/mattermost/mattermost-server/app"
+    tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
+    "github.com/mattermost/mattermost-server/model"
+    "github.com/mattermost/mattermost-server/store"
+)
+
+const (
+    MIGRATION_STATE_UNSCHEDULED = "unscheduled"
+    MIGRATION_STATE_IN_PROGRESS = "in_progress"
+    MIGRATION_STATE_COMPLETED   = "completed"
+
+    JOB_DATA_KEY_MIGRATION           = "migration_key"
+    JOB_DATA_KEY_MIGRATION_LAST_DONE = "last_done"
+)
+
+type MigrationsJobInterfaceImpl struct {
+    App *app.App
+}
+
+func init() {
+    app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface {
+        return &MigrationsJobInterfaceImpl{a}
+    })
+}
+
+func MakeMigrationsList() []string {
+    return []string{
+        model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2,
+    }
+}
+
+func GetMigrationState(migration string, store store.Store) (string, *model.Job, *model.AppError) {
+    if result := <-store.System().GetByName(migration); result.Err == nil {
+        return MIGRATION_STATE_COMPLETED, nil, nil
+    }
+
+    if result := <-store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); result.Err != nil {
+        return "", nil, result.Err
+    } else {
+        for _, job := range result.Data.([]*model.Job) {
+            if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok {
+                if key != migration {
+                    continue
+                }
+
+                switch job.Status {
+                case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING:
+                    return MIGRATION_STATE_IN_PROGRESS, job, nil
+                default:
+                    return MIGRATION_STATE_UNSCHEDULED, job, nil
+                }
+            }
+        }
+    }
+
+    return MIGRATION_STATE_UNSCHEDULED, nil, nil
+}
diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go
new file mode 100644
index 000000000..308319430
--- /dev/null
+++ b/migrations/migrations_test.go
@@ -0,0 +1,140 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+    "flag"
+    "os"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+
+    "github.com/mattermost/mattermost-server/mlog"
+    "github.com/mattermost/mattermost-server/model"
+    "github.com/mattermost/mattermost-server/store/storetest"
+    "github.com/mattermost/mattermost-server/utils"
+)
+
+func TestMain(m *testing.M) {
+    flag.Parse()
+
+    // Set up a global logger to catch tests logging outside of app context.
+    // The global logger will be stomped by apps initializing, but that's fine for testing. Ideally this won't happen.
+    mlog.InitGlobalLogger(mlog.NewLogger(&mlog.LoggerConfiguration{
+        EnableConsole: true,
+        ConsoleJson:   true,
+        ConsoleLevel:  "error",
+        EnableFile:    false,
+    }))
+
+    utils.TranslationsPreInit()
+
+    // In the case where a dev just wants to run a single test, it's faster to use the default
+    // store.
+    if filter := flag.Lookup("test.run").Value.String(); filter != "" && filter != "." {
+        mlog.Info("-test.run used, not creating temporary containers")
+        os.Exit(m.Run())
+    }
+
+    status := 0
+
+    container, settings, err := storetest.NewMySQLContainer()
+    if err != nil {
+        panic(err)
+    }
+
+    UseTestStore(container, settings)
+
+    defer func() {
+        StopTestStore()
+        os.Exit(status)
+    }()
+
+    status = m.Run()
+}
+
+func TestGetMigrationState(t *testing.T) {
+    th := Setup()
+    defer th.TearDown()
+
+    migrationKey := model.NewId()
+
+    th.DeleteAllJobsByTypeAndMigrationKey(model.JOB_TYPE_MIGRATIONS, migrationKey)
+
+    // Test with no job yet.
+    state, job, err := GetMigrationState(migrationKey, th.App.Srv.Store)
+    assert.Nil(t, err)
+    assert.Nil(t, job)
+    assert.Equal(t, "unscheduled", state)
+
+    // Test with the system table showing the migration as done.
+    system := model.System{
+        Name:  migrationKey,
+        Value: "true",
+    }
+    res1 := <-th.App.Srv.Store.System().Save(&system)
+    assert.Nil(t, res1.Err)
+
+    state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+    assert.Nil(t, err)
+    assert.Nil(t, job)
+    assert.Equal(t, "completed", state)
+
+    res2 := <-th.App.Srv.Store.System().PermanentDeleteByName(migrationKey)
+    assert.Nil(t, res2.Err)
+
+    // Test with a job scheduled in "pending" state.
+    j1 := &model.Job{
+        Id:       model.NewId(),
+        CreateAt: model.GetMillis(),
+        Data: map[string]string{
+            JOB_DATA_KEY_MIGRATION: migrationKey,
+        },
+        Status: model.JOB_STATUS_PENDING,
+        Type:   model.JOB_TYPE_MIGRATIONS,
+    }
+
+    j1 = (<-th.App.Srv.Store.Job().Save(j1)).Data.(*model.Job)
+
+    state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+    assert.Nil(t, err)
+    assert.Equal(t, j1.Id, job.Id)
+    assert.Equal(t, "in_progress", state)
+
+    // Test with a job scheduled in "in progress" state.
+    j2 := &model.Job{
+        Id:       model.NewId(),
+        CreateAt: j1.CreateAt + 1,
+        Data: map[string]string{
+            JOB_DATA_KEY_MIGRATION: migrationKey,
+        },
+        Status: model.JOB_STATUS_IN_PROGRESS,
+        Type:   model.JOB_TYPE_MIGRATIONS,
+    }
+
+    j2 = (<-th.App.Srv.Store.Job().Save(j2)).Data.(*model.Job)
+
+    state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+    assert.Nil(t, err)
+    assert.Equal(t, j2.Id, job.Id)
+    assert.Equal(t, "in_progress", state)
+
+    // Test with a job scheduled in "error" state.
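+    // Jobs that ended in an error state are reported as "unscheduled", so the scheduler is free to retry the migration.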
+    j3 := &model.Job{
+        Id:       model.NewId(),
+        CreateAt: j2.CreateAt + 1,
+        Data: map[string]string{
+            JOB_DATA_KEY_MIGRATION: migrationKey,
+        },
+        Status: model.JOB_STATUS_ERROR,
+        Type:   model.JOB_TYPE_MIGRATIONS,
+    }
+
+    j3 = (<-th.App.Srv.Store.Job().Save(j3)).Data.(*model.Job)
+
+    state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
+    assert.Nil(t, err)
+    assert.Equal(t, j3.Id, job.Id)
+    assert.Equal(t, "unscheduled", state)
+}
diff --git a/migrations/migrationstestlib.go b/migrations/migrationstestlib.go
new file mode 100644
index 000000000..b52f7af79
--- /dev/null
+++ b/migrations/migrationstestlib.go
@@ -0,0 +1,419 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+    "encoding/json"
+    "io"
+    "io/ioutil"
+    "os"
+    "path/filepath"
+    "time"
+
+    "github.com/mattermost/mattermost-server/app"
+    "github.com/mattermost/mattermost-server/einterfaces"
+    "github.com/mattermost/mattermost-server/mlog"
+    "github.com/mattermost/mattermost-server/model"
+    "github.com/mattermost/mattermost-server/plugin"
+    "github.com/mattermost/mattermost-server/plugin/pluginenv"
+    "github.com/mattermost/mattermost-server/store"
+    "github.com/mattermost/mattermost-server/store/sqlstore"
+    "github.com/mattermost/mattermost-server/store/storetest"
+    "github.com/mattermost/mattermost-server/utils"
+)
+
+type TestHelper struct {
+    App          *app.App
+    BasicTeam    *model.Team
+    BasicUser    *model.User
+    BasicUser2   *model.User
+    BasicChannel *model.Channel
+    BasicPost    *model.Post
+
+    SystemAdminUser *model.User
+
+    tempConfigPath string
+    tempWorkspace  string
+    pluginHooks    map[string]plugin.Hooks
+}
+
+type persistentTestStore struct {
+    store.Store
+}
+
+func (*persistentTestStore) Close() {}
+
+var testStoreContainer *storetest.RunningContainer
+var testStore *persistentTestStore
+var testStoreSqlSupplier *sqlstore.SqlSupplier
+var testClusterInterface *FakeClusterInterface
+
+// UseTestStore sets the container and corresponding settings to use for tests. Once the tests are
+// complete (e.g. at the end of your TestMain implementation), you should call StopTestStore.
+func UseTestStore(container *storetest.RunningContainer, settings *model.SqlSettings) {
+    testClusterInterface = &FakeClusterInterface{}
+    testStoreContainer = container
+    testStoreSqlSupplier = sqlstore.NewSqlSupplier(*settings, nil)
+    testStore = &persistentTestStore{store.NewLayeredStore(testStoreSqlSupplier, nil, testClusterInterface)}
+}
+
+func StopTestStore() {
+    if testStoreContainer != nil {
+        testStoreContainer.Stop()
+        testStoreContainer = nil
+    }
+}
+
+func setupTestHelper(enterprise bool) *TestHelper {
+    permConfig, err := os.Open(utils.FindConfigFile("config.json"))
+    if err != nil {
+        panic(err)
+    }
+    defer permConfig.Close()
+    tempConfig, err := ioutil.TempFile("", "")
+    if err != nil {
+        panic(err)
+    }
+    _, err = io.Copy(tempConfig, permConfig)
+    tempConfig.Close()
+    if err != nil {
+        panic(err)
+    }
+
+    options := []app.Option{app.ConfigFile(tempConfig.Name()), app.DisableConfigWatch}
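+    // When TestMain has set up a shared test store, override the app's store so this helper
+    // reuses it instead of creating its own.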
+    if testStore != nil {
+        options = append(options, app.StoreOverride(testStore))
+    }
+
+    a, err := app.New(options...)
+    if err != nil {
+        panic(err)
+    }
+
+    th := &TestHelper{
+        App:            a,
+        pluginHooks:    make(map[string]plugin.Hooks),
+        tempConfigPath: tempConfig.Name(),
+    }
+
+    th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.MaxUsersPerTeam = 50 })
+    th.App.UpdateConfig(func(cfg *model.Config) { *cfg.RateLimitSettings.Enable = false })
+    prevListenAddress := *th.App.Config().ServiceSettings.ListenAddress
+    if testStore != nil {
+        th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = ":0" })
+    }
+    serverErr := th.App.StartServer()
+    if serverErr != nil {
+        panic(serverErr)
+    }
+
+    th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = prevListenAddress })
+
+    th.App.DoAdvancedPermissionsMigration()
+
+    th.App.Srv.Store.MarkSystemRanUnitTests()
+
+    th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.EnableOpenServer = true })
+
+    if enterprise {
+        th.App.SetLicense(model.NewTestLicense())
+    } else {
+        th.App.SetLicense(nil)
+    }
+
+    return th
+}
+
+func SetupEnterprise() *TestHelper {
+    return setupTestHelper(true)
+}
+
+func Setup() *TestHelper {
+    return setupTestHelper(false)
+}
+
+func (me *TestHelper) InitBasic() *TestHelper {
+    me.BasicTeam = me.CreateTeam()
+    me.BasicUser = me.CreateUser()
+    me.LinkUserToTeam(me.BasicUser, me.BasicTeam)
+    me.BasicUser2 = me.CreateUser()
+    me.LinkUserToTeam(me.BasicUser2, me.BasicTeam)
+    me.BasicChannel = me.CreateChannel(me.BasicTeam)
+    me.BasicPost = me.CreatePost(me.BasicChannel)
+
+    return me
+}
+
+func (me *TestHelper) InitSystemAdmin() *TestHelper {
+    me.SystemAdminUser = me.CreateUser()
+    me.App.UpdateUserRoles(me.SystemAdminUser.Id, model.SYSTEM_USER_ROLE_ID+" "+model.SYSTEM_ADMIN_ROLE_ID, false)
+    me.SystemAdminUser, _ = me.App.GetUser(me.SystemAdminUser.Id)
+
+    return me
+}
+
+func (me *TestHelper) MakeEmail() string {
+    return "success_" + model.NewId() + "@simulator.amazonses.com"
+}
+
+func (me *TestHelper) CreateTeam() *model.Team {
+    id := model.NewId()
+    team := &model.Team{
+        DisplayName: "dn_" + id,
+        Name:        "name" + id,
+        Email:       "success+" + id + "@simulator.amazonses.com",
+        Type:        model.TEAM_OPEN,
+    }
+
+    utils.DisableDebugLogForTest()
+    var err *model.AppError
+    if team, err = me.App.CreateTeam(team); err != nil {
+        mlog.Error(err.Error())
+
+        time.Sleep(time.Second)
+        panic(err)
+    }
+    utils.EnableDebugLogForTest()
+    return team
+}
+
+func (me *TestHelper) CreateUser() *model.User {
+    id := model.NewId()
+
+    user := &model.User{
+        Email:         "success+" + id + "@simulator.amazonses.com",
+        Username:      "un_" + id,
+        Nickname:      "nn_" + id,
+        Password:      "Password1",
+        EmailVerified: true,
+    }
+
+    utils.DisableDebugLogForTest()
+    var err *model.AppError
+    if user, err = me.App.CreateUser(user); err != nil {
+        mlog.Error(err.Error())
+
+        time.Sleep(time.Second)
+        panic(err)
+    }
+    utils.EnableDebugLogForTest()
+    return user
+}
+
+func (me *TestHelper) CreateChannel(team *model.Team) *model.Channel {
+    return me.createChannel(team, model.CHANNEL_OPEN)
+}
+
+func (me *TestHelper) createChannel(team *model.Team, channelType string) *model.Channel {
+    id := model.NewId()
+
+    channel := &model.Channel{
+        DisplayName: "dn_" + id,
+        Name:        "name_" + id,
+        Type:        channelType,
+        TeamId:      team.Id,
+        CreatorId:   me.BasicUser.Id,
+    }
+
+    utils.DisableDebugLogForTest()
+    var err *model.AppError
+    if channel, err = me.App.CreateChannel(channel, true); err != nil {
+        mlog.Error(err.Error())
+
+        time.Sleep(time.Second)
+        panic(err)
+    }
+    utils.EnableDebugLogForTest()
+    return channel
+}
+
+func (me *TestHelper) CreateDmChannel(user *model.User) *model.Channel {
+    utils.DisableDebugLogForTest()
+    var err *model.AppError
+    var channel *model.Channel
+    if channel, err = me.App.CreateDirectChannel(me.BasicUser.Id, user.Id); err != nil {
+        mlog.Error(err.Error())
+
+        time.Sleep(time.Second)
+        panic(err)
+    }
+    utils.EnableDebugLogForTest()
+    return channel
+}
+
+func (me *TestHelper) CreatePost(channel *model.Channel) *model.Post {
+    id := model.NewId()
+
+    post := &model.Post{
+        UserId:    me.BasicUser.Id,
+        ChannelId: channel.Id,
+        Message:   "message_" + id,
+        CreateAt:  model.GetMillis() - 10000,
+    }
+
+    utils.DisableDebugLogForTest()
+    var err *model.AppError
+    if post, err = me.App.CreatePost(post, channel, false); err != nil {
+        mlog.Error(err.Error())
+
+        time.Sleep(time.Second)
+        panic(err)
+    }
+    utils.EnableDebugLogForTest()
+    return post
+}
+
+func (me *TestHelper) LinkUserToTeam(user *model.User, team *model.Team) {
+    utils.DisableDebugLogForTest()
+
+    err := me.App.JoinUserToTeam(team, user, "")
+    if err != nil {
+        mlog.Error(err.Error())
+
+        time.Sleep(time.Second)
+        panic(err)
+    }
+
+    utils.EnableDebugLogForTest()
+}
+
+func (me *TestHelper) AddUserToChannel(user *model.User, channel *model.Channel) *model.ChannelMember {
+    utils.DisableDebugLogForTest()
+
+    member, err := me.App.AddUserToChannel(user, channel)
+    if err != nil {
+        mlog.Error(err.Error())
+
+        time.Sleep(time.Second)
+        panic(err)
+    }
+
+    utils.EnableDebugLogForTest()
+
+    return member
+}
+
+func (me *TestHelper) TearDown() {
+    me.App.Shutdown()
+    os.Remove(me.tempConfigPath)
+    if err := recover(); err != nil {
+        StopTestStore()
+        panic(err)
+    }
+    if me.tempWorkspace != "" {
+        os.RemoveAll(me.tempWorkspace)
+    }
+}
+
+type mockPluginSupervisor struct {
+    hooks plugin.Hooks
+}
+
+func (s *mockPluginSupervisor) Start(api plugin.API) error {
+    return s.hooks.OnActivate(api)
+}
+
+func (s *mockPluginSupervisor) Stop() error {
+    return nil
+}
+
+func (s *mockPluginSupervisor) Hooks() plugin.Hooks {
+    return s.hooks
+}
+
+func (me *TestHelper) InstallPlugin(manifest *model.Manifest, hooks plugin.Hooks) {
+    if me.tempWorkspace == "" {
+        dir, err := ioutil.TempDir("", "apptest")
+        if err != nil {
+            panic(err)
+        }
+        me.tempWorkspace = dir
+    }
+
+    pluginDir := filepath.Join(me.tempWorkspace, "plugins")
+    webappDir := filepath.Join(me.tempWorkspace, "webapp")
+    me.App.InitPlugins(pluginDir, webappDir, func(bundle *model.BundleInfo) (plugin.Supervisor, error) {
+        if hooks, ok := me.pluginHooks[bundle.Manifest.Id]; ok {
+            return &mockPluginSupervisor{hooks}, nil
+        }
+        return pluginenv.DefaultSupervisorProvider(bundle)
+    })
+
+    me.pluginHooks[manifest.Id] = hooks
+
+    manifestCopy := *manifest
+    if manifestCopy.Backend == nil {
+        manifestCopy.Backend = &model.ManifestBackend{}
+    }
+    manifestBytes, err := json.Marshal(&manifestCopy)
+    if err != nil {
+        panic(err)
+    }
+
+    if err := os.MkdirAll(filepath.Join(pluginDir, manifest.Id), 0700); err != nil {
+        panic(err)
+    }
+
+    if err := ioutil.WriteFile(filepath.Join(pluginDir, manifest.Id, "plugin.json"), manifestBytes, 0600); err != nil {
+        panic(err)
+    }
+}
+
+func (me *TestHelper) ResetRoleMigration() {
+    if _, err := testStoreSqlSupplier.GetMaster().Exec("DELETE from Roles"); err != nil {
+        panic(err)
+    }
+
+    testClusterInterface.sendClearRoleCacheMessage()
+
+    if _, err := testStoreSqlSupplier.GetMaster().Exec("DELETE from Systems where Name = :Name", map[string]interface{}{"Name": app.ADVANCED_PERMISSIONS_MIGRATION_KEY}); err != nil {
+        panic(err)
+    }
+}
+
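+// DeleteAllJobsByTypeAndMigrationKey removes all existing migrations jobs that carry the given
+// migration key, giving tests a clean slate to start from.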
+func (me *TestHelper) DeleteAllJobsByTypeAndMigrationKey(jobType string, migrationKey string) {
+    if res := <-me.App.Srv.Store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); res.Err != nil {
+        panic(res.Err)
+    } else {
+        jobs := res.Data.([]*model.Job)
+
+        for _, job := range jobs {
+            if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok && key == migrationKey {
+                if res := <-me.App.Srv.Store.Job().Delete(job.Id); res.Err != nil {
+                    panic(res.Err)
+                }
+            }
+        }
+    }
+}
+
+type FakeClusterInterface struct {
+    clusterMessageHandler einterfaces.ClusterMessageHandler
+}
+
+func (me *FakeClusterInterface) StartInterNodeCommunication() {}
+func (me *FakeClusterInterface) StopInterNodeCommunication()  {}
+func (me *FakeClusterInterface) RegisterClusterMessageHandler(event string, crm einterfaces.ClusterMessageHandler) {
+    me.clusterMessageHandler = crm
+}
+func (me *FakeClusterInterface) GetClusterId() string                             { return "" }
+func (me *FakeClusterInterface) IsLeader() bool                                   { return false }
+func (me *FakeClusterInterface) GetMyClusterInfo() *model.ClusterInfo             { return nil }
+func (me *FakeClusterInterface) GetClusterInfos() []*model.ClusterInfo            { return nil }
+func (me *FakeClusterInterface) SendClusterMessage(cluster *model.ClusterMessage) {}
+func (me *FakeClusterInterface) NotifyMsg(buf []byte)                             {}
+func (me *FakeClusterInterface) GetClusterStats() ([]*model.ClusterStats, *model.AppError) {
+    return nil, nil
+}
+func (me *FakeClusterInterface) GetLogs(page, perPage int) ([]string, *model.AppError) {
+    return []string{}, nil
+}
+func (me *FakeClusterInterface) ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError {
+    return nil
+}
+func (me *FakeClusterInterface) sendClearRoleCacheMessage() {
+    me.clusterMessageHandler(&model.ClusterMessage{
+        Event: model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_ROLES,
+    })
+}
diff --git a/migrations/scheduler.go b/migrations/scheduler.go
new file mode 100644
index 000000000..8a7ac30d0
--- /dev/null
+++ b/migrations/scheduler.go
@@ -0,0 +1,110 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+    "time"
+
+    "github.com/mattermost/mattermost-server/app"
+    "github.com/mattermost/mattermost-server/mlog"
+    "github.com/mattermost/mattermost-server/model"
+    "github.com/mattermost/mattermost-server/store"
+)
+
+const (
+    MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS = 3600000 // 1 hour
+)
+
+type Scheduler struct {
+    App                    *app.App
+    allMigrationsCompleted bool
+}
+
+func (m *MigrationsJobInterfaceImpl) MakeScheduler() model.Scheduler {
+    return &Scheduler{m.App, false}
+}
+
+func (scheduler *Scheduler) Name() string {
+    return "MigrationsScheduler"
+}
+
+func (scheduler *Scheduler) JobType() string {
+    return model.JOB_TYPE_MIGRATIONS
+}
+
+func (scheduler *Scheduler) Enabled(cfg *model.Config) bool {
+    return true
+}
+
+func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
+    if scheduler.allMigrationsCompleted {
+        return nil
+    }
+
+    nextTime := time.Now().Add(60 * time.Second)
+    return &nextTime
+}
+
+func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
+    mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name()))
+
+    // Work through the list of migrations in order. Schedule the first one that isn't done (assuming it isn't in progress already).
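+    // Because this returns as soon as a job is scheduled or found in progress, migrations run
+    // one at a time, in the order MakeMigrationsList defines.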
+    for _, key := range MakeMigrationsList() {
+        state, job, err := GetMigrationState(key, scheduler.App.Srv.Store)
+        if err != nil {
+            mlog.Error("Failed to determine status of migration: ", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key), mlog.String("error", err.Error()))
+            return nil, nil
+        }
+
+        if state == MIGRATION_STATE_IN_PROGRESS {
+            // Check the migration job isn't wedged.
+            if job != nil && job.LastActivityAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS {
+                mlog.Warn("Job appears to be wedged. Rescheduling another instance.", mlog.String("scheduler", scheduler.Name()), mlog.String("wedged_job_id", job.Id), mlog.String("migration_key", key))
+                if err := scheduler.App.Jobs.SetJobError(job, nil); err != nil {
+                    mlog.Error("Worker: Failed to set job error", mlog.String("scheduler", scheduler.Name()), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+                }
+                return scheduler.createJob(key, job, scheduler.App.Srv.Store)
+            }
+
+            return nil, nil
+        }
+
+        if state == MIGRATION_STATE_COMPLETED {
+            // This migration is done. Continue to check the next.
+            continue
+        }
+
+        if state == MIGRATION_STATE_UNSCHEDULED {
+            mlog.Debug("Scheduling a new job for migration.", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key))
+            return scheduler.createJob(key, job, scheduler.App.Srv.Store)
+        }
+
+        mlog.Error("Unknown migration state. Not doing anything.", mlog.String("migration_state", state))
+        return nil, nil
+    }
+
+    // If we reached here, then there aren't any migrations left to run.
+    scheduler.allMigrationsCompleted = true
+    mlog.Debug("All migrations are complete.", mlog.String("scheduler", scheduler.Name()))
+
+    return nil, nil
+}
+
+func (scheduler *Scheduler) createJob(migrationKey string, lastJob *model.Job, store store.Store) (*model.Job, *model.AppError) {
+    var lastDone string
+    if lastJob != nil {
+        lastDone = lastJob.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE]
+    }
+
+    data := map[string]string{
+        JOB_DATA_KEY_MIGRATION:           migrationKey,
+        JOB_DATA_KEY_MIGRATION_LAST_DONE: lastDone,
+    }
+
+    if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil {
+        return nil, err
+    } else {
+        return job, nil
+    }
+}
diff --git a/migrations/worker.go b/migrations/worker.go
new file mode 100644
index 000000000..7a64dd609
--- /dev/null
+++ b/migrations/worker.go
@@ -0,0 +1,166 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package migrations
+
+import (
+    "context"
+    "net/http"
+    "time"
+
+    "github.com/mattermost/mattermost-server/app"
+    "github.com/mattermost/mattermost-server/jobs"
+    "github.com/mattermost/mattermost-server/mlog"
+    "github.com/mattermost/mattermost-server/model"
+)
+
+const (
+    TIME_BETWEEN_BATCHES = 100
+)
+
+type Worker struct {
+    name      string
+    stop      chan bool
+    stopped   chan bool
+    jobs      chan model.Job
+    jobServer *jobs.JobServer
+    app       *app.App
+}
+
+func (m *MigrationsJobInterfaceImpl) MakeWorker() model.Worker {
+    worker := Worker{
+        name:      "Migrations",
+        stop:      make(chan bool, 1),
+        stopped:   make(chan bool, 1),
+        jobs:      make(chan model.Job),
+        jobServer: m.App.Jobs,
+        app:       m.App,
+    }
+
+    return &worker
+}
+
+func (worker *Worker) Run() {
+    mlog.Debug("Worker started", mlog.String("worker", worker.name))
+
+    defer func() {
+        mlog.Debug("Worker finished", mlog.String("worker", worker.name))
+        worker.stopped <- true
+    }()
+
+    for {
+        select {
+        case <-worker.stop:
+            mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name))
+            return
+        case job := <-worker.jobs:
+            mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name))
+            worker.DoJob(&job)
+        }
+    }
+}
+
+func (worker *Worker) Stop() {
+    mlog.Debug("Worker stopping", mlog.String("worker", worker.name))
+    worker.stop <- true
+    <-worker.stopped
+}
+
+func (worker *Worker) JobChannel() chan<- model.Job {
+    return worker.jobs
+}
+
+func (worker *Worker) DoJob(job *model.Job) {
+    if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
+        mlog.Info("Worker experienced an error while trying to claim job",
+            mlog.String("worker", worker.name),
+            mlog.String("job_id", job.Id),
+            mlog.String("error", err.Error()))
+        return
+    } else if !claimed {
+        return
+    }
+
+    cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
+    cancelWatcherChan := make(chan interface{}, 1)
+    go worker.app.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
+
+    defer cancelCancelWatcher()
+
+    for {
+        select {
+        case <-cancelWatcherChan:
+            mlog.Debug("Worker: Job has been canceled via CancellationWatcher", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+            worker.setJobCanceled(job)
+            return
+
+        case <-worker.stop:
+            mlog.Debug("Worker: Job has been canceled via Worker Stop", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+            worker.setJobCanceled(job)
+            return
+
+        case <-time.After(TIME_BETWEEN_BATCHES * time.Millisecond):
+            done, progress, err := worker.runMigration(job.Data[JOB_DATA_KEY_MIGRATION], job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE])
+            if err != nil {
+                mlog.Error("Worker: Failed to run migration", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+                worker.setJobError(job, err)
+                return
+            } else if done {
+                mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+                worker.setJobSuccess(job)
+                return
+            } else {
+                job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE] = progress
+                if err := worker.app.Jobs.UpdateInProgressJobData(job); err != nil {
+                    mlog.Error("Worker: Failed to update migration status data for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+                    worker.setJobError(job, err)
+                    return
+                }
+            }
+        }
+    }
+}
+
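+// setJobSuccess marks the job as successfully completed, downgrading it to an error state if
+// even that status update fails.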
+func (worker *Worker) setJobSuccess(job *model.Job) {
+    if err := worker.app.Jobs.SetJobSuccess(job); err != nil {
+        mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+        worker.setJobError(job, err)
+    }
+}
+
+func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) {
+    if err := worker.app.Jobs.SetJobError(job, appError); err != nil {
+        mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+    }
+}
+
+func (worker *Worker) setJobCanceled(job *model.Job) {
+    if err := worker.app.Jobs.SetJobCanceled(job); err != nil {
+        mlog.Error("Worker: Failed to mark job as canceled", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+    }
+}
+
+// Return parameters:
+// - whether the migration is completed on this run (true) or still incomplete (false).
+// - the updated lastDone string for the migration.
+// - any error which may have occurred while running the migration.
+func (worker *Worker) runMigration(key string, lastDone string) (bool, string, *model.AppError) {
+    var done bool
+    var progress string
+    var err *model.AppError
+
+    switch key {
+    case model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2:
+        done, progress, err = worker.runAdvancedPermissionsPhase2Migration(lastDone)
+    default:
+        return false, "", model.NewAppError("MigrationsWorker.runMigration", "migrations.worker.run_migration.unknown_key", map[string]interface{}{"key": key}, "", http.StatusInternalServerError)
+    }
+
+    if done {
+        if result := <-worker.app.Srv.Store.System().Save(&model.System{Name: key, Value: "true"}); result.Err != nil {
+            return false, "", result.Err
+        }
+    }
+
+    return done, progress, err
+}
--
cgit v1.2.3-1-g7c22
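
Taken together, the files above implement a simple resumable-batch pattern: the scheduler creates a job that carries the last checkpoint in its Data map, the worker runs one batch per tick and persists the updated checkpoint, and a row in the Systems table finally marks the migration as complete. The standalone sketch below illustrates just that checkpoint loop in isolation; it is a simplified, hypothetical example (the progress type, migrationStep function, and batch sizes are illustrative stand-ins, not code from this patch):

package main

import (
    "encoding/json"
    "fmt"
)

// progress plays the role of AdvancedPermissionsPhase2Progress: a small,
// JSON-serializable checkpoint that fits in the job's Data map.
type progress struct {
    Next int `json:"next"`
}

// migrationStep stands in for one store batch (e.g. MigrateTeamMembers): it
// advances the checkpoint and reports whether the stage has finished.
func migrationStep(p progress) (progress, bool) {
    const total = 10 // pretend there are 10 rows to migrate
    if p.Next >= total {
        return p, true // nothing left: this stage is done
    }
    p.Next += 4 // migrate a batch of up to 4 rows
    return p, false
}

func main() {
    lastDone := "" // JOB_DATA_KEY_MIGRATION_LAST_DONE starts out empty

    for {
        // Decode the checkpoint, as runAdvancedPermissionsPhase2Migration does.
        var p progress
        if lastDone != "" {
            if err := json.Unmarshal([]byte(lastDone), &p); err != nil {
                panic(err) // the real worker returns an AppError for invalid progress
            }
        }

        next, done := migrationStep(p)
        if done {
            fmt.Println("migration complete; record it in the Systems table")
            return
        }

        // Persist the new checkpoint so a restarted worker resumes from here.
        b, _ := json.Marshal(next)
        lastDone = string(b)
        fmt.Println("checkpoint saved:", lastDone)
    }
}

Because the checkpoint is re-read from its serialized form on every iteration, the loop behaves the same whether it runs to completion in one process or is resumed by a fresh worker after a restart.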