From 09b49c26ddfdb20ced61e7dfd4192e750ce40449 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Mon, 31 Jul 2017 08:15:23 -0700 Subject: PLT-5308 Caching layer part 2 (#6973) * Adding Reaction store cache layer example * Implementing reaction store in new caching system. * Redis for reaction store * Adding redis library * Adding invalidation for DeleteAllWithEmojiName and other minor enhancements --- store/layered_store.go | 70 +++++---- store/layered_store_hints.go | 20 +++ store/layered_store_supplier.go | 26 +-- store/local_cache_supplier.go | 104 ++++++++++++ store/local_cache_supplier_reactions.go | 47 ++++++ store/redis_supplier.go | 133 ++++++++++++++++ store/sql_reaction_store.go | 271 -------------------------------- store/sql_supplier.go | 18 ++- store/sql_supplier_reactions.go | 165 ++++++++++++++++++- store/store.go | 2 - 10 files changed, 525 insertions(+), 331 deletions(-) create mode 100644 store/local_cache_supplier.go create mode 100644 store/local_cache_supplier_reactions.go create mode 100644 store/redis_supplier.go delete mode 100644 store/sql_reaction_store.go (limited to 'store') diff --git a/store/layered_store.go b/store/layered_store.go index ab9859c80..3d3f941e8 100644 --- a/store/layered_store.go +++ b/store/layered_store.go @@ -6,38 +6,54 @@ package store import ( "context" + l4g "github.com/alecthomas/log4go" "github.com/mattermost/platform/model" ) +const ( + ENABLE_EXPERIMENTAL_REDIS = false +) + type LayeredStore struct { - TmpContext context.Context - ReactionStore ReactionStore - DatabaseLayer *SqlSupplier + TmpContext context.Context + ReactionStore ReactionStore + DatabaseLayer *SqlSupplier + LocalCacheLayer *LocalCacheSupplier + RedisLayer *RedisSupplier + LayerChainHead LayeredStoreSupplier } func NewLayeredStore() Store { - return &LayeredStore{ - TmpContext: context.TODO(), - ReactionStore: &LayeredReactionStore{}, - DatabaseLayer: NewSqlSupplier(), + store := &LayeredStore{ + TmpContext: context.TODO(), + DatabaseLayer: NewSqlSupplier(), + LocalCacheLayer: NewLocalCacheSupplier(), } + + store.ReactionStore = &LayeredReactionStore{store} + + // Setup the chain + if ENABLE_EXPERIMENTAL_REDIS { + l4g.Debug("Experimental redis enabled.") + store.RedisLayer = NewRedisSupplier() + store.RedisLayer.SetChainNext(store.DatabaseLayer) + store.LayerChainHead = store.RedisLayer + } else { + store.LocalCacheLayer.SetChainNext(store.DatabaseLayer) + store.LayerChainHead = store.LocalCacheLayer + } + + return store } -type QueryFunction func(LayeredStoreSupplier) LayeredStoreSupplierResult +type QueryFunction func(LayeredStoreSupplier) *LayeredStoreSupplierResult func (s *LayeredStore) RunQuery(queryFunction QueryFunction) StoreChannel { storeChannel := make(StoreChannel) go func() { - finalResult := StoreResult{} - // Logic for determining what layers to run - if result := queryFunction(s.DatabaseLayer); result.Err == nil { - finalResult.Data = result.Result - } else { - finalResult.Err = result.Err - } - - storeChannel <- finalResult + result := queryFunction(s.LayerChainHead) + storeChannel <- result.StoreResult }() return storeChannel @@ -116,7 +132,7 @@ func (s *LayeredStore) FileInfo() FileInfoStore { } func (s *LayeredStore) Reaction() ReactionStore { - return s.DatabaseLayer.Reaction() + return s.ReactionStore } func (s *LayeredStore) Job() JobStore { @@ -152,35 +168,25 @@ type LayeredReactionStore struct { } func (s *LayeredReactionStore) Save(reaction *model.Reaction) StoreChannel { - return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult { + return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult { return supplier.ReactionSave(s.TmpContext, reaction) }) } func (s *LayeredReactionStore) Delete(reaction *model.Reaction) StoreChannel { - return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult { + return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult { return supplier.ReactionDelete(s.TmpContext, reaction) }) } -// TODO: DELETE ME -func (s *LayeredReactionStore) InvalidateCacheForPost(postId string) { - return -} - -// TODO: DELETE ME -func (s *LayeredReactionStore) InvalidateCache() { - return -} - func (s *LayeredReactionStore) GetForPost(postId string, allowFromCache bool) StoreChannel { - return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult { + return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult { return supplier.ReactionGetForPost(s.TmpContext, postId) }) } func (s *LayeredReactionStore) DeleteAllWithEmojiName(emojiName string) StoreChannel { - return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult { + return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult { return supplier.ReactionDeleteAllWithEmojiName(s.TmpContext, emojiName) }) } diff --git a/store/layered_store_hints.go b/store/layered_store_hints.go index 6154af7c9..064f4f326 100644 --- a/store/layered_store_hints.go +++ b/store/layered_store_hints.go @@ -9,3 +9,23 @@ const ( LSH_NO_CACHE LayeredStoreHint = iota LSH_MASTER_ONLY ) + +func hintsContains(hints []LayeredStoreHint, contains LayeredStoreHint) bool { + for _, hint := range hints { + if hint == contains { + return true + } + } + return false +} + +func hintsContainsAny(hints []LayeredStoreHint, contains ...LayeredStoreHint) bool { + for _, hint := range hints { + for _, hint2 := range contains { + if hint == hint2 { + return true + } + } + } + return false +} diff --git a/store/layered_store_supplier.go b/store/layered_store_supplier.go index 7b7da5710..22c90ab17 100644 --- a/store/layered_store_supplier.go +++ b/store/layered_store_supplier.go @@ -6,24 +6,28 @@ package store import "github.com/mattermost/platform/model" import "context" +type ResultHandler func(*StoreResult) + type LayeredStoreSupplierResult struct { - Result StoreResult - Err *model.AppError + StoreResult } -func NewSupplierResult() LayeredStoreSupplierResult { - return LayeredStoreSupplierResult{ - Result: StoreResult{}, - Err: nil, - } +func NewSupplierResult() *LayeredStoreSupplierResult { + return &LayeredStoreSupplierResult{} } type LayeredStoreSupplier interface { + // + // Control + // + SetChainNext(LayeredStoreSupplier) + Next() LayeredStoreSupplier + // // Reactions //), hints ...LayeredStoreHint) - ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult - ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult - ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) LayeredStoreSupplierResult - ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) LayeredStoreSupplierResult + ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult + ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult + ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult + ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult } diff --git a/store/local_cache_supplier.go b/store/local_cache_supplier.go new file mode 100644 index 000000000..63c050485 --- /dev/null +++ b/store/local_cache_supplier.go @@ -0,0 +1,104 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "context" + + "github.com/mattermost/platform/einterfaces" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" +) + +const ( + REACTION_CACHE_SIZE = 20000 + REACTION_CACHE_SEC = 1800 // 30 minutes + + CLEAR_CACHE_MESSAGE_DATA = "" +) + +type LocalCacheSupplier struct { + next LayeredStoreSupplier + reactionCache *utils.Cache +} + +func NewLocalCacheSupplier() *LocalCacheSupplier { + supplier := &LocalCacheSupplier{ + reactionCache: utils.NewLruWithParams(REACTION_CACHE_SIZE, "Reaction", REACTION_CACHE_SEC, model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS), + } + + registerClusterHandlers(supplier) + + return supplier +} + +func registerClusterHandlers(supplier *LocalCacheSupplier) { + if cluster := einterfaces.GetClusterInterface(); cluster != nil { + cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, supplier.handleClusterInvalidateReaction) + } +} + +func (s *LocalCacheSupplier) SetChainNext(next LayeredStoreSupplier) { + s.next = next +} + +func (s *LocalCacheSupplier) Next() LayeredStoreSupplier { + return s.next +} + +func doStandardReadCache(ctx context.Context, cache utils.ObjectCache, key string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + metrics := einterfaces.GetMetricsInterface() + + if hintsContains(hints, LSH_NO_CACHE) { + if metrics != nil { + metrics.IncrementMemCacheMissCounter(cache.Name()) + } + return nil + } + + if cacheItem, ok := cache.Get(key); ok { + if metrics != nil { + metrics.IncrementMemCacheHitCounter(cache.Name()) + } + result := NewSupplierResult() + result.Data = cacheItem + return result + } + + if metrics != nil { + metrics.IncrementMemCacheMissCounter(cache.Name()) + } + + return nil +} + +func doStandardAddToCache(ctx context.Context, cache utils.ObjectCache, key string, result *LayeredStoreSupplierResult, hints ...LayeredStoreHint) { + if result.Err == nil && result.Data != nil { + cache.AddWithDefaultExpires(key, result.Data) + } +} + +func doInvalidateCacheCluster(cache utils.ObjectCache, key string) { + cache.Remove(key) + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: cache.GetInvalidateClusterEvent(), + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: key, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) + } +} + +func doClearCacheCluster(cache utils.ObjectCache) { + cache.Purge() + if einterfaces.GetClusterInterface() != nil { + msg := &model.ClusterMessage{ + Event: cache.GetInvalidateClusterEvent(), + SendType: model.CLUSTER_SEND_BEST_EFFORT, + Data: CLEAR_CACHE_MESSAGE_DATA, + } + einterfaces.GetClusterInterface().SendClusterMessage(msg) + } +} diff --git a/store/local_cache_supplier_reactions.go b/store/local_cache_supplier_reactions.go new file mode 100644 index 000000000..7d2c9f065 --- /dev/null +++ b/store/local_cache_supplier_reactions.go @@ -0,0 +1,47 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "context" + + "github.com/mattermost/platform/model" +) + +func (s *LocalCacheSupplier) handleClusterInvalidateReaction(msg *model.ClusterMessage) { + if msg.Data == CLEAR_CACHE_MESSAGE_DATA { + s.reactionCache.Purge() + } else { + s.reactionCache.Remove(msg.Data) + } +} + +func (s *LocalCacheSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + doInvalidateCacheCluster(s.reactionCache, reaction.PostId) + return s.Next().ReactionSave(ctx, reaction, hints...) +} + +func (s *LocalCacheSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + doInvalidateCacheCluster(s.reactionCache, reaction.PostId) + return s.Next().ReactionDelete(ctx, reaction, hints...) +} + +func (s *LocalCacheSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + if result := doStandardReadCache(ctx, s.reactionCache, postId, hints...); result != nil { + return result + } + + result := s.Next().ReactionGetForPost(ctx, postId, hints...) + + doStandardAddToCache(ctx, s.reactionCache, postId, result, hints...) + + return result +} + +func (s *LocalCacheSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + // This could be improved. Right now we just clear the whole + // cache because we don't have a way find what post Ids have this emoji name. + doClearCacheCluster(s.reactionCache) + return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...) +} diff --git a/store/redis_supplier.go b/store/redis_supplier.go new file mode 100644 index 000000000..eede36ef2 --- /dev/null +++ b/store/redis_supplier.go @@ -0,0 +1,133 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "bytes" + "context" + "encoding/gob" + + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/go-redis/redis" + "github.com/mattermost/platform/model" +) + +const REDIS_EXPIRY_TIME = 30 * time.Minute + +type RedisSupplier struct { + next LayeredStoreSupplier + client *redis.Client +} + +func GetBytes(key interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(key) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func DecodeBytes(input []byte, thing interface{}) error { + dec := gob.NewDecoder(bytes.NewReader(input)) + err := dec.Decode(thing) + if err != nil { + return err + } + return nil +} + +func NewRedisSupplier() *RedisSupplier { + supplier := &RedisSupplier{} + + supplier.client = redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + + if _, err := supplier.client.Ping().Result(); err != nil { + l4g.Error("Unable to ping redis server: " + err.Error()) + return nil + } + + return supplier +} + +func (s *RedisSupplier) save(key string, value interface{}, expiry time.Duration) error { + if bytes, err := GetBytes(value); err != nil { + return err + } else { + if err := s.client.Set(key, bytes, expiry).Err(); err != nil { + return err + } + } + return nil +} + +func (s *RedisSupplier) load(key string, writeTo interface{}) (bool, error) { + if data, err := s.client.Get(key).Bytes(); err != nil { + if err == redis.Nil { + return false, nil + } else { + return false, err + } + } else { + if err := DecodeBytes(data, writeTo); err != nil { + return false, err + } + } + return true, nil +} + +func (s *RedisSupplier) SetChainNext(next LayeredStoreSupplier) { + s.next = next +} + +func (s *RedisSupplier) Next() LayeredStoreSupplier { + return s.next +} + +func (s *RedisSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + if err := s.client.Del("reactions:" + reaction.PostId).Err(); err != nil { + l4g.Error("Redis failed to remove key reactions:" + reaction.PostId + " Error: " + err.Error()) + } + return s.Next().ReactionSave(ctx, reaction, hints...) +} + +func (s *RedisSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + if err := s.client.Del("reactions:" + reaction.PostId).Err(); err != nil { + l4g.Error("Redis failed to remove key reactions:" + reaction.PostId + " Error: " + err.Error()) + } + return s.Next().ReactionDelete(ctx, reaction, hints...) +} + +func (s *RedisSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + var resultdata []*model.Reaction + found, err := s.load("reactions:"+postId, &resultdata) + if found { + result := NewSupplierResult() + result.Data = resultdata + return result + } + if err != nil { + l4g.Error("Redis encountered an error on read: " + err.Error()) + } + + result := s.Next().ReactionGetForPost(ctx, postId, hints...) + + if err := s.save("reactions:"+postId, result.Data, REDIS_EXPIRY_TIME); err != nil { + l4g.Error("Redis encountered and error on write: " + err.Error()) + } + + return result +} + +func (s *RedisSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + // Ignoring this. It's probably OK to have the emoji slowly expire from Redis. + return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...) +} diff --git a/store/sql_reaction_store.go b/store/sql_reaction_store.go deleted file mode 100644 index 87845421e..000000000 --- a/store/sql_reaction_store.go +++ /dev/null @@ -1,271 +0,0 @@ -// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package store - -import ( - "github.com/mattermost/gorp" - "github.com/mattermost/platform/einterfaces" - "github.com/mattermost/platform/model" - "github.com/mattermost/platform/utils" - - l4g "github.com/alecthomas/log4go" -) - -const ( - REACTION_CACHE_SIZE = 20000 - REACTION_CACHE_SEC = 1800 // 30 minutes -) - -var reactionCache *utils.Cache = utils.NewLru(REACTION_CACHE_SIZE) - -type SqlReactionStore struct { - SqlStore -} - -func NewSqlReactionStore(sqlStore SqlStore) ReactionStore { - s := &SqlReactionStore{sqlStore} - - for _, db := range sqlStore.GetAllConns() { - table := db.AddTableWithName(model.Reaction{}, "Reactions").SetKeys(false, "UserId", "PostId", "EmojiName") - table.ColMap("UserId").SetMaxSize(26) - table.ColMap("PostId").SetMaxSize(26) - table.ColMap("EmojiName").SetMaxSize(64) - } - - return s -} - -func (s SqlReactionStore) CreateIndexesIfNotExists() { - s.CreateIndexIfNotExists("idx_reactions_post_id", "Reactions", "PostId") - s.CreateIndexIfNotExists("idx_reactions_user_id", "Reactions", "UserId") - s.CreateIndexIfNotExists("idx_reactions_emoji_name", "Reactions", "EmojiName") -} - -func (s SqlReactionStore) Save(reaction *model.Reaction) StoreChannel { - storeChannel := make(StoreChannel) - - go func() { - result := StoreResult{} - - reaction.PreSave() - if result.Err = reaction.IsValid(); result.Err != nil { - storeChannel <- result - close(storeChannel) - return - } - - if transaction, err := s.GetMaster().Begin(); err != nil { - result.Err = model.NewLocAppError("SqlReactionStore.Save", "store.sql_reaction.save.begin.app_error", nil, err.Error()) - } else { - err := saveReactionAndUpdatePost(transaction, reaction) - - if err != nil { - transaction.Rollback() - - // We don't consider duplicated save calls as an error - if !IsUniqueConstraintError(err.Error(), []string{"reactions_pkey", "PRIMARY"}) { - result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.save.app_error", nil, err.Error()) - } - } else { - if err := transaction.Commit(); err != nil { - // don't need to rollback here since the transaction is already closed - result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.commit.app_error", nil, err.Error()) - } - } - - if result.Err == nil { - result.Data = reaction - } - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (s SqlReactionStore) Delete(reaction *model.Reaction) StoreChannel { - storeChannel := make(StoreChannel) - - go func() { - result := StoreResult{} - - if transaction, err := s.GetMaster().Begin(); err != nil { - result.Err = model.NewLocAppError("SqlReactionStore.Delete", "store.sql_reaction.delete.begin.app_error", nil, err.Error()) - } else { - err := deleteReactionAndUpdatePost(transaction, reaction) - - if err != nil { - transaction.Rollback() - - result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.app_error", nil, err.Error()) - } else if err := transaction.Commit(); err != nil { - // don't need to rollback here since the transaction is already closed - result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.commit.app_error", nil, err.Error()) - } else { - result.Data = reaction - } - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func saveReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error { - if err := transaction.Insert(reaction); err != nil { - return err - } - - return updatePostForReactions(transaction, reaction.PostId) -} - -func deleteReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error { - if _, err := transaction.Exec( - `DELETE FROM - Reactions - WHERE - PostId = :PostId AND - UserId = :UserId AND - EmojiName = :EmojiName`, - map[string]interface{}{"PostId": reaction.PostId, "UserId": reaction.UserId, "EmojiName": reaction.EmojiName}); err != nil { - return err - } - - return updatePostForReactions(transaction, reaction.PostId) -} - -const ( - // Set HasReactions = true if and only if the post has reactions, update UpdateAt only if HasReactions changes - UPDATE_POST_HAS_REACTIONS_QUERY = `UPDATE - Posts - SET - UpdateAt = (CASE - WHEN HasReactions != (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId) THEN :UpdateAt - ELSE UpdateAt - END), - HasReactions = (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId) - WHERE - Id = :PostId` -) - -func updatePostForReactions(transaction *gorp.Transaction, postId string) error { - _, err := transaction.Exec(UPDATE_POST_HAS_REACTIONS_QUERY, map[string]interface{}{"PostId": postId, "UpdateAt": model.GetMillis()}) - - return err -} - -func (s SqlReactionStore) InvalidateCacheForPost(postId string) { - reactionCache.Remove(postId) -} - -func (s SqlReactionStore) InvalidateCache() { - reactionCache.Purge() -} - -func (s SqlReactionStore) GetForPost(postId string, allowFromCache bool) StoreChannel { - storeChannel := make(StoreChannel) - - go func() { - result := StoreResult{} - metrics := einterfaces.GetMetricsInterface() - - if allowFromCache { - if cacheItem, ok := reactionCache.Get(postId); ok { - if metrics != nil { - metrics.IncrementMemCacheHitCounter("Reactions") - } - result.Data = cacheItem.([]*model.Reaction) - storeChannel <- result - close(storeChannel) - return - } else { - if metrics != nil { - metrics.IncrementMemCacheMissCounter("Reactions") - } - } - } else { - if metrics != nil { - metrics.IncrementMemCacheMissCounter("Reactions") - } - } - - var reactions []*model.Reaction - - if _, err := s.GetReplica().Select(&reactions, - `SELECT - * - FROM - Reactions - WHERE - PostId = :PostId - ORDER BY - CreateAt`, map[string]interface{}{"PostId": postId}); err != nil { - result.Err = model.NewLocAppError("SqlReactionStore.GetForPost", "store.sql_reaction.get_for_post.app_error", nil, "") - } else { - result.Data = reactions - - reactionCache.AddWithExpiresInSecs(postId, reactions, REACTION_CACHE_SEC) - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} - -func (s SqlReactionStore) DeleteAllWithEmojiName(emojiName string) StoreChannel { - storeChannel := make(StoreChannel) - - go func() { - result := StoreResult{} - - // doesn't use a transaction since it's better for this to half-finish than to not commit anything - var reactions []*model.Reaction - - if _, err := s.GetReplica().Select(&reactions, - `SELECT - * - FROM - Reactions - WHERE - EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil { - result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName", - "store.sql_reaction.delete_all_with_emoji_name.get_reactions.app_error", nil, - "emoji_name="+emojiName+", error="+err.Error()) - storeChannel <- result - close(storeChannel) - return - } - - if _, err := s.GetMaster().Exec( - `DELETE FROM - Reactions - WHERE - EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil { - result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName", - "store.sql_reaction.delete_all_with_emoji_name.delete_reactions.app_error", nil, - "emoji_name="+emojiName+", error="+err.Error()) - storeChannel <- result - close(storeChannel) - return - } - - for _, reaction := range reactions { - if _, err := s.GetMaster().Exec(UPDATE_POST_HAS_REACTIONS_QUERY, - map[string]interface{}{"PostId": reaction.PostId, "UpdateAt": model.GetMillis()}); err != nil { - l4g.Warn(utils.T("store.sql_reaction.delete_all_with_emoji_name.update_post.warn"), reaction.PostId, err.Error()) - } - } - - storeChannel <- result - close(storeChannel) - }() - - return storeChannel -} diff --git a/store/sql_supplier.go b/store/sql_supplier.go index 0f4ab8380..df934f2cb 100644 --- a/store/sql_supplier.go +++ b/store/sql_supplier.go @@ -57,11 +57,6 @@ const ( EXIT_REMOVE_TABLE = 134 ) -type SqlSupplierResult struct { - Err model.AppError - Result interface{} -} - type SqlSupplierOldStores struct { team TeamStore channel ChannelStore @@ -86,6 +81,7 @@ type SqlSupplierOldStores struct { } type SqlSupplier struct { + next LayeredStoreSupplier master *gorp.DbMap replicas []*gorp.DbMap searchReplicas []*gorp.DbMap @@ -120,9 +116,10 @@ func NewSqlSupplier() *SqlSupplier { supplier.oldStores.emoji = NewSqlEmojiStore(supplier) supplier.oldStores.status = NewSqlStatusStore(supplier) supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier) - supplier.oldStores.reaction = NewSqlReactionStore(supplier) supplier.oldStores.job = NewSqlJobStore(supplier) + initSqlSupplierReactions(supplier) + err := supplier.GetMaster().CreateTablesIfNotExists() if err != nil { l4g.Critical(utils.T("store.sql.creating_tables.critical"), err) @@ -149,7 +146,6 @@ func NewSqlSupplier() *SqlSupplier { supplier.oldStores.emoji.(*SqlEmojiStore).CreateIndexesIfNotExists() supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists() supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists() - supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists() supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists() supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures() @@ -157,6 +153,14 @@ func NewSqlSupplier() *SqlSupplier { return supplier } +func (s *SqlSupplier) SetChainNext(next LayeredStoreSupplier) { + s.next = next +} + +func (s *SqlSupplier) Next() LayeredStoreSupplier { + return s.next +} + func setupConnection(con_type string, driver string, dataSource string, maxIdle int, maxOpen int, trace bool) *gorp.DbMap { db, err := dbsql.Open(driver, dataSource) if err != nil { diff --git a/store/sql_supplier_reactions.go b/store/sql_supplier_reactions.go index 14f13cce6..30ca6beed 100644 --- a/store/sql_supplier_reactions.go +++ b/store/sql_supplier_reactions.go @@ -6,7 +6,10 @@ package store import ( "context" + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/gorp" "github.com/mattermost/platform/model" + "github.com/mattermost/platform/utils" ) func initSqlSupplierReactions(sqlStore SqlStore) { @@ -18,18 +21,164 @@ func initSqlSupplierReactions(sqlStore SqlStore) { } } -func (s *SqlSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult { - panic("not implemented") +func (s *SqlSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + result := NewSupplierResult() + + reaction.PreSave() + if result.Err = reaction.IsValid(); result.Err != nil { + return result + } + + if transaction, err := s.GetMaster().Begin(); err != nil { + result.Err = model.NewLocAppError("SqlReactionStore.Save", "store.sql_reaction.save.begin.app_error", nil, err.Error()) + } else { + err := saveReactionAndUpdatePost(transaction, reaction) + + if err != nil { + transaction.Rollback() + + // We don't consider duplicated save calls as an error + if !IsUniqueConstraintError(err.Error(), []string{"reactions_pkey", "PRIMARY"}) { + result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.save.app_error", nil, err.Error()) + } + } else { + if err := transaction.Commit(); err != nil { + // don't need to rollback here since the transaction is already closed + result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.commit.app_error", nil, err.Error()) + } + } + + if result.Err == nil { + result.Data = reaction + } + } + + return result +} + +func (s *SqlSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + result := NewSupplierResult() + + if transaction, err := s.GetMaster().Begin(); err != nil { + result.Err = model.NewLocAppError("SqlReactionStore.Delete", "store.sql_reaction.delete.begin.app_error", nil, err.Error()) + } else { + err := deleteReactionAndUpdatePost(transaction, reaction) + + if err != nil { + transaction.Rollback() + + result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.app_error", nil, err.Error()) + } else if err := transaction.Commit(); err != nil { + // don't need to rollback here since the transaction is already closed + result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.commit.app_error", nil, err.Error()) + } else { + result.Data = reaction + } + } + + return result +} + +func (s *SqlSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + result := NewSupplierResult() + + var reactions []*model.Reaction + + if _, err := s.GetReplica().Select(&reactions, + `SELECT + * + FROM + Reactions + WHERE + PostId = :PostId + ORDER BY + CreateAt`, map[string]interface{}{"PostId": postId}); err != nil { + result.Err = model.NewLocAppError("SqlReactionStore.GetForPost", "store.sql_reaction.get_for_post.app_error", nil, "") + } else { + result.Data = reactions + } + + return result } -func (s *SqlSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult { - panic("not implemented") +func (s *SqlSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult { + result := NewSupplierResult() + + var reactions []*model.Reaction + + if _, err := s.GetReplica().Select(&reactions, + `SELECT + * + FROM + Reactions + WHERE + EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil { + result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName", + "store.sql_reaction.delete_all_with_emoji_name.get_reactions.app_error", nil, + "emoji_name="+emojiName+", error="+err.Error()) + return result + } + + if _, err := s.GetMaster().Exec( + `DELETE FROM + Reactions + WHERE + EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil { + result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName", + "store.sql_reaction.delete_all_with_emoji_name.delete_reactions.app_error", nil, + "emoji_name="+emojiName+", error="+err.Error()) + return result + } + + for _, reaction := range reactions { + if _, err := s.GetMaster().Exec(UPDATE_POST_HAS_REACTIONS_QUERY, + map[string]interface{}{"PostId": reaction.PostId, "UpdateAt": model.GetMillis()}); err != nil { + l4g.Warn(utils.T("store.sql_reaction.delete_all_with_emoji_name.update_post.warn"), reaction.PostId, err.Error()) + } + } + + return result } -func (s *SqlSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) LayeredStoreSupplierResult { - panic("not implemented") +func saveReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error { + if err := transaction.Insert(reaction); err != nil { + return err + } + + return updatePostForReactions(transaction, reaction.PostId) } -func (s *SqlSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) LayeredStoreSupplierResult { - panic("not implemented") +func deleteReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error { + if _, err := transaction.Exec( + `DELETE FROM + Reactions + WHERE + PostId = :PostId AND + UserId = :UserId AND + EmojiName = :EmojiName`, + map[string]interface{}{"PostId": reaction.PostId, "UserId": reaction.UserId, "EmojiName": reaction.EmojiName}); err != nil { + return err + } + + return updatePostForReactions(transaction, reaction.PostId) +} + +const ( + // Set HasReactions = true if and only if the post has reactions, update UpdateAt only if HasReactions changes + UPDATE_POST_HAS_REACTIONS_QUERY = `UPDATE + Posts + SET + UpdateAt = (CASE + WHEN HasReactions != (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId) THEN :UpdateAt + ELSE UpdateAt + END), + HasReactions = (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId) + WHERE + Id = :PostId` +) + +func updatePostForReactions(transaction *gorp.Transaction, postId string) error { + _, err := transaction.Exec(UPDATE_POST_HAS_REACTIONS_QUERY, map[string]interface{}{"PostId": postId, "UpdateAt": model.GetMillis()}) + + return err } diff --git a/store/store.go b/store/store.go index ab3d97d9b..0fa2a96b3 100644 --- a/store/store.go +++ b/store/store.go @@ -379,8 +379,6 @@ type FileInfoStore interface { type ReactionStore interface { Save(reaction *model.Reaction) StoreChannel Delete(reaction *model.Reaction) StoreChannel - InvalidateCacheForPost(postId string) - InvalidateCache() GetForPost(postId string, allowFromCache bool) StoreChannel DeleteAllWithEmojiName(emojiName string) StoreChannel } -- cgit v1.2.3-1-g7c22