summaryrefslogtreecommitdiffstats
path: root/plugin/scheduler
diff options
context:
space:
mode:
authorDaniel Schalla <daniel@schalla.me>2018-10-10 19:55:12 +0200
committerChristopher Speller <crspeller@gmail.com>2018-10-10 10:55:12 -0700
commitc36e85c9126b921cf00e578ac70c1f1ee0153abd (patch)
tree86bfea62ec6a1ce0edc548db4a87851c41e30b88 /plugin/scheduler
parentbd04d7f75698c7b68434199208dc469021b823c2 (diff)
downloadchat-c36e85c9126b921cf00e578ac70c1f1ee0153abd.tar.gz
chat-c36e85c9126b921cf00e578ac70c1f1ee0153abd.tar.bz2
chat-c36e85c9126b921cf00e578ac70c1f1ee0153abd.zip
DeleteAll for KV (#9431)
Expire K/V Values Regenerate Code pathfix Update Expiry on Update Check for Exit Signal gofmt Rewrote Go Routine Remove tempoarily cleanup loop fix expiretime TEST: Expired Watchdog as GoRoutine Check if Srv is nil Use Scheduler/Worker for Expired Key CleanUp add license fix scheduler job type; DoJob Restructuring Remove unused imports and constants move db migration from 5.4 to 5.5
Diffstat (limited to 'plugin/scheduler')
-rw-r--r--plugin/scheduler/plugin.go19
-rw-r--r--plugin/scheduler/scheduler.go47
-rw-r--r--plugin/scheduler/worker.go99
3 files changed, 165 insertions, 0 deletions
diff --git a/plugin/scheduler/plugin.go b/plugin/scheduler/plugin.go
new file mode 100644
index 000000000..3133cb4b1
--- /dev/null
+++ b/plugin/scheduler/plugin.go
@@ -0,0 +1,19 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package scheduler
+
+import (
+ "github.com/mattermost/mattermost-server/app"
+ tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
+)
+
+type PluginsJobInterfaceImpl struct {
+ App *app.App
+}
+
+func init() {
+ app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface {
+ return &PluginsJobInterfaceImpl{a}
+ })
+}
diff --git a/plugin/scheduler/scheduler.go b/plugin/scheduler/scheduler.go
new file mode 100644
index 000000000..7214d6cfd
--- /dev/null
+++ b/plugin/scheduler/scheduler.go
@@ -0,0 +1,47 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package scheduler
+
+import (
+ "time"
+
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+)
+
+type Scheduler struct {
+ App *app.App
+}
+
+func (m *PluginsJobInterfaceImpl) MakeScheduler() model.Scheduler {
+ return &Scheduler{m.App}
+}
+
+func (scheduler *Scheduler) Name() string {
+ return "PluginsScheduler"
+}
+
+func (scheduler *Scheduler) JobType() string {
+ return model.JOB_TYPE_PLUGINS
+}
+
+func (scheduler *Scheduler) Enabled(cfg *model.Config) bool {
+ return true
+}
+
+func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
+ nextTime := time.Now().Add(60 * time.Second)
+ return &nextTime
+}
+
+func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
+ mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name()))
+
+ if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil {
+ return nil, err
+ } else {
+ return job, nil
+ }
+}
diff --git a/plugin/scheduler/worker.go b/plugin/scheduler/worker.go
new file mode 100644
index 000000000..252e100fa
--- /dev/null
+++ b/plugin/scheduler/worker.go
@@ -0,0 +1,99 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package scheduler
+
+import (
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/jobs"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+)
+
+type Worker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+ jobServer *jobs.JobServer
+ app *app.App
+}
+
+func (m *PluginsJobInterfaceImpl) MakeWorker() model.Worker {
+ worker := Worker{
+ name: "Plugins",
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ jobServer: m.App.Jobs,
+ app: m.App,
+ }
+
+ return &worker
+}
+
+func (worker *Worker) Run() {
+ mlog.Debug("Worker started", mlog.String("worker", worker.name))
+
+ defer func() {
+ mlog.Debug("Worker finished", mlog.String("worker", worker.name))
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name))
+ return
+ case job := <-worker.jobs:
+ mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name))
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *Worker) Stop() {
+ mlog.Debug("Worker stopping", mlog.String("worker", worker.name))
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *Worker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
+
+func (worker *Worker) DoJob(job *model.Job) {
+ if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
+ mlog.Info("Worker experienced an error while trying to claim job",
+ mlog.String("worker", worker.name),
+ mlog.String("job_id", job.Id),
+ mlog.String("error", err.Error()))
+ return
+ } else if !claimed {
+ return
+ }
+
+ err := worker.app.DeleteAllExpiredPluginKeys()
+ if err == nil {
+ mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobSuccess(job)
+ return
+ } else {
+ mlog.Error("Worker: Failed to delete expired keys", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ return
+ }
+}
+
+func (worker *Worker) setJobSuccess(job *model.Job) {
+ if err := worker.app.Jobs.SetJobSuccess(job); err != nil {
+ mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ }
+}
+
+func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) {
+ if err := worker.app.Jobs.SetJobError(job, appError); err != nil {
+ mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ }
+}