From c36e85c9126b921cf00e578ac70c1f1ee0153abd Mon Sep 17 00:00:00 2001 From: Daniel Schalla Date: Wed, 10 Oct 2018 19:55:12 +0200 Subject: DeleteAll for KV (#9431) Expire K/V Values Regenerate Code pathfix Update Expiry on Update Check for Exit Signal gofmt Rewrote Go Routine Remove tempoarily cleanup loop fix expiretime TEST: Expired Watchdog as GoRoutine Check if Srv is nil Use Scheduler/Worker for Expired Key CleanUp add license fix scheduler job type; DoJob Restructuring Remove unused imports and constants move db migration from 5.4 to 5.5 --- store/sqlstore/plugin_store.go | 27 +++++++- store/sqlstore/upgrade.go | 2 +- store/store.go | 2 + store/storetest/mocks/PluginStore.go | 32 ++++++++++ store/storetest/plugin_store.go | 115 +++++++++++++++++++++++++++++++++++ 5 files changed, 174 insertions(+), 4 deletions(-) (limited to 'store') diff --git a/store/sqlstore/plugin_store.go b/store/sqlstore/plugin_store.go index e4b79e54a..ae7e65d2d 100644 --- a/store/sqlstore/plugin_store.go +++ b/store/sqlstore/plugin_store.go @@ -61,7 +61,7 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann } } } else if ps.DriverName() == model.DATABASE_DRIVER_MYSQL { - if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue) VALUES(:PluginId, :Key, :Value) ON DUPLICATE KEY UPDATE PValue = :Value", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value}); err != nil { + if _, err := ps.GetMaster().Exec("INSERT INTO PluginKeyValueStore (PluginId, PKey, PValue, ExpireAt) VALUES(:PluginId, :Key, :Value, :ExpireAt) ON DUPLICATE KEY UPDATE PValue = :Value, ExpireAt = :ExpireAt", map[string]interface{}{"PluginId": kv.PluginId, "Key": kv.Key, "Value": kv.Value, "ExpireAt": kv.ExpireAt}); err != nil { result.Err = model.NewAppError("SqlPluginStore.SaveOrUpdate", "store.sql_plugin_store.save.app_error", nil, err.Error(), http.StatusInternalServerError) return } @@ -74,8 +74,8 @@ func (ps SqlPluginStore) SaveOrUpdate(kv *model.PluginKeyValue) store.StoreChann func (ps SqlPluginStore) Get(pluginId, key string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { var kv *model.PluginKeyValue - - if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key", map[string]interface{}{"PluginId": pluginId, "Key": key}); err != nil { + currentTime := model.GetMillis() + if err := ps.GetReplica().SelectOne(&kv, "SELECT * FROM PluginKeyValueStore WHERE PluginId = :PluginId AND PKey = :Key AND (ExpireAt = 0 OR ExpireAt > :CurrentTime)", map[string]interface{}{"PluginId": pluginId, "Key": key, "CurrentTime": currentTime}); err != nil { if err == sql.ErrNoRows { result.Err = model.NewAppError("SqlPluginStore.Get", "store.sql_plugin_store.get.app_error", nil, fmt.Sprintf("plugin_id=%v, key=%v, err=%v", pluginId, key, err.Error()), http.StatusNotFound) } else { @@ -97,6 +97,27 @@ func (ps SqlPluginStore) Delete(pluginId, key string) store.StoreChannel { }) } +func (ps SqlPluginStore) DeleteAllForPlugin(pluginId string) store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE PluginId = :PluginId", map[string]interface{}{"PluginId": pluginId}); err != nil { + result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("plugin_id=%v, err=%v", pluginId, err.Error()), http.StatusInternalServerError) + } else { + result.Data = true + } + }) +} + +func (ps SqlPluginStore) DeleteAllExpired() store.StoreChannel { + return store.Do(func(result *store.StoreResult) { + currentTime := model.GetMillis() + if _, err := ps.GetMaster().Exec("DELETE FROM PluginKeyValueStore WHERE ExpireAt != 0 AND ExpireAt < :CurrentTime", map[string]interface{}{"CurrentTime": currentTime}); err != nil { + result.Err = model.NewAppError("SqlPluginStore.Delete", "store.sql_plugin_store.delete.app_error", nil, fmt.Sprintf("current_time=%v, err=%v", currentTime, err.Error()), http.StatusInternalServerError) + } else { + result.Data = true + } + }) +} + func (ps SqlPluginStore) List(pluginId string, offset int, limit int) store.StoreChannel { if limit <= 0 { limit = DEFAULT_PLUGIN_KEY_FETCH_LIMIT diff --git a/store/sqlstore/upgrade.go b/store/sqlstore/upgrade.go index 42d34f525..9898d5b32 100644 --- a/store/sqlstore/upgrade.go +++ b/store/sqlstore/upgrade.go @@ -510,7 +510,7 @@ func UpgradeDatabaseToVersion54(sqlStore SqlStore) { func UpgradeDatabaseToVersion55(sqlStore SqlStore) { // TODO: Uncomment following condition when version 5.5.0 is released // if shouldPerformUpgrade(sqlStore, VERSION_5_4_0, VERSION_5_5_0) { - + sqlStore.CreateColumnIfNotExists("PluginKeyValueStore", "ExpireAt", "bigint(20)", "bigint", "0") // saveSchemaVersion(sqlStore, VERSION_5_5_0) // } } diff --git a/store/store.go b/store/store.go index 29028130e..e21917345 100644 --- a/store/store.go +++ b/store/store.go @@ -502,6 +502,8 @@ type PluginStore interface { SaveOrUpdate(keyVal *model.PluginKeyValue) StoreChannel Get(pluginId, key string) StoreChannel Delete(pluginId, key string) StoreChannel + DeleteAllForPlugin(PluginId string) StoreChannel + DeleteAllExpired() StoreChannel List(pluginId string, page, perPage int) StoreChannel } diff --git a/store/storetest/mocks/PluginStore.go b/store/storetest/mocks/PluginStore.go index 9c4a40032..b85e17fdf 100644 --- a/store/storetest/mocks/PluginStore.go +++ b/store/storetest/mocks/PluginStore.go @@ -29,6 +29,38 @@ func (_m *PluginStore) Delete(pluginId string, key string) store.StoreChannel { return r0 } +// DeleteAllExpired provides a mock function with given fields: +func (_m *PluginStore) DeleteAllExpired() store.StoreChannel { + ret := _m.Called() + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func() store.StoreChannel); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + +// DeleteAllForPlugin provides a mock function with given fields: PluginId +func (_m *PluginStore) DeleteAllForPlugin(PluginId string) store.StoreChannel { + ret := _m.Called(PluginId) + + var r0 store.StoreChannel + if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok { + r0 = rf(PluginId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.StoreChannel) + } + } + + return r0 +} + // Get provides a mock function with given fields: pluginId, key func (_m *PluginStore) Get(pluginId string, key string) store.StoreChannel { ret := _m.Called(pluginId, key) diff --git a/store/storetest/plugin_store.go b/store/storetest/plugin_store.go index 3d7d0ec05..0e14c7539 100644 --- a/store/storetest/plugin_store.go +++ b/store/storetest/plugin_store.go @@ -13,7 +13,10 @@ import ( func TestPluginStore(t *testing.T, ss store.Store) { t.Run("PluginSaveGet", func(t *testing.T) { testPluginSaveGet(t, ss) }) + t.Run("PluginSaveGetExpiry", func(t *testing.T) { testPluginSaveGetExpiry(t, ss) }) t.Run("PluginDelete", func(t *testing.T) { testPluginDelete(t, ss) }) + t.Run("PluginDeleteAll", func(t *testing.T) { testPluginDeleteAll(t, ss) }) + t.Run("PluginDeleteExpired", func(t *testing.T) { testPluginDeleteExpired(t, ss) }) } func testPluginSaveGet(t *testing.T, ss store.Store) { @@ -21,6 +24,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) { PluginId: model.NewId(), Key: model.NewId(), Value: []byte(model.NewId()), + ExpireAt: 0, } if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil { @@ -38,6 +42,7 @@ func testPluginSaveGet(t *testing.T, ss store.Store) { assert.Equal(t, kv.PluginId, received.PluginId) assert.Equal(t, kv.Key, received.Key) assert.Equal(t, kv.Value, received.Value) + assert.Equal(t, kv.ExpireAt, received.ExpireAt) } // Try inserting when already exists @@ -56,6 +61,52 @@ func testPluginSaveGet(t *testing.T, ss store.Store) { } } +func testPluginSaveGetExpiry(t *testing.T, ss store.Store) { + kv := &model.PluginKeyValue{ + PluginId: model.NewId(), + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: model.GetMillis() + 30000, + } + + if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-ss.Plugin().Delete(kv.PluginId, kv.Key) + }() + + if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.PluginKeyValue) + assert.Equal(t, kv.PluginId, received.PluginId) + assert.Equal(t, kv.Key, received.Key) + assert.Equal(t, kv.Value, received.Value) + assert.Equal(t, kv.ExpireAt, received.ExpireAt) + } + + kv = &model.PluginKeyValue{ + PluginId: model.NewId(), + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: model.GetMillis() - 5000, + } + + if result := <-ss.Plugin().SaveOrUpdate(kv); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-ss.Plugin().Delete(kv.PluginId, kv.Key) + }() + + if result := <-ss.Plugin().Get(kv.PluginId, kv.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } +} + func testPluginDelete(t *testing.T, ss store.Store) { kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ PluginId: model.NewId(), @@ -67,3 +118,67 @@ func testPluginDelete(t *testing.T, ss store.Store) { t.Fatal(result.Err) } } + +func testPluginDeleteAll(t *testing.T, ss store.Store) { + pluginId := model.NewId() + + kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + })).(*model.PluginKeyValue) + + kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + })).(*model.PluginKeyValue) + + if result := <-ss.Plugin().DeleteAllForPlugin(pluginId); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } + + if result := <-ss.Plugin().Get(pluginId, kv2.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } +} + +func testPluginDeleteExpired(t *testing.T, ss store.Store) { + pluginId := model.NewId() + + kv := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: model.GetMillis() - 6000, + })).(*model.PluginKeyValue) + + kv2 := store.Must(ss.Plugin().SaveOrUpdate(&model.PluginKeyValue{ + PluginId: pluginId, + Key: model.NewId(), + Value: []byte(model.NewId()), + ExpireAt: 0, + })).(*model.PluginKeyValue) + + if result := <-ss.Plugin().DeleteAllExpired(); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-ss.Plugin().Get(pluginId, kv.Key); result.Err == nil { + t.Fatal("result.Err should not be nil") + } + + if result := <-ss.Plugin().Get(kv2.PluginId, kv2.Key); result.Err != nil { + t.Fatal(result.Err) + } else { + received := result.Data.(*model.PluginKeyValue) + assert.Equal(t, kv2.PluginId, received.PluginId) + assert.Equal(t, kv2.Key, received.Key) + assert.Equal(t, kv2.Value, received.Value) + assert.Equal(t, kv2.ExpireAt, received.ExpireAt) + } +} -- cgit v1.2.3-1-g7c22