summaryrefslogtreecommitdiffstats
path: root/store
diff options
context:
space:
mode:
authorChristopher Speller <crspeller@gmail.com>2017-07-31 08:15:23 -0700
committerGitHub <noreply@github.com>2017-07-31 08:15:23 -0700
commit09b49c26ddfdb20ced61e7dfd4192e750ce40449 (patch)
tree1288d069cc8a199b8eb3b858935dffd377ee3d2d /store
parent6f4e38d129ffaf469d40fc8596d3957ee94d21e9 (diff)
downloadchat-09b49c26ddfdb20ced61e7dfd4192e750ce40449.tar.gz
chat-09b49c26ddfdb20ced61e7dfd4192e750ce40449.tar.bz2
chat-09b49c26ddfdb20ced61e7dfd4192e750ce40449.zip
PLT-5308 Caching layer part 2 (#6973)
* Adding Reaction store cache layer example * Implementing reaction store in new caching system. * Redis for reaction store * Adding redis library * Adding invalidation for DeleteAllWithEmojiName and other minor enhancements
Diffstat (limited to 'store')
-rw-r--r--store/layered_store.go70
-rw-r--r--store/layered_store_hints.go20
-rw-r--r--store/layered_store_supplier.go26
-rw-r--r--store/local_cache_supplier.go104
-rw-r--r--store/local_cache_supplier_reactions.go47
-rw-r--r--store/redis_supplier.go133
-rw-r--r--store/sql_reaction_store.go271
-rw-r--r--store/sql_supplier.go18
-rw-r--r--store/sql_supplier_reactions.go165
-rw-r--r--store/store.go2
10 files changed, 525 insertions, 331 deletions
diff --git a/store/layered_store.go b/store/layered_store.go
index ab9859c80..3d3f941e8 100644
--- a/store/layered_store.go
+++ b/store/layered_store.go
@@ -6,38 +6,54 @@ package store
import (
"context"
+ l4g "github.com/alecthomas/log4go"
"github.com/mattermost/platform/model"
)
+const (
+ ENABLE_EXPERIMENTAL_REDIS = false
+)
+
type LayeredStore struct {
- TmpContext context.Context
- ReactionStore ReactionStore
- DatabaseLayer *SqlSupplier
+ TmpContext context.Context
+ ReactionStore ReactionStore
+ DatabaseLayer *SqlSupplier
+ LocalCacheLayer *LocalCacheSupplier
+ RedisLayer *RedisSupplier
+ LayerChainHead LayeredStoreSupplier
}
func NewLayeredStore() Store {
- return &LayeredStore{
- TmpContext: context.TODO(),
- ReactionStore: &LayeredReactionStore{},
- DatabaseLayer: NewSqlSupplier(),
+ store := &LayeredStore{
+ TmpContext: context.TODO(),
+ DatabaseLayer: NewSqlSupplier(),
+ LocalCacheLayer: NewLocalCacheSupplier(),
}
+
+ store.ReactionStore = &LayeredReactionStore{store}
+
+ // Setup the chain
+ if ENABLE_EXPERIMENTAL_REDIS {
+ l4g.Debug("Experimental redis enabled.")
+ store.RedisLayer = NewRedisSupplier()
+ store.RedisLayer.SetChainNext(store.DatabaseLayer)
+ store.LayerChainHead = store.RedisLayer
+ } else {
+ store.LocalCacheLayer.SetChainNext(store.DatabaseLayer)
+ store.LayerChainHead = store.LocalCacheLayer
+ }
+
+ return store
}
-type QueryFunction func(LayeredStoreSupplier) LayeredStoreSupplierResult
+type QueryFunction func(LayeredStoreSupplier) *LayeredStoreSupplierResult
func (s *LayeredStore) RunQuery(queryFunction QueryFunction) StoreChannel {
storeChannel := make(StoreChannel)
go func() {
- finalResult := StoreResult{}
- // Logic for determining what layers to run
- if result := queryFunction(s.DatabaseLayer); result.Err == nil {
- finalResult.Data = result.Result
- } else {
- finalResult.Err = result.Err
- }
-
- storeChannel <- finalResult
+ result := queryFunction(s.LayerChainHead)
+ storeChannel <- result.StoreResult
}()
return storeChannel
@@ -116,7 +132,7 @@ func (s *LayeredStore) FileInfo() FileInfoStore {
}
func (s *LayeredStore) Reaction() ReactionStore {
- return s.DatabaseLayer.Reaction()
+ return s.ReactionStore
}
func (s *LayeredStore) Job() JobStore {
@@ -152,35 +168,25 @@ type LayeredReactionStore struct {
}
func (s *LayeredReactionStore) Save(reaction *model.Reaction) StoreChannel {
- return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult {
+ return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionSave(s.TmpContext, reaction)
})
}
func (s *LayeredReactionStore) Delete(reaction *model.Reaction) StoreChannel {
- return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult {
+ return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionDelete(s.TmpContext, reaction)
})
}
-// TODO: DELETE ME
-func (s *LayeredReactionStore) InvalidateCacheForPost(postId string) {
- return
-}
-
-// TODO: DELETE ME
-func (s *LayeredReactionStore) InvalidateCache() {
- return
-}
-
func (s *LayeredReactionStore) GetForPost(postId string, allowFromCache bool) StoreChannel {
- return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult {
+ return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionGetForPost(s.TmpContext, postId)
})
}
func (s *LayeredReactionStore) DeleteAllWithEmojiName(emojiName string) StoreChannel {
- return s.RunQuery(func(supplier LayeredStoreSupplier) LayeredStoreSupplierResult {
+ return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
return supplier.ReactionDeleteAllWithEmojiName(s.TmpContext, emojiName)
})
}
diff --git a/store/layered_store_hints.go b/store/layered_store_hints.go
index 6154af7c9..064f4f326 100644
--- a/store/layered_store_hints.go
+++ b/store/layered_store_hints.go
@@ -9,3 +9,23 @@ const (
LSH_NO_CACHE LayeredStoreHint = iota
LSH_MASTER_ONLY
)
+
+func hintsContains(hints []LayeredStoreHint, contains LayeredStoreHint) bool {
+ for _, hint := range hints {
+ if hint == contains {
+ return true
+ }
+ }
+ return false
+}
+
+func hintsContainsAny(hints []LayeredStoreHint, contains ...LayeredStoreHint) bool {
+ for _, hint := range hints {
+ for _, hint2 := range contains {
+ if hint == hint2 {
+ return true
+ }
+ }
+ }
+ return false
+}
diff --git a/store/layered_store_supplier.go b/store/layered_store_supplier.go
index 7b7da5710..22c90ab17 100644
--- a/store/layered_store_supplier.go
+++ b/store/layered_store_supplier.go
@@ -6,24 +6,28 @@ package store
import "github.com/mattermost/platform/model"
import "context"
+type ResultHandler func(*StoreResult)
+
type LayeredStoreSupplierResult struct {
- Result StoreResult
- Err *model.AppError
+ StoreResult
}
-func NewSupplierResult() LayeredStoreSupplierResult {
- return LayeredStoreSupplierResult{
- Result: StoreResult{},
- Err: nil,
- }
+func NewSupplierResult() *LayeredStoreSupplierResult {
+ return &LayeredStoreSupplierResult{}
}
type LayeredStoreSupplier interface {
//
+ // Control
+ //
+ SetChainNext(LayeredStoreSupplier)
+ Next() LayeredStoreSupplier
+
+ //
// Reactions
//), hints ...LayeredStoreHint)
- ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult
- ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult
- ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) LayeredStoreSupplierResult
- ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) LayeredStoreSupplierResult
+ ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
+ ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
+ ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
+ ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
}
diff --git a/store/local_cache_supplier.go b/store/local_cache_supplier.go
new file mode 100644
index 000000000..63c050485
--- /dev/null
+++ b/store/local_cache_supplier.go
@@ -0,0 +1,104 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "context"
+
+ "github.com/mattermost/platform/einterfaces"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
+)
+
+const (
+ REACTION_CACHE_SIZE = 20000
+ REACTION_CACHE_SEC = 1800 // 30 minutes
+
+ CLEAR_CACHE_MESSAGE_DATA = ""
+)
+
+type LocalCacheSupplier struct {
+ next LayeredStoreSupplier
+ reactionCache *utils.Cache
+}
+
+func NewLocalCacheSupplier() *LocalCacheSupplier {
+ supplier := &LocalCacheSupplier{
+ reactionCache: utils.NewLruWithParams(REACTION_CACHE_SIZE, "Reaction", REACTION_CACHE_SEC, model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS),
+ }
+
+ registerClusterHandlers(supplier)
+
+ return supplier
+}
+
+func registerClusterHandlers(supplier *LocalCacheSupplier) {
+ if cluster := einterfaces.GetClusterInterface(); cluster != nil {
+ cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, supplier.handleClusterInvalidateReaction)
+ }
+}
+
+func (s *LocalCacheSupplier) SetChainNext(next LayeredStoreSupplier) {
+ s.next = next
+}
+
+func (s *LocalCacheSupplier) Next() LayeredStoreSupplier {
+ return s.next
+}
+
+func doStandardReadCache(ctx context.Context, cache utils.ObjectCache, key string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ metrics := einterfaces.GetMetricsInterface()
+
+ if hintsContains(hints, LSH_NO_CACHE) {
+ if metrics != nil {
+ metrics.IncrementMemCacheMissCounter(cache.Name())
+ }
+ return nil
+ }
+
+ if cacheItem, ok := cache.Get(key); ok {
+ if metrics != nil {
+ metrics.IncrementMemCacheHitCounter(cache.Name())
+ }
+ result := NewSupplierResult()
+ result.Data = cacheItem
+ return result
+ }
+
+ if metrics != nil {
+ metrics.IncrementMemCacheMissCounter(cache.Name())
+ }
+
+ return nil
+}
+
+func doStandardAddToCache(ctx context.Context, cache utils.ObjectCache, key string, result *LayeredStoreSupplierResult, hints ...LayeredStoreHint) {
+ if result.Err == nil && result.Data != nil {
+ cache.AddWithDefaultExpires(key, result.Data)
+ }
+}
+
+func doInvalidateCacheCluster(cache utils.ObjectCache, key string) {
+ cache.Remove(key)
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: cache.GetInvalidateClusterEvent(),
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: key,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
+ }
+}
+
+func doClearCacheCluster(cache utils.ObjectCache) {
+ cache.Purge()
+ if einterfaces.GetClusterInterface() != nil {
+ msg := &model.ClusterMessage{
+ Event: cache.GetInvalidateClusterEvent(),
+ SendType: model.CLUSTER_SEND_BEST_EFFORT,
+ Data: CLEAR_CACHE_MESSAGE_DATA,
+ }
+ einterfaces.GetClusterInterface().SendClusterMessage(msg)
+ }
+}
diff --git a/store/local_cache_supplier_reactions.go b/store/local_cache_supplier_reactions.go
new file mode 100644
index 000000000..7d2c9f065
--- /dev/null
+++ b/store/local_cache_supplier_reactions.go
@@ -0,0 +1,47 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "context"
+
+ "github.com/mattermost/platform/model"
+)
+
+func (s *LocalCacheSupplier) handleClusterInvalidateReaction(msg *model.ClusterMessage) {
+ if msg.Data == CLEAR_CACHE_MESSAGE_DATA {
+ s.reactionCache.Purge()
+ } else {
+ s.reactionCache.Remove(msg.Data)
+ }
+}
+
+func (s *LocalCacheSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ doInvalidateCacheCluster(s.reactionCache, reaction.PostId)
+ return s.Next().ReactionSave(ctx, reaction, hints...)
+}
+
+func (s *LocalCacheSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ doInvalidateCacheCluster(s.reactionCache, reaction.PostId)
+ return s.Next().ReactionDelete(ctx, reaction, hints...)
+}
+
+func (s *LocalCacheSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ if result := doStandardReadCache(ctx, s.reactionCache, postId, hints...); result != nil {
+ return result
+ }
+
+ result := s.Next().ReactionGetForPost(ctx, postId, hints...)
+
+ doStandardAddToCache(ctx, s.reactionCache, postId, result, hints...)
+
+ return result
+}
+
+func (s *LocalCacheSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ // This could be improved. Right now we just clear the whole
+ // cache because we don't have a way find what post Ids have this emoji name.
+ doClearCacheCluster(s.reactionCache)
+ return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...)
+}
diff --git a/store/redis_supplier.go b/store/redis_supplier.go
new file mode 100644
index 000000000..eede36ef2
--- /dev/null
+++ b/store/redis_supplier.go
@@ -0,0 +1,133 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "bytes"
+ "context"
+ "encoding/gob"
+
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/go-redis/redis"
+ "github.com/mattermost/platform/model"
+)
+
+const REDIS_EXPIRY_TIME = 30 * time.Minute
+
+type RedisSupplier struct {
+ next LayeredStoreSupplier
+ client *redis.Client
+}
+
+func GetBytes(key interface{}) ([]byte, error) {
+ var buf bytes.Buffer
+ enc := gob.NewEncoder(&buf)
+ err := enc.Encode(key)
+ if err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+func DecodeBytes(input []byte, thing interface{}) error {
+ dec := gob.NewDecoder(bytes.NewReader(input))
+ err := dec.Decode(thing)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func NewRedisSupplier() *RedisSupplier {
+ supplier := &RedisSupplier{}
+
+ supplier.client = redis.NewClient(&redis.Options{
+ Addr: "localhost:6379",
+ Password: "",
+ DB: 0,
+ })
+
+ if _, err := supplier.client.Ping().Result(); err != nil {
+ l4g.Error("Unable to ping redis server: " + err.Error())
+ return nil
+ }
+
+ return supplier
+}
+
+func (s *RedisSupplier) save(key string, value interface{}, expiry time.Duration) error {
+ if bytes, err := GetBytes(value); err != nil {
+ return err
+ } else {
+ if err := s.client.Set(key, bytes, expiry).Err(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *RedisSupplier) load(key string, writeTo interface{}) (bool, error) {
+ if data, err := s.client.Get(key).Bytes(); err != nil {
+ if err == redis.Nil {
+ return false, nil
+ } else {
+ return false, err
+ }
+ } else {
+ if err := DecodeBytes(data, writeTo); err != nil {
+ return false, err
+ }
+ }
+ return true, nil
+}
+
+func (s *RedisSupplier) SetChainNext(next LayeredStoreSupplier) {
+ s.next = next
+}
+
+func (s *RedisSupplier) Next() LayeredStoreSupplier {
+ return s.next
+}
+
+func (s *RedisSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ if err := s.client.Del("reactions:" + reaction.PostId).Err(); err != nil {
+ l4g.Error("Redis failed to remove key reactions:" + reaction.PostId + " Error: " + err.Error())
+ }
+ return s.Next().ReactionSave(ctx, reaction, hints...)
+}
+
+func (s *RedisSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ if err := s.client.Del("reactions:" + reaction.PostId).Err(); err != nil {
+ l4g.Error("Redis failed to remove key reactions:" + reaction.PostId + " Error: " + err.Error())
+ }
+ return s.Next().ReactionDelete(ctx, reaction, hints...)
+}
+
+func (s *RedisSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ var resultdata []*model.Reaction
+ found, err := s.load("reactions:"+postId, &resultdata)
+ if found {
+ result := NewSupplierResult()
+ result.Data = resultdata
+ return result
+ }
+ if err != nil {
+ l4g.Error("Redis encountered an error on read: " + err.Error())
+ }
+
+ result := s.Next().ReactionGetForPost(ctx, postId, hints...)
+
+ if err := s.save("reactions:"+postId, result.Data, REDIS_EXPIRY_TIME); err != nil {
+ l4g.Error("Redis encountered and error on write: " + err.Error())
+ }
+
+ return result
+}
+
+func (s *RedisSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ // Ignoring this. It's probably OK to have the emoji slowly expire from Redis.
+ return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...)
+}
diff --git a/store/sql_reaction_store.go b/store/sql_reaction_store.go
deleted file mode 100644
index 87845421e..000000000
--- a/store/sql_reaction_store.go
+++ /dev/null
@@ -1,271 +0,0 @@
-// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package store
-
-import (
- "github.com/mattermost/gorp"
- "github.com/mattermost/platform/einterfaces"
- "github.com/mattermost/platform/model"
- "github.com/mattermost/platform/utils"
-
- l4g "github.com/alecthomas/log4go"
-)
-
-const (
- REACTION_CACHE_SIZE = 20000
- REACTION_CACHE_SEC = 1800 // 30 minutes
-)
-
-var reactionCache *utils.Cache = utils.NewLru(REACTION_CACHE_SIZE)
-
-type SqlReactionStore struct {
- SqlStore
-}
-
-func NewSqlReactionStore(sqlStore SqlStore) ReactionStore {
- s := &SqlReactionStore{sqlStore}
-
- for _, db := range sqlStore.GetAllConns() {
- table := db.AddTableWithName(model.Reaction{}, "Reactions").SetKeys(false, "UserId", "PostId", "EmojiName")
- table.ColMap("UserId").SetMaxSize(26)
- table.ColMap("PostId").SetMaxSize(26)
- table.ColMap("EmojiName").SetMaxSize(64)
- }
-
- return s
-}
-
-func (s SqlReactionStore) CreateIndexesIfNotExists() {
- s.CreateIndexIfNotExists("idx_reactions_post_id", "Reactions", "PostId")
- s.CreateIndexIfNotExists("idx_reactions_user_id", "Reactions", "UserId")
- s.CreateIndexIfNotExists("idx_reactions_emoji_name", "Reactions", "EmojiName")
-}
-
-func (s SqlReactionStore) Save(reaction *model.Reaction) StoreChannel {
- storeChannel := make(StoreChannel)
-
- go func() {
- result := StoreResult{}
-
- reaction.PreSave()
- if result.Err = reaction.IsValid(); result.Err != nil {
- storeChannel <- result
- close(storeChannel)
- return
- }
-
- if transaction, err := s.GetMaster().Begin(); err != nil {
- result.Err = model.NewLocAppError("SqlReactionStore.Save", "store.sql_reaction.save.begin.app_error", nil, err.Error())
- } else {
- err := saveReactionAndUpdatePost(transaction, reaction)
-
- if err != nil {
- transaction.Rollback()
-
- // We don't consider duplicated save calls as an error
- if !IsUniqueConstraintError(err.Error(), []string{"reactions_pkey", "PRIMARY"}) {
- result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.save.app_error", nil, err.Error())
- }
- } else {
- if err := transaction.Commit(); err != nil {
- // don't need to rollback here since the transaction is already closed
- result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.commit.app_error", nil, err.Error())
- }
- }
-
- if result.Err == nil {
- result.Data = reaction
- }
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (s SqlReactionStore) Delete(reaction *model.Reaction) StoreChannel {
- storeChannel := make(StoreChannel)
-
- go func() {
- result := StoreResult{}
-
- if transaction, err := s.GetMaster().Begin(); err != nil {
- result.Err = model.NewLocAppError("SqlReactionStore.Delete", "store.sql_reaction.delete.begin.app_error", nil, err.Error())
- } else {
- err := deleteReactionAndUpdatePost(transaction, reaction)
-
- if err != nil {
- transaction.Rollback()
-
- result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.app_error", nil, err.Error())
- } else if err := transaction.Commit(); err != nil {
- // don't need to rollback here since the transaction is already closed
- result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.commit.app_error", nil, err.Error())
- } else {
- result.Data = reaction
- }
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func saveReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error {
- if err := transaction.Insert(reaction); err != nil {
- return err
- }
-
- return updatePostForReactions(transaction, reaction.PostId)
-}
-
-func deleteReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error {
- if _, err := transaction.Exec(
- `DELETE FROM
- Reactions
- WHERE
- PostId = :PostId AND
- UserId = :UserId AND
- EmojiName = :EmojiName`,
- map[string]interface{}{"PostId": reaction.PostId, "UserId": reaction.UserId, "EmojiName": reaction.EmojiName}); err != nil {
- return err
- }
-
- return updatePostForReactions(transaction, reaction.PostId)
-}
-
-const (
- // Set HasReactions = true if and only if the post has reactions, update UpdateAt only if HasReactions changes
- UPDATE_POST_HAS_REACTIONS_QUERY = `UPDATE
- Posts
- SET
- UpdateAt = (CASE
- WHEN HasReactions != (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId) THEN :UpdateAt
- ELSE UpdateAt
- END),
- HasReactions = (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId)
- WHERE
- Id = :PostId`
-)
-
-func updatePostForReactions(transaction *gorp.Transaction, postId string) error {
- _, err := transaction.Exec(UPDATE_POST_HAS_REACTIONS_QUERY, map[string]interface{}{"PostId": postId, "UpdateAt": model.GetMillis()})
-
- return err
-}
-
-func (s SqlReactionStore) InvalidateCacheForPost(postId string) {
- reactionCache.Remove(postId)
-}
-
-func (s SqlReactionStore) InvalidateCache() {
- reactionCache.Purge()
-}
-
-func (s SqlReactionStore) GetForPost(postId string, allowFromCache bool) StoreChannel {
- storeChannel := make(StoreChannel)
-
- go func() {
- result := StoreResult{}
- metrics := einterfaces.GetMetricsInterface()
-
- if allowFromCache {
- if cacheItem, ok := reactionCache.Get(postId); ok {
- if metrics != nil {
- metrics.IncrementMemCacheHitCounter("Reactions")
- }
- result.Data = cacheItem.([]*model.Reaction)
- storeChannel <- result
- close(storeChannel)
- return
- } else {
- if metrics != nil {
- metrics.IncrementMemCacheMissCounter("Reactions")
- }
- }
- } else {
- if metrics != nil {
- metrics.IncrementMemCacheMissCounter("Reactions")
- }
- }
-
- var reactions []*model.Reaction
-
- if _, err := s.GetReplica().Select(&reactions,
- `SELECT
- *
- FROM
- Reactions
- WHERE
- PostId = :PostId
- ORDER BY
- CreateAt`, map[string]interface{}{"PostId": postId}); err != nil {
- result.Err = model.NewLocAppError("SqlReactionStore.GetForPost", "store.sql_reaction.get_for_post.app_error", nil, "")
- } else {
- result.Data = reactions
-
- reactionCache.AddWithExpiresInSecs(postId, reactions, REACTION_CACHE_SEC)
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
-
-func (s SqlReactionStore) DeleteAllWithEmojiName(emojiName string) StoreChannel {
- storeChannel := make(StoreChannel)
-
- go func() {
- result := StoreResult{}
-
- // doesn't use a transaction since it's better for this to half-finish than to not commit anything
- var reactions []*model.Reaction
-
- if _, err := s.GetReplica().Select(&reactions,
- `SELECT
- *
- FROM
- Reactions
- WHERE
- EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil {
- result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName",
- "store.sql_reaction.delete_all_with_emoji_name.get_reactions.app_error", nil,
- "emoji_name="+emojiName+", error="+err.Error())
- storeChannel <- result
- close(storeChannel)
- return
- }
-
- if _, err := s.GetMaster().Exec(
- `DELETE FROM
- Reactions
- WHERE
- EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil {
- result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName",
- "store.sql_reaction.delete_all_with_emoji_name.delete_reactions.app_error", nil,
- "emoji_name="+emojiName+", error="+err.Error())
- storeChannel <- result
- close(storeChannel)
- return
- }
-
- for _, reaction := range reactions {
- if _, err := s.GetMaster().Exec(UPDATE_POST_HAS_REACTIONS_QUERY,
- map[string]interface{}{"PostId": reaction.PostId, "UpdateAt": model.GetMillis()}); err != nil {
- l4g.Warn(utils.T("store.sql_reaction.delete_all_with_emoji_name.update_post.warn"), reaction.PostId, err.Error())
- }
- }
-
- storeChannel <- result
- close(storeChannel)
- }()
-
- return storeChannel
-}
diff --git a/store/sql_supplier.go b/store/sql_supplier.go
index 0f4ab8380..df934f2cb 100644
--- a/store/sql_supplier.go
+++ b/store/sql_supplier.go
@@ -57,11 +57,6 @@ const (
EXIT_REMOVE_TABLE = 134
)
-type SqlSupplierResult struct {
- Err model.AppError
- Result interface{}
-}
-
type SqlSupplierOldStores struct {
team TeamStore
channel ChannelStore
@@ -86,6 +81,7 @@ type SqlSupplierOldStores struct {
}
type SqlSupplier struct {
+ next LayeredStoreSupplier
master *gorp.DbMap
replicas []*gorp.DbMap
searchReplicas []*gorp.DbMap
@@ -120,9 +116,10 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.emoji = NewSqlEmojiStore(supplier)
supplier.oldStores.status = NewSqlStatusStore(supplier)
supplier.oldStores.fileInfo = NewSqlFileInfoStore(supplier)
- supplier.oldStores.reaction = NewSqlReactionStore(supplier)
supplier.oldStores.job = NewSqlJobStore(supplier)
+ initSqlSupplierReactions(supplier)
+
err := supplier.GetMaster().CreateTablesIfNotExists()
if err != nil {
l4g.Critical(utils.T("store.sql.creating_tables.critical"), err)
@@ -149,7 +146,6 @@ func NewSqlSupplier() *SqlSupplier {
supplier.oldStores.emoji.(*SqlEmojiStore).CreateIndexesIfNotExists()
supplier.oldStores.status.(*SqlStatusStore).CreateIndexesIfNotExists()
supplier.oldStores.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists()
- supplier.oldStores.reaction.(*SqlReactionStore).CreateIndexesIfNotExists()
supplier.oldStores.job.(*SqlJobStore).CreateIndexesIfNotExists()
supplier.oldStores.preference.(*SqlPreferenceStore).DeleteUnusedFeatures()
@@ -157,6 +153,14 @@ func NewSqlSupplier() *SqlSupplier {
return supplier
}
+func (s *SqlSupplier) SetChainNext(next LayeredStoreSupplier) {
+ s.next = next
+}
+
+func (s *SqlSupplier) Next() LayeredStoreSupplier {
+ return s.next
+}
+
func setupConnection(con_type string, driver string, dataSource string, maxIdle int, maxOpen int, trace bool) *gorp.DbMap {
db, err := dbsql.Open(driver, dataSource)
if err != nil {
diff --git a/store/sql_supplier_reactions.go b/store/sql_supplier_reactions.go
index 14f13cce6..30ca6beed 100644
--- a/store/sql_supplier_reactions.go
+++ b/store/sql_supplier_reactions.go
@@ -6,7 +6,10 @@ package store
import (
"context"
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/gorp"
"github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/utils"
)
func initSqlSupplierReactions(sqlStore SqlStore) {
@@ -18,18 +21,164 @@ func initSqlSupplierReactions(sqlStore SqlStore) {
}
}
-func (s *SqlSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult {
- panic("not implemented")
+func (s *SqlSupplier) ReactionSave(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ result := NewSupplierResult()
+
+ reaction.PreSave()
+ if result.Err = reaction.IsValid(); result.Err != nil {
+ return result
+ }
+
+ if transaction, err := s.GetMaster().Begin(); err != nil {
+ result.Err = model.NewLocAppError("SqlReactionStore.Save", "store.sql_reaction.save.begin.app_error", nil, err.Error())
+ } else {
+ err := saveReactionAndUpdatePost(transaction, reaction)
+
+ if err != nil {
+ transaction.Rollback()
+
+ // We don't consider duplicated save calls as an error
+ if !IsUniqueConstraintError(err.Error(), []string{"reactions_pkey", "PRIMARY"}) {
+ result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.save.app_error", nil, err.Error())
+ }
+ } else {
+ if err := transaction.Commit(); err != nil {
+ // don't need to rollback here since the transaction is already closed
+ result.Err = model.NewLocAppError("SqlPreferenceStore.Save", "store.sql_reaction.save.commit.app_error", nil, err.Error())
+ }
+ }
+
+ if result.Err == nil {
+ result.Data = reaction
+ }
+ }
+
+ return result
+}
+
+func (s *SqlSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ result := NewSupplierResult()
+
+ if transaction, err := s.GetMaster().Begin(); err != nil {
+ result.Err = model.NewLocAppError("SqlReactionStore.Delete", "store.sql_reaction.delete.begin.app_error", nil, err.Error())
+ } else {
+ err := deleteReactionAndUpdatePost(transaction, reaction)
+
+ if err != nil {
+ transaction.Rollback()
+
+ result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.app_error", nil, err.Error())
+ } else if err := transaction.Commit(); err != nil {
+ // don't need to rollback here since the transaction is already closed
+ result.Err = model.NewLocAppError("SqlPreferenceStore.Delete", "store.sql_reaction.delete.commit.app_error", nil, err.Error())
+ } else {
+ result.Data = reaction
+ }
+ }
+
+ return result
+}
+
+func (s *SqlSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ result := NewSupplierResult()
+
+ var reactions []*model.Reaction
+
+ if _, err := s.GetReplica().Select(&reactions,
+ `SELECT
+ *
+ FROM
+ Reactions
+ WHERE
+ PostId = :PostId
+ ORDER BY
+ CreateAt`, map[string]interface{}{"PostId": postId}); err != nil {
+ result.Err = model.NewLocAppError("SqlReactionStore.GetForPost", "store.sql_reaction.get_for_post.app_error", nil, "")
+ } else {
+ result.Data = reactions
+ }
+
+ return result
}
-func (s *SqlSupplier) ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) LayeredStoreSupplierResult {
- panic("not implemented")
+func (s *SqlSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
+ result := NewSupplierResult()
+
+ var reactions []*model.Reaction
+
+ if _, err := s.GetReplica().Select(&reactions,
+ `SELECT
+ *
+ FROM
+ Reactions
+ WHERE
+ EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil {
+ result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName",
+ "store.sql_reaction.delete_all_with_emoji_name.get_reactions.app_error", nil,
+ "emoji_name="+emojiName+", error="+err.Error())
+ return result
+ }
+
+ if _, err := s.GetMaster().Exec(
+ `DELETE FROM
+ Reactions
+ WHERE
+ EmojiName = :EmojiName`, map[string]interface{}{"EmojiName": emojiName}); err != nil {
+ result.Err = model.NewLocAppError("SqlReactionStore.DeleteAllWithEmojiName",
+ "store.sql_reaction.delete_all_with_emoji_name.delete_reactions.app_error", nil,
+ "emoji_name="+emojiName+", error="+err.Error())
+ return result
+ }
+
+ for _, reaction := range reactions {
+ if _, err := s.GetMaster().Exec(UPDATE_POST_HAS_REACTIONS_QUERY,
+ map[string]interface{}{"PostId": reaction.PostId, "UpdateAt": model.GetMillis()}); err != nil {
+ l4g.Warn(utils.T("store.sql_reaction.delete_all_with_emoji_name.update_post.warn"), reaction.PostId, err.Error())
+ }
+ }
+
+ return result
}
-func (s *SqlSupplier) ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) LayeredStoreSupplierResult {
- panic("not implemented")
+func saveReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error {
+ if err := transaction.Insert(reaction); err != nil {
+ return err
+ }
+
+ return updatePostForReactions(transaction, reaction.PostId)
}
-func (s *SqlSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) LayeredStoreSupplierResult {
- panic("not implemented")
+func deleteReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error {
+ if _, err := transaction.Exec(
+ `DELETE FROM
+ Reactions
+ WHERE
+ PostId = :PostId AND
+ UserId = :UserId AND
+ EmojiName = :EmojiName`,
+ map[string]interface{}{"PostId": reaction.PostId, "UserId": reaction.UserId, "EmojiName": reaction.EmojiName}); err != nil {
+ return err
+ }
+
+ return updatePostForReactions(transaction, reaction.PostId)
+}
+
+const (
+ // Set HasReactions = true if and only if the post has reactions, update UpdateAt only if HasReactions changes
+ UPDATE_POST_HAS_REACTIONS_QUERY = `UPDATE
+ Posts
+ SET
+ UpdateAt = (CASE
+ WHEN HasReactions != (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId) THEN :UpdateAt
+ ELSE UpdateAt
+ END),
+ HasReactions = (SELECT count(0) > 0 FROM Reactions WHERE PostId = :PostId)
+ WHERE
+ Id = :PostId`
+)
+
+func updatePostForReactions(transaction *gorp.Transaction, postId string) error {
+ _, err := transaction.Exec(UPDATE_POST_HAS_REACTIONS_QUERY, map[string]interface{}{"PostId": postId, "UpdateAt": model.GetMillis()})
+
+ return err
}
diff --git a/store/store.go b/store/store.go
index ab3d97d9b..0fa2a96b3 100644
--- a/store/store.go
+++ b/store/store.go
@@ -379,8 +379,6 @@ type FileInfoStore interface {
type ReactionStore interface {
Save(reaction *model.Reaction) StoreChannel
Delete(reaction *model.Reaction) StoreChannel
- InvalidateCacheForPost(postId string)
- InvalidateCache()
GetForPost(postId string, allowFromCache bool) StoreChannel
DeleteAllWithEmojiName(emojiName string) StoreChannel
}