From c36e85c9126b921cf00e578ac70c1f1ee0153abd Mon Sep 17 00:00:00 2001 From: Daniel Schalla Date: Wed, 10 Oct 2018 19:55:12 +0200 Subject: DeleteAll for KV (#9431) Expire K/V Values Regenerate Code pathfix Update Expiry on Update Check for Exit Signal gofmt Rewrote Go Routine Remove tempoarily cleanup loop fix expiretime TEST: Expired Watchdog as GoRoutine Check if Srv is nil Use Scheduler/Worker for Expired Key CleanUp add license fix scheduler job type; DoJob Restructuring Remove unused imports and constants move db migration from 5.4 to 5.5 --- plugin/scheduler/plugin.go | 19 +++++++++ plugin/scheduler/scheduler.go | 47 ++++++++++++++++++++ plugin/scheduler/worker.go | 99 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+) create mode 100644 plugin/scheduler/plugin.go create mode 100644 plugin/scheduler/scheduler.go create mode 100644 plugin/scheduler/worker.go (limited to 'plugin/scheduler') diff --git a/plugin/scheduler/plugin.go b/plugin/scheduler/plugin.go new file mode 100644 index 000000000..3133cb4b1 --- /dev/null +++ b/plugin/scheduler/plugin.go @@ -0,0 +1,19 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package scheduler + +import ( + "github.com/mattermost/mattermost-server/app" + tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" +) + +type PluginsJobInterfaceImpl struct { + App *app.App +} + +func init() { + app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface { + return &PluginsJobInterfaceImpl{a} + }) +} diff --git a/plugin/scheduler/scheduler.go b/plugin/scheduler/scheduler.go new file mode 100644 index 000000000..7214d6cfd --- /dev/null +++ b/plugin/scheduler/scheduler.go @@ -0,0 +1,47 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package scheduler + +import ( + "time" + + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +type Scheduler struct { + App *app.App +} + +func (m *PluginsJobInterfaceImpl) MakeScheduler() model.Scheduler { + return &Scheduler{m.App} +} + +func (scheduler *Scheduler) Name() string { + return "PluginsScheduler" +} + +func (scheduler *Scheduler) JobType() string { + return model.JOB_TYPE_PLUGINS +} + +func (scheduler *Scheduler) Enabled(cfg *model.Config) bool { + return true +} + +func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time { + nextTime := time.Now().Add(60 * time.Second) + return &nextTime +} + +func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) { + mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name())) + + if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil { + return nil, err + } else { + return job, nil + } +} diff --git a/plugin/scheduler/worker.go b/plugin/scheduler/worker.go new file mode 100644 index 000000000..252e100fa --- /dev/null +++ b/plugin/scheduler/worker.go @@ -0,0 +1,99 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package scheduler + +import ( + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/jobs" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +type Worker struct { + name string + stop chan bool + stopped chan bool + jobs chan model.Job + jobServer *jobs.JobServer + app *app.App +} + +func (m *PluginsJobInterfaceImpl) MakeWorker() model.Worker { + worker := Worker{ + name: "Plugins", + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + jobServer: m.App.Jobs, + app: m.App, + } + + return &worker +} + +func (worker *Worker) Run() { + mlog.Debug("Worker started", mlog.String("worker", worker.name)) + + defer func() { + mlog.Debug("Worker finished", mlog.String("worker", worker.name)) + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name)) + return + case job := <-worker.jobs: + mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name)) + worker.DoJob(&job) + } + } +} + +func (worker *Worker) Stop() { + mlog.Debug("Worker stopping", mlog.String("worker", worker.name)) + worker.stop <- true + <-worker.stopped +} + +func (worker *Worker) JobChannel() chan<- model.Job { + return worker.jobs +} + +func (worker *Worker) DoJob(job *model.Job) { + if claimed, err := worker.jobServer.ClaimJob(job); err != nil { + mlog.Info("Worker experienced an error while trying to claim job", + mlog.String("worker", worker.name), + mlog.String("job_id", job.Id), + mlog.String("error", err.Error())) + return + } else if !claimed { + return + } + + err := worker.app.DeleteAllExpiredPluginKeys() + if err == nil { + mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id)) + worker.setJobSuccess(job) + return + } else { + mlog.Error("Worker: Failed to delete expired keys", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + return + } +} + +func (worker *Worker) setJobSuccess(job *model.Job) { + if err := worker.app.Jobs.SetJobSuccess(job); err != nil { + mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + } +} + +func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) { + if err := worker.app.Jobs.SetJobError(job, appError); err != nil { + mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + } +} -- cgit v1.2.3-1-g7c22