summaryrefslogtreecommitdiffstats
path: root/migrations/scheduler.go
blob: 5778c5cb3b281bb87f4ef86a838b8e18aabf17bd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.

package migrations

import (
	"time"

	"github.com/mattermost/mattermost-server/app"
	"github.com/mattermost/mattermost-server/mlog"
	"github.com/mattermost/mattermost-server/model"
	"github.com/mattermost/mattermost-server/store"
)

const (
	// MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS is one hour expressed in
	// milliseconds. ScheduleJob compares an in-progress job's LastActivityAt
	// and CreateAt against "now minus this value" to decide whether the job
	// is wedged and should be replaced.
	MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS = 60 * 60 * 1000
)

// Scheduler is the scheduler for migration jobs. Its methods report the
// scheduler name and job type, say when it next wants to run, and create a job
// for the first migration that has not been completed yet.
type Scheduler struct {
	// App is the application instance; ScheduleJob and createJob reach the
	// store (App.Srv.Store) and the job server (App.Jobs) through it.
	App                    *app.App
	// allMigrationsCompleted is set to true by ScheduleJob once it has walked
	// the whole migration list and found every entry completed. While it is
	// true, NextScheduleTime returns nil.
	allMigrationsCompleted bool
}

// MakeScheduler returns a new migrations Scheduler that shares m's App and
// starts with allMigrationsCompleted unset.
func (m *MigrationsJobInterfaceImpl) MakeScheduler() model.Scheduler {
	migrationsScheduler := &Scheduler{
		App:                    m.App,
		allMigrationsCompleted: false,
	}
	return migrationsScheduler
}

// Name returns the fixed label identifying this scheduler; ScheduleJob attaches
// it to every log entry it writes under the "scheduler" field.
func (scheduler *Scheduler) Name() string {
	const schedulerName = "MigrationsScheduler"
	return schedulerName
}

// JobType returns the job type this scheduler is responsible for, which is
// always model.JOB_TYPE_MIGRATIONS.
func (scheduler *Scheduler) JobType() string {
	jobType := model.JOB_TYPE_MIGRATIONS
	return jobType
}

// Enabled reports whether this scheduler should run. It is unconditionally
// true: cfg is not consulted.
func (scheduler *Scheduler) Enabled(cfg *model.Config) bool {
	const alwaysEnabled = true
	return alwaysEnabled
}

// NextScheduleTime returns the time at which ScheduleJob should next be
// invoked.
//
// It returns nil once ScheduleJob has flagged every migration as completed
// (allMigrationsCompleted). Otherwise it returns a pointer to the instant 60
// seconds after now. cfg, pendingJobs and lastSuccessfulJob are not read.
func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
	if scheduler.allMigrationsCompleted {
		return nil
	}

	// Base the next run on the caller-supplied clock rather than sampling
	// time.Now() a second time, so the result is consistent with the instant
	// the caller is scheduling against and is deterministic for a given input.
	nextTime := now.Add(60 * time.Second)
	return &nextTime
}

// ScheduleJob walks the migration list in order and creates a job for the
// first migration that still needs one.
//
// For each key returned by MakeMigrationsList it looks up the state with
// GetMigrationState and then:
//   - completed: moves on to the next key;
//   - in progress: does nothing, unless the existing job's LastActivityAt and
//     CreateAt are both older than MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS,
//     in which case the job is marked as errored and a replacement is created;
//   - unscheduled: creates a job;
//   - any other state, or a failed lookup: logs an error and stops.
//
// If every migration is completed, allMigrationsCompleted is set to true.
//
// The result is (nil, nil) whenever no job is created; a non-nil job or error
// only ever comes from createJob. cfg, pendingJobs and lastSuccessfulJob are
// not read.
func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
	schedulerName := scheduler.Name()
	mlog.Debug("Scheduling Job", mlog.String("scheduler", schedulerName))

	for _, migrationKey := range MakeMigrationsList() {
		state, existingJob, stateErr := GetMigrationState(migrationKey, scheduler.App.Srv.Store)
		if stateErr != nil {
			mlog.Error("Failed to determine status of migration: ", mlog.String("scheduler", schedulerName), mlog.String("migration_key", migrationKey), mlog.String("error", stateErr.Error()))
			return nil, nil
		}

		switch state {
		case MIGRATION_STATE_COMPLETED:
			// Nothing to do for this migration; look at the next one.
			continue

		case MIGRATION_STATE_IN_PROGRESS:
			// A running job is left alone unless it looks wedged: it must
			// exist, and both its last activity and its creation must be
			// older than the timeout.
			if existingJob == nil {
				return nil, nil
			}
			if existingJob.LastActivityAt >= model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS {
				return nil, nil
			}
			if existingJob.CreateAt >= model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS {
				return nil, nil
			}

			mlog.Warn("Job appears to be wedged. Rescheduling another instance.", mlog.String("scheduler", schedulerName), mlog.String("wedged_job_id", existingJob.Id), mlog.String("migration_key", migrationKey))
			// Failing to flag the old job is only logged; the replacement is
			// created regardless.
			if setErr := scheduler.App.Jobs.SetJobError(existingJob, nil); setErr != nil {
				mlog.Error("Worker: Failed to set job error", mlog.String("scheduler", schedulerName), mlog.String("job_id", existingJob.Id), mlog.String("error", setErr.Error()))
			}
			return scheduler.createJob(migrationKey, existingJob, scheduler.App.Srv.Store)

		case MIGRATION_STATE_UNSCHEDULED:
			mlog.Debug("Scheduling a new job for migration.", mlog.String("scheduler", schedulerName), mlog.String("migration_key", migrationKey))
			return scheduler.createJob(migrationKey, existingJob, scheduler.App.Srv.Store)

		default:
			mlog.Error("Unknown migration state. Not doing anything.", mlog.String("migration_state", state))
			return nil, nil
		}
	}

	// The loop only falls through when every migration was completed.
	scheduler.allMigrationsCompleted = true
	mlog.Debug("All migrations are complete.", mlog.String("scheduler", schedulerName))

	return nil, nil
}

// createJob creates a new job of type model.JOB_TYPE_MIGRATIONS for
// migrationKey through the App's job server.
//
// The job data carries the migration key and a "last done" marker. When
// lastJob is non-nil the marker is copied from its Data (presumably so a
// replacement job can pick up where the previous one stopped; confirm against
// the worker); otherwise it is the empty string. The store parameter is not
// used by this function.
//
// It returns the created job, or nil and the error reported by CreateJob.
func (scheduler *Scheduler) createJob(migrationKey string, lastJob *model.Job, store store.Store) (*model.Job, *model.AppError) {
	var lastDone string
	if lastJob != nil {
		lastDone = lastJob.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE]
	}

	data := map[string]string{
		JOB_DATA_KEY_MIGRATION:           migrationKey,
		JOB_DATA_KEY_MIGRATION_LAST_DONE: lastDone,
	}

	job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data)
	if err != nil {
		return nil, err
	}
	return job, nil
}