From 8195c80aa12136838ff4491fac989e0b946382b1 Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Fri, 15 Sep 2017 17:35:55 +0100 Subject: PLT-7639: Batch delete methods for data retention. (#7444) --- i18n/en.json | 16 ++++++++++ store/layered_store.go | 6 ++++ store/layered_store_supplier.go | 1 + store/local_cache_supplier_reactions.go | 6 ++++ store/redis_supplier.go | 5 +++ store/sql_audit_store.go | 36 +++++++++++++++++++++ store/sql_audit_store_test.go | 29 +++++++++++++++++ store/sql_file_info_store.go | 33 ++++++++++++++++++++ store/sql_file_info_store_test.go | 41 ++++++++++++++++++++++++ store/sql_post_store.go | 33 ++++++++++++++++++++ store/sql_post_store_test.go | 39 +++++++++++++++++++++++ store/sql_reaction_store_test.go | 55 +++++++++++++++++++++++++++++++++ store/sql_supplier_reactions.go | 28 +++++++++++++++++ store/store.go | 4 +++ 14 files changed, 332 insertions(+) diff --git a/i18n/en.json b/i18n/en.json index 2c8bbd27b..94b6bd298 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -5263,6 +5263,10 @@ "id": "store.sql_audit.save.saving.app_error", "translation": "We encountered an error saving the audit" }, + { + "id": "store.sql_audit.permanent_delete_batch.app_error", + "translation": "We encountered an error permanently deleting the batch of audits" + }, { "id": "store.sql_channel.analytics_deleted_type_count.app_error", "translation": "We couldn't get deleted channel type counts" @@ -5623,6 +5627,10 @@ "id": "store.sql_file_info.save_or_update.app_error", "translation": "We couldn't save or update the file info" }, + { + "id": "store.sql_file_info.permanent_delete_batch.app_error", + "translation": "We encountered an error permanently deleting the batch of file infos" + }, { "id": "store.sql_job.delete.app_error", "translation": "We couldn't delete the job" @@ -5855,6 +5863,10 @@ "id": "store.sql_post.update.app_error", "translation": "We couldn't update the Post" }, + { + "id": "store.sql_post.permanent_delete_batch.app_error", + "translation": "We encountered an error permanently deleting the batch of posts" + }, { "id": "store.sql_preference.delete.app_error", "translation": "We encountered an error while deleting preferences" @@ -5955,6 +5967,10 @@ "id": "store.sql_reaction.save.save.app_error", "translation": "Unable to save reaction" }, + { + "id": "store.sql_reaction.permanent_delete_batch.app_error", + "translation": "We encountered an error permanently deleting the batch of reactions" + }, { "id": "store.sql_session.analytics_session_count.app_error", "translation": "We couldn't count the sessions" diff --git a/store/layered_store.go b/store/layered_store.go index ac0713f57..0c6a01125 100644 --- a/store/layered_store.go +++ b/store/layered_store.go @@ -199,3 +199,9 @@ func (s *LayeredReactionStore) DeleteAllWithEmojiName(emojiName string) StoreCha return supplier.ReactionDeleteAllWithEmojiName(s.TmpContext, emojiName) }) } + +func (s *LayeredReactionStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult { + return supplier.ReactionPermanentDeleteBatch(s.TmpContext, endTime, limit) + }) +} diff --git a/store/layered_store_supplier.go b/store/layered_store_supplier.go index 35668c717..841b75a32 100644 --- a/store/layered_store_supplier.go +++ b/store/layered_store_supplier.go @@ -30,4 +30,5 @@ type LayeredStoreSupplier interface { ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult + ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult } diff --git a/store/local_cache_supplier_reactions.go b/store/local_cache_supplier_reactions.go index a67cff2e4..be32ab77e 100644 --- a/store/local_cache_supplier_reactions.go +++ b/store/local_cache_supplier_reactions.go @@ -45,3 +45,9 @@ func (s *LocalCacheSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, s.doClearCacheCluster(s.reactionCache) return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...) } + +func (s *LocalCacheSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + // Don't bother to clear the cache as the posts will be gone anyway and the reactions being deleted will + // expire from the cache in due course. + return s.Next().ReactionPermanentDeleteBatch(ctx, endTime, limit) +} diff --git a/store/redis_supplier.go b/store/redis_supplier.go index 195d2c496..167bafd6f 100644 --- a/store/redis_supplier.go +++ b/store/redis_supplier.go @@ -131,3 +131,8 @@ func (s *RedisSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emoj // Ignoring this. It's probably OK to have the emoji slowly expire from Redis. return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...) } + +func (s *RedisSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + // Ignoring this. It's probably OK to have the emoji slowly expire from Redis. + return s.Next().ReactionPermanentDeleteBatch(ctx, endTime, limit, hints...) +} diff --git a/store/sql_audit_store.go b/store/sql_audit_store.go index d1ba65be1..1eb4e4819 100644 --- a/store/sql_audit_store.go +++ b/store/sql_audit_store.go @@ -4,7 +4,10 @@ package store import ( + "net/http" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/utils" ) type SqlAuditStore struct { @@ -109,3 +112,36 @@ func (s SqlAuditStore) PermanentDeleteByUser(userId string) StoreChannel { return storeChannel } + +func (s SqlAuditStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from Audits WHERE Id = any (array (SELECT Id FROM Audits WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from Audits WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlAuditStore.PermanentDeleteBatch", "store.sql_audit.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlAuditStore.PermanentDeleteBatch", "store.sql_audit.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_audit_store_test.go b/store/sql_audit_store_test.go index 2e83bf563..8db5b5506 100644 --- a/store/sql_audit_store_test.go +++ b/store/sql_audit_store_test.go @@ -58,3 +58,32 @@ func TestSqlAuditStore(t *testing.T) { t.Fatal(r2.Err) } } + +func TestAuditStorePermanentDeleteBatch(t *testing.T) { + Setup() + + a1 := &model.Audit{UserId: model.NewId(), IpAddress: "ipaddress", Action: "Action"} + Must(store.Audit().Save(a1)) + time.Sleep(10 * time.Millisecond) + a2 := &model.Audit{UserId: a1.UserId, IpAddress: "ipaddress", Action: "Action"} + Must(store.Audit().Save(a2)) + time.Sleep(10 * time.Millisecond) + cutoff := model.GetMillis() + time.Sleep(10 * time.Millisecond) + a3 := &model.Audit{UserId: a1.UserId, IpAddress: "ipaddress", Action: "Action"} + Must(store.Audit().Save(a3)) + + if r := <-store.Audit().Get(a1.UserId, 0, 100); len(r.Data.(model.Audits)) != 3 { + t.Fatal("Expected 3 audits. Got ", len(r.Data.(model.Audits))) + } + + Must(store.Audit().PermanentDeleteBatch(cutoff, 1000000)) + + if r := <-store.Audit().Get(a1.UserId, 0, 100); len(r.Data.(model.Audits)) != 1 { + t.Fatal("Expected 1 audit. Got ", len(r.Data.(model.Audits))) + } + + if r2 := <-store.Audit().PermanentDeleteByUser(a1.UserId); r2.Err != nil { + t.Fatal(r2.Err) + } +} diff --git a/store/sql_file_info_store.go b/store/sql_file_info_store.go index eab83992f..4cd574b13 100644 --- a/store/sql_file_info_store.go +++ b/store/sql_file_info_store.go @@ -281,3 +281,36 @@ func (fs SqlFileInfoStore) PermanentDelete(fileId string) StoreChannel { return storeChannel } + +func (s SqlFileInfoStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from FileInfo WHERE Id = any (array (SELECT Id FROM FileInfo WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from FileInfo WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlFileInfoStore.PermanentDeleteBatch", "store.sql_file_info.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlFileInfoStore.PermanentDeleteBatch", "store.sql_file_info.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_file_info_store_test.go b/store/sql_file_info_store_test.go index b62083136..c08bed7d4 100644 --- a/store/sql_file_info_store_test.go +++ b/store/sql_file_info_store_test.go @@ -256,3 +256,44 @@ func TestFileInfoPermanentDelete(t *testing.T) { t.Fatal(result.Err) } } + +func TestFileInfoPermanentDeleteBatch(t *testing.T) { + Setup() + + postId := model.NewId() + + Must(store.FileInfo().Save(&model.FileInfo{ + PostId: postId, + CreatorId: model.NewId(), + Path: "file.txt", + CreateAt: 1000, + })) + + Must(store.FileInfo().Save(&model.FileInfo{ + PostId: postId, + CreatorId: model.NewId(), + Path: "file.txt", + CreateAt: 1200, + })) + + Must(store.FileInfo().Save(&model.FileInfo{ + PostId: postId, + CreatorId: model.NewId(), + Path: "file.txt", + CreateAt: 2000, + })) + + if result := <-store.FileInfo().GetForPost(postId, true, false); result.Err != nil { + t.Fatal(result.Err) + } else if len(result.Data.([]*model.FileInfo)) != 3 { + t.Fatal("Expected 3 fileInfos") + } + + Must(store.FileInfo().PermanentDeleteBatch(1500, 1000)) + + if result := <-store.FileInfo().GetForPost(postId, true, false); result.Err != nil { + t.Fatal(result.Err) + } else if len(result.Data.([]*model.FileInfo)) != 1 { + t.Fatal("Expected 3 fileInfos") + } +} diff --git a/store/sql_post_store.go b/store/sql_post_store.go index 2aa862218..b300f9a59 100644 --- a/store/sql_post_store.go +++ b/store/sql_post_store.go @@ -1356,3 +1356,36 @@ func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) Store return storeChannel } + +func (s SqlPostStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from Posts WHERE Id = any (array (SELECT Id FROM Posts WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from Posts WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlPostStore.PermanentDeleteBatch", "store.sql_post.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlPostStore.PermanentDeleteBatch", "store.sql_post.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_post_store_test.go b/store/sql_post_store_test.go index 79892b5f5..304fb9f8a 100644 --- a/store/sql_post_store_test.go +++ b/store/sql_post_store_test.go @@ -1661,3 +1661,42 @@ func TestPostStoreGetPostsBatchForIndexing(t *testing.T) { } } } + +func TestPostStorePermanentDeleteBatch(t *testing.T) { + Setup() + + o1 := &model.Post{} + o1.ChannelId = model.NewId() + o1.UserId = model.NewId() + o1.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o1.CreateAt = 1000 + o1 = (<-store.Post().Save(o1)).Data.(*model.Post) + + o2 := &model.Post{} + o2.ChannelId = model.NewId() + o2.UserId = model.NewId() + o2.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o2.CreateAt = 1000 + o2 = (<-store.Post().Save(o2)).Data.(*model.Post) + + o3 := &model.Post{} + o3.ChannelId = model.NewId() + o3.UserId = model.NewId() + o3.Message = "zz" + model.NewId() + "AAAAAAAAAAA" + o3.CreateAt = 100000 + o3 = (<-store.Post().Save(o3)).Data.(*model.Post) + + Must(store.Post().PermanentDeleteBatch(2000, 1000)) + + if p := <-store.Post().Get(o1.Id); p.Err == nil { + t.Fatalf("Should have not found post 1 after purge") + } + + if p := <-store.Post().Get(o2.Id); p.Err == nil { + t.Fatalf("Should have not found post 2 after purge") + } + + if p := <-store.Post().Get(o3.Id); p.Err != nil { + t.Fatalf("Should have found post 3 after purge") + } +} diff --git a/store/sql_reaction_store_test.go b/store/sql_reaction_store_test.go index ac2590ea4..ebc09dc9b 100644 --- a/store/sql_reaction_store_test.go +++ b/store/sql_reaction_store_test.go @@ -294,3 +294,58 @@ func TestReactionDeleteAllWithEmojiName(t *testing.T) { t.Fatal("post shouldn't have reactions any more") } } + +func TestReactionStorePermanentDeleteBatch(t *testing.T) { + Setup() + + post := Must(store.Post().Save(&model.Post{ + ChannelId: model.NewId(), + UserId: model.NewId(), + })).(*model.Post) + + reactions := []*model.Reaction{ + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 1000, + }, + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 1500, + }, + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 2000, + }, + { + UserId: model.NewId(), + PostId: post.Id, + EmojiName: "sad", + CreateAt: 2000, + }, + } + + // Need to hang on to a reaction to delete later in order to clear the cache, as "allowFromCache" isn't honoured any more. + var lastReaction *model.Reaction + for _, reaction := range reactions { + lastReaction = Must(store.Reaction().Save(reaction)).(*model.Reaction) + } + + if returned := Must(store.Reaction().GetForPost(post.Id, false)).([]*model.Reaction); len(returned) != 4 { + t.Fatal("expected 4 reactions") + } + + Must(store.Reaction().PermanentDeleteBatch(1800, 1000)) + + // This is to force a clear of the cache. + Must(store.Reaction().Delete(lastReaction)) + + if returned := Must(store.Reaction().GetForPost(post.Id, false)).([]*model.Reaction); len(returned) != 1 { + t.Fatalf("expected 1 reaction. Got: %v", len(returned)) + } +} diff --git a/store/sql_supplier_reactions.go b/store/sql_supplier_reactions.go index 2293a2f88..94a980455 100644 --- a/store/sql_supplier_reactions.go +++ b/store/sql_supplier_reactions.go @@ -5,8 +5,10 @@ package store import ( "context" + "net/http" l4g "github.com/alecthomas/log4go" + "github.com/mattermost/gorp" "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/utils" @@ -140,6 +142,32 @@ func (s *SqlSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiN return result } +func (s *SqlSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + result := NewSupplierResult() + + var query string + if *utils.Cfg.SqlSettings.DriverName == "postgres" { + query = "DELETE from Reactions WHERE Id = any (array (SELECT Id FROM Reactions WHERE CreateAt < :EndTime LIMIT :Limit))" + } else { + query = "DELETE from Reactions WHERE CreateAt < :EndTime LIMIT :Limit" + } + + sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit}) + if err != nil { + result.Err = model.NewAppError("SqlReactionStore.PermanentDeleteBatch", "store.sql_reaction.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + } else { + rowsAffected, err1 := sqlResult.RowsAffected() + if err1 != nil { + result.Err = model.NewAppError("SqlReactionStore.PermanentDeleteBatch", "store.sql_reaction.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError) + result.Data = int64(0) + } else { + result.Data = rowsAffected + } + } + + return result +} + func saveReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error { if err := transaction.Insert(reaction); err != nil { return err diff --git a/store/store.go b/store/store.go index 49d395432..f7962fa4f 100644 --- a/store/store.go +++ b/store/store.go @@ -171,6 +171,7 @@ type PostStore interface { Overwrite(post *model.Post) StoreChannel GetPostsByIds(postIds []string) StoreChannel GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type UserStore interface { @@ -242,6 +243,7 @@ type AuditStore interface { Save(audit *model.Audit) StoreChannel Get(user_id string, offset int, limit int) StoreChannel PermanentDeleteByUser(userId string) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type ClusterDiscoveryStore interface { @@ -387,6 +389,7 @@ type FileInfoStore interface { AttachToPost(fileId string, postId string) StoreChannel DeleteForPost(postId string) StoreChannel PermanentDelete(fileId string) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type ReactionStore interface { @@ -394,6 +397,7 @@ type ReactionStore interface { Delete(reaction *model.Reaction) StoreChannel GetForPost(postId string, allowFromCache bool) StoreChannel DeleteAllWithEmojiName(emojiName string) StoreChannel + PermanentDeleteBatch(endTime int64, limit int64) StoreChannel } type JobStore interface { -- cgit v1.2.3-1-g7c22