summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniel Schalla <daniel@schalla.me>2018-10-10 19:55:12 +0200
committerChristopher Speller <crspeller@gmail.com>2018-10-10 10:55:12 -0700
commitc36e85c9126b921cf00e578ac70c1f1ee0153abd (patch)
tree86bfea62ec6a1ce0edc548db4a87851c41e30b88
parentbd04d7f75698c7b68434199208dc469021b823c2 (diff)
downloadchat-c36e85c9126b921cf00e578ac70c1f1ee0153abd.tar.gz
chat-c36e85c9126b921cf00e578ac70c1f1ee0153abd.tar.bz2
chat-c36e85c9126b921cf00e578ac70c1f1ee0153abd.zip
DeleteAll for KV (#9431)
Expire K/V Values Regenerate Code pathfix Update Expiry on Update Check for Exit Signal gofmt Rewrote Go Routine Remove tempoarily cleanup loop fix expiretime TEST: Expired Watchdog as GoRoutine Check if Srv is nil Use Scheduler/Worker for Expired Key CleanUp add license fix scheduler job type; DoJob Restructuring Remove unused imports and constants move db migration from 5.4 to 5.5
-rw-r--r--app/plugin.go1
-rw-r--r--app/plugin_api.go8
-rw-r--r--app/plugin_key_value_store.go35
-rw-r--r--jobs/interfaces/plugins_interface.go11
-rw-r--r--jobs/schedulers.go4
-rw-r--r--jobs/server.go1
-rw-r--r--jobs/workers.go5
-rw-r--r--model/job.go1
-rw-r--r--model/plugin_key_value.go1
-rw-r--r--plugin/api.go6
-rw-r--r--plugin/client_rpc_generated.go57
-rw-r--r--plugin/plugintest/api.go32
-rw-r--r--plugin/scheduler/plugin.go19
-rw-r--r--plugin/scheduler/scheduler.go47
-rw-r--r--plugin/scheduler/worker.go99
-rw-r--r--store/sqlstore/plugin_store.go27
-rw-r--r--store/sqlstore/upgrade.go2
-rw-r--r--store/store.go2
-rw-r--r--store/storetest/mocks/PluginStore.go32
-rw-r--r--store/storetest/plugin_store.go115
20 files changed, 501 insertions, 4 deletions
diff --git a/app/plugin.go b/app/plugin.go
index 51e67e2bf..2c18c6cec 100644
--- a/app/plugin.go
+++ b/app/plugin.go
@@ -146,6 +146,7 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) {
})
a.SyncPluginsActiveState()
+
}
func (a *App) ShutDownPlugins() {
diff --git a/app/plugin_api.go b/app/plugin_api.go
index 8def80180..70b3d60fe 100644
--- a/app/plugin_api.go
+++ b/app/plugin_api.go
@@ -331,6 +331,10 @@ func (api *PluginAPI) KVSet(key string, value []byte) *model.AppError {
return api.app.SetPluginKey(api.id, key, value)
}
+func (api *PluginAPI) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError {
+ return api.app.SetPluginKeyWithExpiry(api.id, key, value, expireInSeconds)
+}
+
func (api *PluginAPI) KVGet(key string) ([]byte, *model.AppError) {
return api.app.GetPluginKey(api.id, key)
}
@@ -339,6 +343,10 @@ func (api *PluginAPI) KVDelete(key string) *model.AppError {
return api.app.DeletePluginKey(api.id, key)
}
+func (api *PluginAPI) KVDeleteAll() *model.AppError {
+ return api.app.DeleteAllKeysForPlugin(api.id)
+}
+
func (api *PluginAPI) KVList(page, perPage int) ([]string, *model.AppError) {
return api.app.ListPluginKeys(api.id, page, perPage)
}
diff --git a/app/plugin_key_value_store.go b/app/plugin_key_value_store.go
index 8c3e1f18b..f0aed31f0 100644
--- a/app/plugin_key_value_store.go
+++ b/app/plugin_key_value_store.go
@@ -19,10 +19,20 @@ func getKeyHash(key string) string {
}
func (a *App) SetPluginKey(pluginId string, key string, value []byte) *model.AppError {
+ return a.SetPluginKeyWithExpiry(pluginId, key, value, 0)
+}
+
+func (a *App) SetPluginKeyWithExpiry(pluginId string, key string, value []byte, expireInSeconds int64) *model.AppError {
+
+ if expireInSeconds > 0 {
+ expireInSeconds = model.GetMillis() + (expireInSeconds * 1000)
+ }
+
kv := &model.PluginKeyValue{
PluginId: pluginId,
Key: getKeyHash(key),
Value: value,
+ ExpireAt: expireInSeconds,
}
result := <-a.Srv.Store.Plugin().SaveOrUpdate(kv)
@@ -60,6 +70,31 @@ func (a *App) DeletePluginKey(pluginId string, key string) *model.AppError {
return result.Err
}
+func (a *App) DeleteAllKeysForPlugin(pluginId string) *model.AppError {
+ result := <-a.Srv.Store.Plugin().DeleteAllForPlugin(pluginId)
+
+ if result.Err != nil {
+ mlog.Error(result.Err.Error())
+ }
+
+ return result.Err
+}
+
+func (a *App) DeleteAllExpiredPluginKeys() *model.AppError {
+
+ if a.Srv == nil {
+ return nil
+ }
+
+ result := <-a.Srv.Store.Plugin().DeleteAllExpired()
+
+ if result.Err != nil {
+ mlog.Error(result.Err.Error())
+ }
+
+ return result.Err
+}
+
func (a *App) ListPluginKeys(pluginId string, page, perPage int) ([]string, *model.AppError) {
result := <-a.Srv.Store.Plugin().List(pluginId, page, perPage)
diff --git a/jobs/interfaces/plugins_interface.go b/jobs/interfaces/plugins_interface.go
new file mode 100644
index 000000000..4d67129af
--- /dev/null
+++ b/jobs/interfaces/plugins_interface.go
@@ -0,0 +1,11 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package interfaces
+
+import "github.com/mattermost/mattermost-server/model"
+
+type PluginsJobInterface interface {
+ MakeWorker() model.Worker
+ MakeScheduler() model.Scheduler
+}
diff --git a/jobs/schedulers.go b/jobs/schedulers.go
index 37a64bc22..b0cb92fc5 100644
--- a/jobs/schedulers.go
+++ b/jobs/schedulers.go
@@ -56,6 +56,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers {
schedulers.schedulers = append(schedulers.schedulers, migrationsInterface.MakeScheduler())
}
+ if pluginsInterface := srv.Plugins; pluginsInterface != nil {
+ schedulers.schedulers = append(schedulers.schedulers, pluginsInterface.MakeScheduler())
+ }
+
schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
return schedulers
}
diff --git a/jobs/server.go b/jobs/server.go
index 80c48a165..06897e43a 100644
--- a/jobs/server.go
+++ b/jobs/server.go
@@ -23,6 +23,7 @@ type JobServer struct {
ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface
LdapSync ejobs.LdapSyncInterface
Migrations tjobs.MigrationsJobInterface
+ Plugins tjobs.PluginsJobInterface
}
func NewJobServer(configService configservice.ConfigService, store store.Store) *JobServer {
diff --git a/jobs/workers.go b/jobs/workers.go
index ad457ed2a..f06809945 100644
--- a/jobs/workers.go
+++ b/jobs/workers.go
@@ -22,6 +22,7 @@ type Workers struct {
ElasticsearchAggregation model.Worker
LdapSync model.Worker
Migrations model.Worker
+ Plugins model.Worker
listenerId string
}
@@ -56,6 +57,10 @@ func (srv *JobServer) InitWorkers() *Workers {
workers.Migrations = migrationsInterface.MakeWorker()
}
+ if pluginsInterface := srv.Plugins; pluginsInterface != nil {
+ workers.Migrations = pluginsInterface.MakeWorker()
+ }
+
return workers
}
diff --git a/model/job.go b/model/job.go
index c16614958..76ef65b6f 100644
--- a/model/job.go
+++ b/model/job.go
@@ -17,6 +17,7 @@ const (
JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION = "elasticsearch_post_aggregation"
JOB_TYPE_LDAP_SYNC = "ldap_sync"
JOB_TYPE_MIGRATIONS = "migrations"
+ JOB_TYPE_PLUGINS = "plugins"
JOB_STATUS_PENDING = "pending"
JOB_STATUS_IN_PROGRESS = "in_progress"
diff --git a/model/plugin_key_value.go b/model/plugin_key_value.go
index b7a7731c4..7e156d93d 100644
--- a/model/plugin_key_value.go
+++ b/model/plugin_key_value.go
@@ -17,6 +17,7 @@ type PluginKeyValue struct {
PluginId string `json:"plugin_id"`
Key string `json:"key" db:"PKey"`
Value []byte `json:"value" db:"PValue"`
+ ExpireAt int64 `json:"expire_at"`
}
func (kv *PluginKeyValue) IsValid() *AppError {
diff --git a/plugin/api.go b/plugin/api.go
index 308067228..5a5727659 100644
--- a/plugin/api.go
+++ b/plugin/api.go
@@ -196,12 +196,18 @@ type API interface {
// KVSet will store a key-value pair, unique per plugin.
KVSet(key string, value []byte) *model.AppError
+ // KVSet will store a key-value pair, unique per plugin with an expiry time
+ KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError
+
// KVGet will retrieve a value based on the key. Returns nil for non-existent keys.
KVGet(key string) ([]byte, *model.AppError)
// KVDelete will remove a key-value pair. Returns nil for non-existent keys.
KVDelete(key string) *model.AppError
+ // KVDeleteAll will remove all key-value pairs for a plugin.
+ KVDeleteAll() *model.AppError
+
// KVList will list all keys for a plugin.
KVList(page, perPage int) ([]string, *model.AppError)
diff --git a/plugin/client_rpc_generated.go b/plugin/client_rpc_generated.go
index 1dc3fe143..da41d9418 100644
--- a/plugin/client_rpc_generated.go
+++ b/plugin/client_rpc_generated.go
@@ -2156,6 +2156,36 @@ func (s *apiRPCServer) KVSet(args *Z_KVSetArgs, returns *Z_KVSetReturns) error {
return nil
}
+type Z_KVSetWithExpiryArgs struct {
+ A string
+ B []byte
+ C int64
+}
+
+type Z_KVSetWithExpiryReturns struct {
+ A *model.AppError
+}
+
+func (g *apiRPCClient) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError {
+ _args := &Z_KVSetWithExpiryArgs{key, value, expireInSeconds}
+ _returns := &Z_KVSetWithExpiryReturns{}
+ if err := g.client.Call("Plugin.KVSetWithExpiry", _args, _returns); err != nil {
+ log.Printf("RPC call to KVSetWithExpiry API failed: %s", err.Error())
+ }
+ return _returns.A
+}
+
+func (s *apiRPCServer) KVSetWithExpiry(args *Z_KVSetWithExpiryArgs, returns *Z_KVSetWithExpiryReturns) error {
+ if hook, ok := s.impl.(interface {
+ KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError
+ }); ok {
+ returns.A = hook.KVSetWithExpiry(args.A, args.B, args.C)
+ } else {
+ return encodableError(fmt.Errorf("API KVSetWithExpiry called but not implemented."))
+ }
+ return nil
+}
+
type Z_KVGetArgs struct {
A string
}
@@ -2213,6 +2243,33 @@ func (s *apiRPCServer) KVDelete(args *Z_KVDeleteArgs, returns *Z_KVDeleteReturns
return nil
}
+type Z_KVDeleteAllArgs struct {
+}
+
+type Z_KVDeleteAllReturns struct {
+ A *model.AppError
+}
+
+func (g *apiRPCClient) KVDeleteAll() *model.AppError {
+ _args := &Z_KVDeleteAllArgs{}
+ _returns := &Z_KVDeleteAllReturns{}
+ if err := g.client.Call("Plugin.KVDeleteAll", _args, _returns); err != nil {
+ log.Printf("RPC call to KVDeleteAll API failed: %s", err.Error())
+ }
+ return _returns.A
+}
+
+func (s *apiRPCServer) KVDeleteAll(args *Z_KVDeleteAllArgs, returns *Z_KVDeleteAllReturns) error {
+ if hook, ok := s.impl.(interface {
+ KVDeleteAll() *model.AppError
+ }); ok {
+ returns.A = hook.KVDeleteAll()
+ } else {
+ return encodableError(fmt.Errorf("API KVDeleteAll called but not implemented."))
+ }
+ return nil
+}
+
type Z_KVListArgs struct {
A int
B int
diff --git a/plugin/plugintest/api.go b/plugin/plugintest/api.go
index 8240a81c9..c11446bda 100644
--- a/plugin/plugintest/api.go
+++ b/plugin/plugintest/api.go
@@ -1021,6 +1021,22 @@ func (_m *API) KVDelete(key string) *model.AppError {
return r0
}
+// KVDeleteAll provides a mock function with given fields:
+func (_m *API) KVDeleteAll() *model.AppError {
+ ret := _m.Called()
+
+ var r0 *model.AppError
+ if rf, ok := ret.Get(0).(func() *model.AppError); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*model.AppError)
+ }
+ }
+
+ return r0
+}
+
// KVGet provides a mock function with given fields: key
func (_m *API) KVGet(key string) ([]byte, *model.AppError) {
ret := _m.Called(key)
@@ -1087,6 +1103,22 @@ func (_m *API) KVSet(key string, value []byte) *model.AppError {
return r0
}
+// KVSetWithExpiry provides a mock function with given fields: key, value, expireInSeconds
+func (_m *API) KVSetWithExpiry(key string, value []byte, expireInSeconds int64) *model.AppError {
+ ret := _m.Called(key, value, expireInSeconds)
+
+ var r0 *model.AppError
+ if rf, ok := ret.Get(0).(func(string, []byte, int64) *model.AppError); ok {
+ r0 = rf(key, value, expireInSeconds)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*model.AppError)
+ }
+ }
+
+ return r0
+}
+
// LoadPluginConfiguration provides a mock function with given fields: dest
func (_m *API) LoadPluginConfiguration(dest interface{}) error {
ret := _m.Called(dest)
diff --git a/plugin/scheduler/plugin.go b/plugin/scheduler/plugin.go
new file mode 100644
index 000000000..3133cb4b1
--- /dev/null
+++ b/plugin/scheduler/plugin.go
@@ -0,0 +1,19 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package scheduler
+
+import (
+ "github.com/mattermost/mattermost-server/app"
+ tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
+)
+
+type PluginsJobInterfaceImpl struct {
+ App *app.App
+}
+
+func init() {
+ app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface {
+ return &PluginsJobInterfaceImpl{a}
+ })
+}
diff --git a/plugin/scheduler/scheduler.go b/plugin/scheduler/scheduler.go
new file mode 100644
index 000000000..7214d6cfd
--- /dev/null
+++ b/plugin/scheduler/scheduler.go
@@ -0,0 +1,47 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package scheduler
+
+import (
+ "time"
+
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+)
+
+type Scheduler struct {
+ App *app.App
+}
+
+func (m *PluginsJobInterfaceImpl) MakeScheduler() model.Scheduler {
+ return &Scheduler{m.App}
+}
+
+func (scheduler *Scheduler) Name() string {
+ return "PluginsScheduler"
+}
+
+func (scheduler *Scheduler) JobType() string {
+ return model.JOB_TYPE_PLUGINS
+}
+
+func (scheduler *Scheduler) Enabled(cfg *model.Config) bool {
+ return true
+}
+
+func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
+ nextTime := time.Now().Add(60 * time.Second)
+ return &nextTime
+}
+
+func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
+ mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name()))
+
+ if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil {
+ return nil, err
+ } else {
+ return job, nil
+ }
+}
diff --git a/plugin/scheduler/worker.go b/plugin/scheduler/worker.go
new file mode 100644
index 000000000..252e100fa
--- /dev/null
+++ b/plugin/scheduler/worker.go
@@ -0,0 +1,99 @@
+// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package scheduler
+
+import (
+ "github.com/mattermost/mattermost-server/app"
+ "github.com/mattermost/mattermost-server/jobs"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+)
+
+type Worker struct {
+ name string
+ stop chan bool
+ stopped chan bool
+ jobs chan model.Job
+ jobServer *jobs.JobServer
+ app *app.App
+}
+
+func (m *PluginsJobInterfaceImpl) MakeWorker() model.Worker {
+ worker := Worker{
+ name: "Plugins",
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ jobs: make(chan model.Job),
+ jobServer: m.App.Jobs,
+ app: m.App,
+ }
+
+ return &worker
+}
+
+func (worker *Worker) Run() {
+ mlog.Debug("Worker started", mlog.String("worker", worker.name))
+
+ defer func() {
+ mlog.Debug("Worker finished", mlog.String("worker", worker.name))
+ worker.stopped <- true
+ }()
+
+ for {
+ select {
+ case <-worker.stop:
+ mlog.Debug("Worker received stop signal", mlog.String("worker", worker.name))
+ return
+ case job := <-worker.jobs:
+ mlog.Debug("Worker received a new candidate job.", mlog.String("worker", worker.name))
+ worker.DoJob(&job)
+ }
+ }
+}
+
+func (worker *Worker) Stop() {
+ mlog.Debug("Worker stopping", mlog.String("worker", worker.name))
+ worker.stop <- true
+ <-worker.stopped
+}
+
+func (worker *Worker) JobChannel() chan<- model.Job {
+ return worker.jobs
+}
+
+func (worker *Worker) DoJob(job *model.Job) {
+ if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
+ mlog.Info("Worker experienced an error while trying to claim job",
+ mlog.String("worker", worker.name),
+ mlog.String("job_id", job.Id),
+ mlog.String("error", err.Error()))
+ return
+ } else if !claimed {
+ return
+ }
+
+ err := worker.app.DeleteAllExpiredPluginKeys()
+ if err == nil {
+ mlog.Info("Worker: Job is complete", mlog.String("worker", worker.name), mlog.String("job_id", job.Id))
+ worker.setJobSuccess(job)
+ return
+ } else {
+ mlog.Error("Worker: Failed to delete expired keys", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ return
+ }
+}
+
+func (worker *Worker) setJobSuccess(job *model.Job) {
+ if err := worker.app.Jobs.SetJobSuccess(job); err != nil {
+ mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ worker.setJobError(job, err)
+ }
+}
+
+func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) {
+ if err := worker.app.Jobs.SetJobError(job, appError); err != nil {
+ mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
+ }
+}
diff --git a/store/sqlstore/plugin_store.go b/store/sqlstore/plugin_store.go
index e4b79e54a..ae7e65d2d 100644
--- a/store/sqlstore/plugin_store.go
+++ b/store/sqlstore/plugin_store.go
@@ -61,7 +61,7 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann
}
}
} else if ps.DriverName() == model.DATABASE_DRIVER_MYSQL {
- if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue) VALUES(:PluginId, :Key, :Value) ON DUPLICATE KEY UPDATE PValue = :Value", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value}); err != nil {
+ if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue, ExpireAt) VALUES(:PluginId, :Key, :Value, :ExpireAt) ON DUPLICATE KEY UPDATE PValue = :Value, ExpireAt = :ExpireAt", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value, "ExpireAt": kv.ExpireAt}); err != nil {
result.Err = model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
@@ -74,8 +74,8 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann
func (ps SqlPluginStore) Get(pluginId, key string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var kv *model.PluginKeyValue
-
- if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key", map[string]interface{}{"PluginId": pluginId, "Key": key}); err != nil {
+ currentTime := model.GetMillis()
+ if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key AND (ExpireAt = 0 OR ExpireAt > :CurrentTime)", map[string]interface{}{"PluginId": pluginId, "Key": key, "CurrentTime": currentTime}); err != nil {
if err == sql.ErrNoRows {
result.Err = model.NewAppError("SqlPluginStore.Get", "store.sql_plugin_store.get.app_error", nil, fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, err.Error()), http.StatusNotFound)
} else {
@@ -97,6 +97,27 @@ func (ps SqlPluginStore) Delete(pluginId, key string) store.StoreChannel {
})
}
+func (ps SqlPluginStore) DeleteAllForPlugin(pluginId string) store.StoreChannel {
+ return store.Do(func(result *store.StoreResult) {
+ if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE PluginId = :PluginId", map[string]interface{}{"PluginId": pluginId}); err != nil {
+ result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, err.Error()), http.StatusInternalServerError)
+ } else {
+ result.Data = true
+ }
+ })
+}
+
+func (ps SqlPluginStore) DeleteAllExpired() store.StoreChannel {
+ return store.Do(func(result *store.StoreResult) {
+ currentTime := model.GetMillis()
+ if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE ExpireAt != 0 AND ExpireAt < :CurrentTime", map[string]interface{}{"CurrentTime": currentTime}); err != nil {
+ result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("current_time=%v, err=%v", currentTime, err.Error()), http.StatusInternalServerError)
+ } else {
+ result.Data = true
+ }
+ })
+}
+
func (ps SqlPluginStore) List(pluginId string, offset int, limit int) store.StoreChannel {
if limit <= 0 {
limit = DEFAULT_PLUGIN_KEY_FETCH_LIMIT
diff --git a/store/sqlstore/upgrade.go b/store/sqlstore/upgrade.go
index 42d34f525..9898d5b32 100644
--- a/store/sqlstore/upgrade.go
+++ b/store/sqlstore/upgrade.go
@@ -510,7 +510,7 @@ func UpgradeDatabaseToVersion54(sqlStore SqlStore) {
func UpgradeDatabaseToVersion55(sqlStore SqlStore) {
// TODO: Uncomment following condition when version 5.5.0 is released
// if shouldPerformUpgrade(sqlStore, VERSION_5_4_0, VERSION_5_5_0) {
-
+ sqlStore.CreateColumnIfNotExists("PluginKeyValueStore", "ExpireAt", "bigint(20)", "bigint", "0")
// saveSchemaVersion(sqlStore, VERSION_5_5_0)
// }
}
diff --git a/store/store.go b/store/store.go
index 29028130e..e21917345 100644
--- a/store/store.go
+++ b/store/store.go
@@ -502,6 +502,8 @@ type PluginStore interface {
SaveOrUpdate(keyVal *model.PluginKeyValue) StoreChannel
Get(pluginId, key string) StoreChannel
Delete(pluginId, key string) StoreChannel
+ DeleteAllForPlugin(PluginId string) StoreChannel
+ DeleteAllExpired() StoreChannel
List(pluginId string, page, perPage int) StoreChannel
}
diff --git a/store/storetest/mocks/PluginStore.go b/store/storetest/mocks/PluginStore.go
index 9c4a40032..b85e17fdf 100644
--- a/store/storetest/mocks/PluginStore.go
+++ b/store/storetest/mocks/PluginStore.go
@@ -29,6 +29,38 @@ func (_m *PluginStore) Delete(pluginId string, key string) store.StoreChannel {
return r0
}
+// DeleteAllExpired provides a mock function with given fields:
+func (_m *PluginStore) DeleteAllExpired() store.StoreChannel {
+ ret := _m.Called()
+
+ var r0 store.StoreChannel
+ if rf, ok := ret.Get(0).(func() store.StoreChannel); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(store.StoreChannel)
+ }
+ }
+
+ return r0
+}
+
+// DeleteAllForPlugin provides a mock function with given fields: PluginId
+func (_m *PluginStore) DeleteAllForPlugin(PluginId string) store.StoreChannel {
+ ret := _m.Called(PluginId)
+
+ var r0 store.StoreChannel
+ if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok {
+ r0 = rf(PluginId)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(store.StoreChannel)
+ }
+ }
+
+ return r0
+}
+
// Get provides a mock function with given fields: pluginId, key
func (_m *PluginStore) Get(pluginId string, key string) store.StoreChannel {
ret := _m.Called(pluginId, key)
diff --git a/store/storetest/plugin_store.go b/store/storetest/plugin_store.go
index 3d7d0ec05..0e14c7539 100644
--- a/store/storetest/plugin_store.go
+++ b/store/storetest/plugin_store.go
@@ -13,7 +13,10 @@ import (
func TestPluginStore(t *testing.T, ss store.Store) {
t.Run("PluginSaveGet", func(t *testing.T) { testPluginSaveGet(t, ss) })
+ t.Run("PluginSaveGetExpiry", func(t *testing.T) { testPluginSaveGetExpiry(t, ss) })
t.Run("PluginDelete", func(t *testing.T) { testPluginDelete(t, ss) })
+ t.Run("PluginDeleteAll", func(t *testing.T) { testPluginDeleteAll(t, ss) })
+ t.Run("PluginDeleteExpired", func(t *testing.T) { testPluginDeleteExpired(t, ss) })
}
func testPluginSaveGet(t *testing.T, ss store.Store) {
@@ -21,6 +24,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) {
PluginId: model.NewId(),
Key: model.NewId(),
Value: []byte(model.NewId()),
+ ExpireAt: 0,
}
if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil {
@@ -38,6 +42,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) {
assert.Equal(t, kv.PluginId, received.PluginId)
assert.Equal(t, kv.Key, received.Key)
assert.Equal(t, kv.Value, received.Value)
+ assert.Equal(t, kv.ExpireAt, received.ExpireAt)
}
// Try inserting when already exists
@@ -56,6 +61,52 @@ func testPluginSaveGet(t *testing.T, ss store.Store) {
}
}
+func testPluginSaveGetExpiry(t *testing.T, ss store.Store) {
+ kv := &model.PluginKeyValue{
+ PluginId: model.NewId(),
+ Key: model.NewId(),
+ Value: []byte(model.NewId()),
+ ExpireAt: model.GetMillis() + 30000,
+ }
+
+ if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-ss.Plugin().Delete(kv.PluginId, kv.Key)
+ }()
+
+ if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.PluginKeyValue)
+ assert.Equal(t, kv.PluginId, received.PluginId)
+ assert.Equal(t, kv.Key, received.Key)
+ assert.Equal(t, kv.Value, received.Value)
+ assert.Equal(t, kv.ExpireAt, received.ExpireAt)
+ }
+
+ kv = &model.PluginKeyValue{
+ PluginId: model.NewId(),
+ Key: model.NewId(),
+ Value: []byte(model.NewId()),
+ ExpireAt: model.GetMillis() - 5000,
+ }
+
+ if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-ss.Plugin().Delete(kv.PluginId, kv.Key)
+ }()
+
+ if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err == nil {
+ t.Fatal("result.Err should not be nil")
+ }
+}
+
func testPluginDelete(t *testing.T, ss store.Store) {
kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
PluginId: model.NewId(),
@@ -67,3 +118,67 @@ func testPluginDelete(t *testing.T, ss store.Store) {
t.Fatal(result.Err)
}
}
+
+func testPluginDeleteAll(t *testing.T, ss store.Store) {
+ pluginId := model.NewId()
+
+ kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
+ PluginId: pluginId,
+ Key: model.NewId(),
+ Value: []byte(model.NewId()),
+ })).(*model.PluginKeyValue)
+
+ kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
+ PluginId: pluginId,
+ Key: model.NewId(),
+ Value: []byte(model.NewId()),
+ })).(*model.PluginKeyValue)
+
+ if result := <-ss.Plugin().DeleteAllForPlugin(pluginId); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil {
+ t.Fatal("result.Err should not be nil")
+ }
+
+ if result := <-ss.Plugin().Get(pluginId, kv2.Key); result.Err == nil {
+ t.Fatal("result.Err should not be nil")
+ }
+}
+
+func testPluginDeleteExpired(t *testing.T, ss store.Store) {
+ pluginId := model.NewId()
+
+ kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
+ PluginId: pluginId,
+ Key: model.NewId(),
+ Value: []byte(model.NewId()),
+ ExpireAt: model.GetMillis() - 6000,
+ })).(*model.PluginKeyValue)
+
+ kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{
+ PluginId: pluginId,
+ Key: model.NewId(),
+ Value: []byte(model.NewId()),
+ ExpireAt: 0,
+ })).(*model.PluginKeyValue)
+
+ if result := <-ss.Plugin().DeleteAllExpired(); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil {
+ t.Fatal("result.Err should not be nil")
+ }
+
+ if result := <-ss.Plugin().Get(kv2.PluginId, kv2.Key); result.Err != nil {
+ t.Fatal(result.Err)
+ } else {
+ received := result.Data.(*model.PluginKeyValue)
+ assert.Equal(t, kv2.PluginId, received.PluginId)
+ assert.Equal(t, kv2.Key, received.Key)
+ assert.Equal(t, kv2.Value, received.Value)
+ assert.Equal(t, kv2.ExpireAt, received.ExpireAt)
+ }
+}