From c36e85c9126b921cf00e578ac70c1f1ee0153abd Mon Sep 17 00:00:00 2001 From: Daniel Schalla Date: Wed, 10 Oct 2018 19:55:12 +0200 Subject: DeleteAll for KV (#9431) Expire K/V Values Regenerate Code pathfix Update Expiry on Update Check for Exit Signal gofmt Rewrote Go Routine Remove tempoarily cleanup loop fix expiretime TEST: Expired Watchdog as GoRoutine Check if Srv is nil Use Scheduler/Worker for Expired Key CleanUp add license fix scheduler job type; DoJob Restructuring Remove unused imports and constants move db migration from 5.4 to 5.5 --- app/plugin.go | 1 + app/plugin_api.go | 8 +++ app/plugin_key_value_store.go | 35 +++++++++++ jobs/interfaces/plugins_interface.go | 11 ++++ jobs/schedulers.go | 4 ++ jobs/server.go | 1 + jobs/workers.go | 5 ++ model/job.go | 1 + model/plugin_key_value.go | 1 + plugin/api.go | 6 ++ plugin/client_rpc_generated.go | 57 +++++++++++++++++ plugin/plugintest/api.go | 32 ++++++++++ plugin/scheduler/plugin.go | 19 ++++++ plugin/scheduler/scheduler.go | 47 ++++++++++++++ plugin/scheduler/worker.go | 99 ++++++++++++++++++++++++++++++ store/sqlstore/plugin_store.go | 27 +++++++- store/sqlstore/upgrade.go | 2 +- store/store.go | 2 + store/storetest/mocks/PluginStore.go | 32 ++++++++++ store/storetest/plugin_store.go | 115 +++++++++++++++++++++++++++++++++++ 20 files changed, 501 insertions(+), 4 deletions(-) create mode 100644 jobs/interfaces/plugins_interface.go create mode 100644 plugin/scheduler/plugin.go create mode 100644 plugin/scheduler/scheduler.go create mode 100644 plugin/scheduler/worker.go diff --git a/app/plugin.go b/app/plugin.go index 51e67e2bf..2c18c6cec 100644 --- a/app/plugin.go +++ b/app/plugin.go @@ -146,6 +146,7 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) { }) a.SyncPluginsActiveState() + } func (a *App) ShutDownPlugins() { diff --git a/app/plugin_api.go b/app/plugin_api.go index 8def80180..70b3d60fe 100644 --- a/app/plugin_api.go +++ b/app/plugin_api.go @@ -331,6 +331,10 @@ func (api *PluginAPI) KVSet(key string, value []byte) *model.AppError { return api.app.SetPluginKey(api.id, key, value) } +func (api *PluginAPI) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError { + return api.app.SetPluginKeyWithExpiry(api.id, key, value, expireInSeconds) +} + func (api *PluginAPI) KVGet(key string) ([]byte, *model.AppError) { return api.app.GetPluginKey(api.id, key) } @@ -339,6 +343,10 @@ func (api *PluginAPI) KVDelete(key string) *model.AppError { return api.app.DeletePluginKey(api.id, key) } +func (api *PluginAPI) KVDeleteAll() *model.AppError { + return api.app.DeleteAllKeysForPlugin(api.id) +} + func (api *PluginAPI) KVList(page, perPage int) ([]string, *model.AppError) { return api.app.ListPluginKeys(api.id, page, perPage) } diff --git a/app/plugin_key_value_store.go b/app/plugin_key_value_store.go index 8c3e1f18b..f0aed31f0 100644 --- a/app/plugin_key_value_store.go +++ b/app/plugin_key_value_store.go @@ -19,10 +19,20 @@ func getKeyHash(key string) string { } func (a *App) SetPluginKey(pluginId string, key string, value []byte) *model.AppError { + return a.SetPluginKeyWithExpiry(pluginId, key, value, 0) +} + +func (a *App) SetPluginKeyWithExpiry(pluginId string, key string, value []byte, expireInSeconds int64) *model.AppError { + + if expireInSeconds > 0 { + expireInSeconds = model.GetMillis() + (expireInSeconds * 1000) + } + kv := &model.PluginKeyValue{ PluginId: pluginId, Key: getKeyHash(key), Value: value, + ExpireAt: expireInSeconds, } result := <-a.Srv.Store.Plugin().SaveOrUpdate(kv) @@ -60,6 +70,31 @@ func (a *App) DeletePluginKey(pluginId string, key string) *model.AppError { return result.Err } +func (a *App) DeleteAllKeysForPlugin(pluginId string) *model.AppError { + result := <-a.Srv.Store.Plugin().DeleteAllForPlugin(pluginId) + + if result.Err != nil { + mlog.Error(result.Err.Error()) + } + + return result.Err +} + +func (a *App) DeleteAllExpiredPluginKeys() *model.AppError { + + if a.Srv == nil { + return nil + } + + result := <-a.Srv.Store.Plugin().DeleteAllExpired() + + if result.Err != nil { + mlog.Error(result.Err.Error()) + } + + return result.Err +} + func (a *App) ListPluginKeys(pluginId string, page, perPage int) ([]string, *model.AppError) { result := <-a.Srv.Store.Plugin().List(pluginId, page, perPage) diff --git a/jobs/interfaces/plugins_interface.go b/jobs/interfaces/plugins_interface.go new file mode 100644 index 000000000..4d67129af --- /dev/null +++ b/jobs/interfaces/plugins_interface.go @@ -0,0 +1,11 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package interfaces + +import "github.com/mattermost/mattermost-server/model" + +type PluginsJobInterface interface { + MakeWorker() model.Worker + MakeScheduler() model.Scheduler +} diff --git a/jobs/schedulers.go b/jobs/schedulers.go index 37a64bc22..b0cb92fc5 100644 --- a/jobs/schedulers.go +++ b/jobs/schedulers.go @@ -56,6 +56,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers { schedulers.schedulers = append(schedulers.schedulers, migrationsInterface.MakeScheduler()) } + if pluginsInterface := srv.Plugins; pluginsInterface != nil { + schedulers.schedulers = append(schedulers.schedulers, pluginsInterface.MakeScheduler()) + } + schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers)) return schedulers } diff --git a/jobs/server.go b/jobs/server.go index 80c48a165..06897e43a 100644 --- a/jobs/server.go +++ b/jobs/server.go @@ -23,6 +23,7 @@ type JobServer struct { ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface LdapSync ejobs.LdapSyncInterface Migrations tjobs.MigrationsJobInterface + Plugins tjobs.PluginsJobInterface } func NewJobServer(configService configservice.ConfigService, store store.Store) *JobServer { diff --git a/jobs/workers.go b/jobs/workers.go index ad457ed2a..f06809945 100644 --- a/jobs/workers.go +++ b/jobs/workers.go @@ -22,6 +22,7 @@ type Workers struct { ElasticsearchAggregation model.Worker LdapSync model.Worker Migrations model.Worker + Plugins model.Worker listenerId string } @@ -56,6 +57,10 @@ func (srv *JobServer) InitWorkers() *Workers { workers.Migrations = migrationsInterface.MakeWorker() } + if pluginsInterface := srv.Plugins; pluginsInterface != nil { + workers.Migrations = pluginsInterface.MakeWorker() + } + return workers } diff --git a/model/job.go b/model/job.go index c16614958..76ef65b6f 100644 --- a/model/job.go +++ b/model/job.go @@ -17,6 +17,7 @@ const ( JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION = "elasticsearch_post_aggregation" JOB_TYPE_LDAP_SYNC = "ldap_sync" JOB_TYPE_MIGRATIONS = "migrations" + JOB_TYPE_PLUGINS = "plugins" JOB_STATUS_PENDING = "pending" JOB_STATUS_IN_PROGRESS = "in_progress" diff --git a/model/plugin_key_value.go b/model/plugin_key_value.go index b7a7731c4..7e156d93d 100644 --- a/model/plugin_key_value.go +++ b/model/plugin_key_value.go @@ -17,6 +17,7 @@ type PluginKeyValue struct { PluginId string `json:"plugin_id"` Key string `json:"key" db:"PKey"` Value []byte `json:"value" db:"PValue"` + ExpireAt int64 `json:"expire_at"` } func (kv *PluginKeyValue) IsValid() *AppError { diff --git a/plugin/api.go b/plugin/api.go index 308067228..5a5727659 100644 --- a/plugin/api.go +++ b/plugin/api.go @@ -196,12 +196,18 @@ type API interface { // KVSet will store a key-value pair, unique per plugin. KVSet(key string, value []byte) *model.AppError + // KVSet will store a key-value pair, unique per plugin with an expiry time + KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError + // KVGet will retrieve a value based on the key. Returns nil for non-existent keys. KVGet(key string) ([]byte, *model.AppError) // KVDelete will remove a key-value pair. Returns nil for non-existent keys. KVDelete(key string) *model.AppError + // KVDeleteAll will remove all key-value pairs for a plugin. + KVDeleteAll() *model.AppError + // KVList will list all keys for a plugin. KVList(page, perPage int) ([]string, *model.AppError) diff --git a/plugin/client_rpc_generated.go b/plugin/client_rpc_generated.go index 1dc3fe143..da41d9418 100644 --- a/plugin/client_rpc_generated.go +++ b/plugin/client_rpc_generated.go @@ -2156,6 +2156,36 @@ func (s *apiRPCServer) KVSet(args *Z_KVSetArgs, returns *Z_KVSetReturns) error { return nil } +type Z_KVSetWithExpiryArgs struct { + A string + B []byte + C int64 +} + +type Z_KVSetWithExpiryReturns struct { + A *model.AppError +} + +func (g *apiRPCClient) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError { + _args := &Z_KVSetWithExpiryArgs{key, value, expireInSeconds} + _returns := &Z_KVSetWithExpiryReturns{} + if err := g.client.Call("Plugin.KVSetWithExpiry", _args, _returns); err != nil { + log.Printf("RPC call to KVSetWithExpiry API failed: %s", err.Error()) + } + return _returns.A +} + +func (s *apiRPCServer) KVSetWithExpiry(args *Z_KVSetWithExpiryArgs, returns *Z_KVSetWithExpiryReturns) error { + if hook, ok := s.impl.(interface { + KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError + }); ok { + returns.A = hook.KVSetWithExpiry(args.A, args.B, args.C) + } else { + return encodableError(fmt.Errorf("API KVSetWithExpiry called but not implemented.")) + } + return nil +} + type Z_KVGetArgs struct { A string } @@ -2213,6 +2243,33 @@ func (s *apiRPCServer) KVDelete(args *Z_KVDeleteArgs, returns *Z_KVDeleteReturns return nil } +type Z_KVDeleteAllArgs struct { +} + +type Z_KVDeleteAllReturns struct { + A *model.AppError +} + +func (g *apiRPCClient) KVDeleteAll() *model.AppError { + _args := &Z_KVDeleteAllArgs{} + _returns := &Z_KVDeleteAllReturns{} + if err := g.client.Call("Plugin.KVDeleteAll", _args, _returns); err != nil { + log.Printf("RPC call to KVDeleteAll API failed: %s", err.Error()) + } + return _returns.A +} + +func (s *apiRPCServer) KVDeleteAll(args *Z_KVDeleteAllArgs, returns *Z_KVDeleteAllReturns) error { + if hook, ok := s.impl.(interface { + KVDeleteAll() *model.AppError + }); ok { + returns.A = hook.KVDeleteAll() + } else { + return encodableError(fmt.Errorf("API KVDeleteAll called but not implemented.")) + } + return nil +} + type Z_KVListArgs struct { A int B int diff --git a/plugin/plugintest/api.go b/plugin/plugintest/api.go index 8240a81c9..c11446bda 100644 --- a/plugin/plugintest/api.go +++ b/plugin/plugintest/api.go @@ -1021,6 +1021,22 @@ func (_m *API) KVDelete(key string) *model.AppError { return r0 } +// KVDeleteAll provides a mock function with given fields: +func (_m *API) KVDeleteAll() *model.AppError { + ret := _m.Called() + + var r0 *model.AppError + if rf, ok := ret.Get(0).(func() *model.AppError); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.AppError) + } + } + + return r0 +} + // KVGet provides a mock function with given fields: key func (_m *API) KVGet(key string) ([]byte, *model.AppError) { ret := _m.Called(key) @@ -1087,6 +1103,22 @@ func (_m *API) KVSet(key string, value []byte) *model.AppError { return r0 } +// KVSetWithExpiry provides a mock function with given fields: key, value, expireInSeconds +func (_m *API) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError { + ret := _m.Called(key, value, expireInSeconds) + + var r0 *model.AppError + if rf, ok := ret.Get(0).(func(string, []byte, int64) *model.AppError); ok { + r0 = rf(key, value, expireInSeconds) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.AppError) + } + } + + return r0 +} + // LoadPluginConfiguration provides a mock function with given fields: dest func (_m *API) LoadPluginConfiguration(dest interface{}) error { ret := _m.Called(dest) diff --git a/plugin/scheduler/plugin.go b/plugin/scheduler/plugin.go new file mode 100644 index 000000000..3133cb4b1 --- /dev/null +++ b/plugin/scheduler/plugin.go @@ -0,0 +1,19 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package scheduler + +import ( + "github.com/mattermost/mattermost-server/app" + tjobs "github.com/mattermost/mattermost-server/jobs/interfaces" +) + +type PluginsJobInterfaceImpl struct { + App *app.App +} + +func init() { + app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface { + return &PluginsJobInterfaceImpl{a} + }) +} diff --git a/plugin/scheduler/scheduler.go b/plugin/scheduler/scheduler.go new file mode 100644 index 000000000..7214d6cfd --- /dev/null +++ b/plugin/scheduler/scheduler.go @@ -0,0 +1,47 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package scheduler + +import ( + "time" + + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +type Scheduler struct { + App *app.App +} + +func (m *PluginsJobInterfaceImpl) MakeScheduler() model.Scheduler { + return &Scheduler{m.App} +} + +func (scheduler *Scheduler) Name() string { + return "PluginsScheduler" +} + +func (scheduler *Scheduler) JobType() string { + return model.JOB_TYPE_PLUGINS +} + +func (scheduler *Scheduler) Enabled(cfg *model.Config) bool { + return true +} + +func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time { + nextTime := time.Now().Add(60 * time.Second) + return &nextTime +} + +func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) { + mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name())) + + if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil { + return nil, err + } else { + return job, nil + } +} diff --git a/plugin/scheduler/worker.go b/plugin/scheduler/worker.go new file mode 100644 index 000000000..252e100fa --- /dev/null +++ b/plugin/scheduler/worker.go @@ -0,0 +1,99 @@ +// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package scheduler + +import ( + "github.com/mattermost/mattermost-server/app" + "github.com/mattermost/mattermost-server/jobs" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" +) + +type Worker struct { + name string + stop chan bool + stopped chan bool + jobs chan model.Job + jobServer *jobs.JobServer + app *app.App +} + +func (m *PluginsJobInterfaceImpl) MakeWorker() model.Worker { + worker := Worker{ + name: "Plugins", + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + jobServer: m.App.Jobs, + app: m.App, + } + + return &worker +} + +func (worker *Worker) Run() { + mlog.Debug("Worker started", mlog.String("worker", worker.name)) + + defer func() { + mlog.Debug("Worker finished", mlog.String("worker", worker.name)) + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name)) + return + case job := <-worker.jobs: + mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name)) + worker.DoJob(&job) + } + } +} + +func (worker *Worker) Stop() { + mlog.Debug("Worker stopping", mlog.String("worker", worker.name)) + worker.stop <- true + <-worker.stopped +} + +func (worker *Worker) JobChannel() chan<- model.Job { + return worker.jobs +} + +func (worker *Worker) DoJob(job *model.Job) { + if claimed, err := worker.jobServer.ClaimJob(job); err != nil { + mlog.Info("Worker experienced an error while trying to claim job", + mlog.String("worker", worker.name), + mlog.String("job_id", job.Id), + mlog.String("error", err.Error())) + return + } else if !claimed { + return + } + + err := worker.app.DeleteAllExpiredPluginKeys() + if err == nil { + mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id)) + worker.setJobSuccess(job) + return + } else { + mlog.Error("Worker: Failed to delete expired keys", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + return + } +} + +func (worker *Worker) setJobSuccess(job *model.Job) { + if err := worker.app.Jobs.SetJobSuccess(job); err != nil { + mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + worker.setJobError(job, err) + } +} + +func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) { + if err := worker.app.Jobs.SetJobError(job, appError); err != nil { + mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error())) + } +} diff --git a/store/sqlstore/plugin_store.go b/store/sqlstore/plugin_store.go index e4b79e54a..ae7e65d2d 100644 --- a/store/sqlstore/plugin_store.go +++ b/store/sqlstore/plugin_store.go @@ -61,7 +61,7 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann } } } else if ps.DriverName() == model.DATABASE_DRIVER_MYSQL { - if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue) VALUES(:PluginId, :Key, :Value) ON DUPLICATE KEY UPDATE PValue = :Value", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value}); err != nil { + if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue, ExpireAt) VALUES(:PluginId, :Key, :Value, :ExpireAt) ON DUPLICATE KEY UPDATE PValue = :Value, ExpireAt = :ExpireAt", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value, "ExpireAt": kv.ExpireAt}); err != nil { result.Err = model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError) return } @@ -74,8 +74,8 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann func (ps SqlPluginStore) Get(pluginId, key string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { var kv *model.PluginKeyValue - - if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key", map[string]interface{}{"PluginId": pluginId, "Key": key}); err != nil { + currentTime := model.GetMillis() + if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key AND (ExpireAt = 0 OR ExpireAt > :CurrentTime)", map[string]interface{}{"PluginId": pluginId, "Key": key, "CurrentTime": currentTime}); err != nil { if err == sql.ErrNoRows { result.Err = model.NewAppError("SqlPluginStore.Get", "store.sql_plugin_store.get.app_error", nil, fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, err.Error()), http.StatusNotFound) } else { @@ -97,6 +97,27 @@ func (ps SqlPluginStore) Delete(pluginId, key string) store.StoreChannel { }) } +func (ps SqlPluginStore) DeleteAllForPlugin(pluginId string) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE PluginId = :PluginId", map[string]interface{}{"PluginId": pluginId}); err != nil { + result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, err.Error()), http.StatusInternalServerError) + } else { + result.Data = true + } + }) +} + +func (ps SqlPluginStore) DeleteAllExpired() store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + currentTime := model.GetMillis() + if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE ExpireAt != 0 AND ExpireAt < :CurrentTime", map[string]interface{}{"CurrentTime": currentTime}); err != nil { + result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("current_time=%v, err=%v", currentTime, err.Error()), http.StatusInternalServerError) + } else { + result.Data = true + } + }) +} + func (ps SqlPluginStore) List(pluginId string, offset int, limit int) store.StoreChannel { if limit <= 0 { limit = DEFAULT_PLUGIN_KEY_FETCH_LIMIT diff --git a/store/sqlstore/upgrade.go b/store/sqlstore/upgrade.go index 42d34f525..9898d5b32 100644 --- a/store/sqlstore/upgrade.go +++ b/store/sqlstore/upgrade.go @@ -510,7 +510,7 @@ func UpgradeDatabaseToVersion54(sqlStore SqlStore) { func UpgradeDatabaseToVersion55(sqlStore SqlStore) { // TODO: Uncomment following condition when version 5.5.0 is released // if shouldPerformUpgrade(sqlStore, VERSION_5_4_0, VERSION_5_5_0) { - + sqlStore.CreateColumnIfNotExists("PluginKeyValueStore", "ExpireAt", "bigint(20)", "bigint", "0") // saveSchemaVersion(sqlStore, VERSION_5_5_0) // } } diff --git a/store/store.go b/store/store.go index 29028130e..e21917345 100644 --- a/store/store.go +++ b/store/store.go @@ -502,6 +502,8 @@ type PluginStore interface { SaveOrUpdate(keyVal *model.PluginKeyValue) StoreChannel Get(pluginId, key string) StoreChannel Delete(pluginId, key string) StoreChannel + DeleteAllForPlugin(PluginId string) StoreChannel + DeleteAllExpired() StoreChannel List(pluginId string, page, perPage int) StoreChannel } diff --git a/store/storetest/mocks/PluginStore.go b/store/storetest/mocks/PluginStore.go index 9c4a40032..b85e17fdf 100644 --- a/store/storetest/mocks/PluginStore.go +++ b/store/storetest/mocks/PluginStore.go @@ -29,6 +29,38 @@ func (_m *PluginStore) Delete(pluginId string, key string) store.StoreChannel { return r0 } +// DeleteAllExpired provides a mock function with given fields: +func (_m *PluginStore) DeleteAllExpired() store.StoreChannel { + ret := _m.Called() + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func() store.StoreChannel); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + +// DeleteAllForPlugin provides a mock function with given fields: PluginId +func (_m *PluginStore) DeleteAllForPlugin(PluginId string) store.StoreChannel { + ret := _m.Called(PluginId) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok { + r0 = rf(PluginId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + // Get provides a mock function with given fields: pluginId, key func (_m *PluginStore) Get(pluginId string, key string) store.StoreChannel { ret := _m.Called(pluginId, key) diff --git a/store/storetest/plugin_store.go b/store/storetest/plugin_store.go index 3d7d0ec05..0e14c7539 100644 --- a/store/storetest/plugin_store.go +++ b/store/storetest/plugin_store.go @@ -13,7 +13,10 @@ import ( func TestPluginStore(t *testing.T, ss store.Store) { t.Run("PluginSaveGet", func(t *testing.T) { testPluginSaveGet(t, ss) }) + t.Run("PluginSaveGetExpiry", func(t *testing.T) { testPluginSaveGetExpiry(t, ss) }) t.Run("PluginDelete", func(t *testing.T) { testPluginDelete(t, ss) }) + t.Run("PluginDeleteAll", func(t *testing.T) { testPluginDeleteAll(t, ss) }) + t.Run("PluginDeleteExpired", func(t *testing.T) { testPluginDeleteExpired(t, ss) }) } func testPluginSaveGet(t *testing.T, ss store.Store) { @@ -21,6 +24,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) { PluginId: model.NewId(), Key: model.NewId(), Value: []byte(model.NewId()), + ExpireAt: 0, } if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil { @@ -38,6 +42,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) { assert.Equal(t, kv.PluginId, received.PluginId) assert.Equal(t, kv.Key, received.Key) assert.Equal(t, kv.Value, received.Value) + assert.Equal(t, kv.ExpireAt, received.ExpireAt) } // Try inserting when already exists @@ -56,6 +61,52 @@ func testPluginSaveGet(t *testing.T, ss store.Store) { } } +func testPluginSaveGetExpiry(t *testing.T, ss store.Store) { + kv := &model.PluginKeyValue{ + PluginId: model.NewId(), + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: model.GetMillis() + 30000, + } + + if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-ss.Plugin().Delete(kv.PluginId, kv.Key) + }() + + if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.PluginKeyValue) + assert.Equal(t, kv.PluginId, received.PluginId) + assert.Equal(t, kv.Key, received.Key) + assert.Equal(t, kv.Value, received.Value) + assert.Equal(t, kv.ExpireAt, received.ExpireAt) + } + + kv = &model.PluginKeyValue{ + PluginId: model.NewId(), + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: model.GetMillis() - 5000, + } + + if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-ss.Plugin().Delete(kv.PluginId, kv.Key) + }() + + if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } +} + func testPluginDelete(t *testing.T, ss store.Store) { kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ PluginId: model.NewId(), @@ -67,3 +118,67 @@ func testPluginDelete(t *testing.T, ss store.Store) { t.Fatal(result.Err) } } + +func testPluginDeleteAll(t *testing.T, ss store.Store) { + pluginId := model.NewId() + + kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + })).(*model.PluginKeyValue) + + kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + })).(*model.PluginKeyValue) + + if result := <-ss.Plugin().DeleteAllForPlugin(pluginId); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } + + if result := <-ss.Plugin().Get(pluginId, kv2.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } +} + +func testPluginDeleteExpired(t *testing.T, ss store.Store) { + pluginId := model.NewId() + + kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: model.GetMillis() - 6000, + })).(*model.PluginKeyValue) + + kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: 0, + })).(*model.PluginKeyValue) + + if result := <-ss.Plugin().DeleteAllExpired(); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } + + if result := <-ss.Plugin().Get(kv2.PluginId, kv2.Key); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.PluginKeyValue) + assert.Equal(t, kv2.PluginId, received.PluginId) + assert.Equal(t, kv2.Key, received.Key) + assert.Equal(t, kv2.Value, received.Value) + assert.Equal(t, kv2.ExpireAt, received.ExpireAt) + } +} -- cgit v1.2.3-1-g7c22