From c36e85c9126b921cf00e578ac70c1f1ee0153abd Mon Sep 17 00:00:00 2001 From: Daniel Schalla Date: Wed, 10 Oct 2018 19:55:12 +0200 Subject: DeleteAll for KV (#9431) Expire K/V Values Regenerate Code pathfix Update Expiry on Update Check for Exit Signal gofmt Rewrote Go Routine Remove tempoarily cleanup loop fix expiretime TEST: Expired Watchdog as GoRoutine Check if Srv is nil Use Scheduler/Worker for Expired Key CleanUp add license fix scheduler job type; DoJob Restructuring Remove unused imports and constants move db migration from 5.4 to 5.5 --- plugin/api.go | 6 +++ plugin/client_rpc_generated.go | 57 ++++++++++++++++++++++++ plugin/plugintest/api.go | 32 ++++++++++++++ plugin/scheduler/plugin.go | 19 ++++++++ plugin/scheduler/scheduler.go | 47 ++++++++++++++++++++ plugin/scheduler/worker.go | 99 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 260 insertions(+) create mode 100644 plugin/scheduler/plugin.go create mode 100644 plugin/scheduler/scheduler.go create mode 100644 plugin/scheduler/worker.go (limited to 'plugin') diff --git a/plugin/api.go b/plugin/api.go index 308067228..5a5727659 100644 --- a/plugin/api.go +++ b/plugin/api.go @@ -196,12 +196,18 @@ type API interface { // KVSet will store a key-value pair, unique per plugin. KVSet(key string, value []byte) *model.AppError + // KVSet will store a key-value pair, unique per plugin with an expiry time + KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError + // KVGet will retrieve a value based on the key. Returns nil for non-existent keys. KVGet(key string) ([]byte, *model.AppError) // KVDelete will remove a key-value pair. Returns nil for non-existent keys. KVDelete(key string) *model.AppError + // KVDeleteAll will remove all key-value pairs for a plugin. + KVDeleteAll() *model.AppError + // KVList will list all keys for a plugin. KVList(page, perPage int) ([]string, *model.AppError) diff --git a/plugin/client_rpc_generated.go b/plugin/client_rpc_generated.go index 1dc3fe143..da41d9418 100644 --- a/plugin/client_rpc_generated.go +++ b/plugin/client_rpc_generated.go @@ -2156,6 +2156,36 @@ func (s *apiRPCServer) KVSet(args *Z_KVSetArgs, returns *Z_KVSetReturns) error { return nil } +type Z_KVSetWithExpiryArgs struct { + A string + B []byte + C int64 +} + +type Z_KVSetWithExpiryReturns struct { + A *model.AppError +} + +func (g *apiRPCClient) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError { + _args := &Z_KVSetWithExpiryArgs{key, value, expireInSeconds} + _returns := &Z_KVSetWithExpiryReturns{} + if err := g.client.Call("Plugin.KVSetWithExpiry", _args, _returns); err != nil { + log.Printf("RPC call to KVSetWithExpiry API failed: %s", err.Error()) + } + return _returns.A +} + +func (s *apiRPCServer) KVSetWithExpiry(args *Z_KVSetWithExpiryArgs, returns *Z_KVSetWithExpiryReturns) error { + if hook, ok := s.impl.(interface { + KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError + }); ok { + returns.A = hook.KVSetWithExpiry(args.A, args.B, args.C) + } else { + return encodableError(fmt.Errorf("API KVSetWithExpiry called but not implemented.")) + } + return nil +} + type Z_KVGetArgs struct { A string } @@ -2213,6 +2243,33 @@ func (s *apiRPCServer) KVDelete(args *Z_KVDeleteArgs, returns *Z_KVDeleteReturns return nil } +type Z_KVDeleteAllArgs struct { +} + +type Z_KVDeleteAllReturns struct { + A *model.AppError +} + +func (g *apiRPCClient) KVDeleteAll() *model.AppError { + _args := &Z_KVDeleteAllArgs{} + _returns := &Z_KVDeleteAllReturns{} + if err := g.client.Call("Plugin.KVDeleteAll", _args, _returns); err != nil { + log.Printf("RPC call to KVDeleteAll API failed: %s", err.Error()) + } + return _returns.A +} + +func (s *apiRPCServer) KVDeleteAll(args *Z_KVDeleteAllArgs, returns *Z_KVDeleteAllReturns) error { + if hook, ok := s.impl.(interface { + KVDeleteAll() *model.AppError + }); ok { + returns.A = hook.KVDeleteAll() + } else { + return encodableError(fmt.Errorf("API KVDeleteAll called but not implemented.")) + } + return nil +} + type Z_KVListArgs struct { A int B int diff --git a/plugin/plugintest/api.go b/plugin/plugintest/api.go index 8240a81c9..c11446bda 100644 --- a/plugin/plugintest/api.go +++ b/plugin/plugintest/api.go @@ -1021,6 +1021,22 @@ func (_m *API) KVDelete(key string) *model.AppError { return r0 } +// KVDeleteAll provides a mock function with given fields: +func (_m *API) KVDeleteAll() *model.AppError { + ret := _m.Called() + + var r0 *model.AppError + if rf, ok := ret.Get(0).(func() *model.AppError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.AppError) + } + } + + return r0 +} + // KVGet provides a mock function with given fields: key func (_m *API) KVGet(key string) ([]byte, *model.AppError) { ret := _m.Called(key) @@ -1087,6 +1103,22 @@ func (_m *API) KVSet(key string, value []byte) *model.AppError { return r0 } +// KVSetWithExpiry provides a mock function with given fields: key, value, expireInSeconds +func (_m *API) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError { + ret := _m.Called(key, value, expireInSeconds) + + var r0 *model.AppError + if rf, ok := ret.Get(0).(func(string, []byte, int64) *model.AppError); ok { + r0 = rf(key, value, expireInSeconds) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.AppError) + } + } + + return r0 +} + // LoadPluginConfiguration provides a mock function with given fields: dest func (_m *API) LoadPluginConfiguration(dest interface{}) error { ret := _m.Called(dest) diff --git a/plugin/scheduler/plugin.go b/plugin/scheduler/plugin.go new file mode 100644 index 000000000..3133cb4b1 --- /dev/null +++ b/plugin/scheduler/plugin.go @@ -0,0 +1,19 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package scheduler + +import ( + "github.com/mattermost/mattermost-server/app" + tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" +) + +type PluginsJobInterfaceImpl struct { + App *app.App +} + +func init() { + app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface { + return &PluginsJobInterfaceImpl{a} + }) +} diff --git a/plugin/scheduler/scheduler.go b/plugin/scheduler/scheduler.go new file mode 100644 index 000000000..7214d6cfd --- /dev/null +++ b/plugin/scheduler/scheduler.go @@ -0,0 +1,47 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package scheduler + +import ( + "time" + + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +type Scheduler struct { + App *app.App +} + +func (m *PluginsJobInterfaceImpl) MakeScheduler() model.Scheduler { + return &Scheduler{m.App} +} + +func (scheduler *Scheduler) Name() string { + return "PluginsScheduler" +} + +func (scheduler *Scheduler) JobType() string { + return model.JOB_TYPE_PLUGINS +} + +func (scheduler *Scheduler) Enabled(cfg *model.Config) bool { + return true +} + +func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time { + nextTime := time.Now().Add(60 * time.Second) + return &nextTime +} + +func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) { + mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name())) + + if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil { + return nil, err + } else { + return job, nil + } +} diff --git a/plugin/scheduler/worker.go b/plugin/scheduler/worker.go new file mode 100644 index 000000000..252e100fa --- /dev/null +++ b/plugin/scheduler/worker.go @@ -0,0 +1,99 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package scheduler + +import ( + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/jobs" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +type Worker struct { + name string + stop chan bool + stopped chan bool + jobs chan model.Job + jobServer *jobs.JobServer + app *app.App +} + +func (m *PluginsJobInterfaceImpl) MakeWorker() model.Worker { + worker := Worker{ + name: "Plugins", + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + jobServer: m.App.Jobs, + app: m.App, + } + + return &worker +} + +func (worker *Worker) Run() { + mlog.Debug("Worker started", mlog.String("worker", worker.name)) + + defer func() { + mlog.Debug("Worker finished", mlog.String("worker", worker.name)) + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name)) + return + case job := <-worker.jobs: + mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name)) + worker.DoJob(&job) + } + } +} + +func (worker *Worker) Stop() { + mlog.Debug("Worker stopping", mlog.String("worker", worker.name)) + worker.stop <- true + <-worker.stopped +} + +func (worker *Worker) JobChannel() chan<- model.Job { + return worker.jobs +} + +func (worker *Worker) DoJob(job *model.Job) { + if claimed, err := worker.jobServer.ClaimJob(job); err != nil { + mlog.Info("Worker experienced an error while trying to claim job", + mlog.String("worker", worker.name), + mlog.String("job_id", job.Id), + mlog.String("error", err.Error())) + return + } else if !claimed { + return + } + + err := worker.app.DeleteAllExpiredPluginKeys() + if err == nil { + mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id)) + worker.setJobSuccess(job) + return + } else { + mlog.Error("Worker: Failed to delete expired keys", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + return + } +} + +func (worker *Worker) setJobSuccess(job *model.Job) { + if err := worker.app.Jobs.SetJobSuccess(job); err != nil { + mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + } +} + +func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) { + if err := worker.app.Jobs.SetJobError(job, appError); err != nil { + mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + } +} -- cgit v1.2.3-1-g7c22