From 86fb0d87a3a09b237bec124ce0e74cd05aa164be Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Tue, 13 Dec 2016 22:24:24 -0500 Subject: Adding caching layer to some posts calls (#4779) --- api/command_statuses_test.go | 2 +- api/context.go | 1 + api/post.go | 10 ++++++++-- api/slackimport.go | 2 ++ api/web_hub.go | 20 ++++++++++++++++++++ einterfaces/cluster.go | 2 ++ store/sql_post_store.go | 28 +++++++++++++++++++++++++++- store/sql_post_store_test.go | 43 +++++++++++++++++++++++++++++++++++++++---- store/store.go | 3 ++- 9 files changed, 102 insertions(+), 9 deletions(-) diff --git a/api/command_statuses_test.go b/api/command_statuses_test.go index 47628de3f..063d76062 100644 --- a/api/command_statuses_test.go +++ b/api/command_statuses_test.go @@ -27,7 +27,7 @@ func commandAndTest(t *testing.T, th *TestHelper, status string) { t.Fatal("Command failed to execute") } - time.Sleep(500 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) statuses := Client.Must(Client.GetStatuses()).Data.(map[string]string) diff --git a/api/context.go b/api/context.go index 1e82acb68..91f17dada 100644 --- a/api/context.go +++ b/api/context.go @@ -564,4 +564,5 @@ func InvalidateAllCaches() { ClearStatusCache() store.ClearChannelCaches() store.ClearUserCaches() + store.ClearPostCaches() } diff --git a/api/post.go b/api/post.go index 143dbf7e1..a4a493060 100644 --- a/api/post.go +++ b/api/post.go @@ -166,6 +166,8 @@ func CreatePost(c *Context, post *model.Post, triggerWebhooks bool) (*model.Post } } + InvalidateCacheForChannelPosts(rpost.ChannelId) + handlePostEvents(c, rpost, triggerWebhooks) return rpost, nil @@ -1226,6 +1228,8 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) { go Publish(message) + InvalidateCacheForChannelPosts(rpost.ChannelId) + w.Write([]byte(rpost.ToJson())) } } @@ -1278,7 +1282,7 @@ func getPosts(c *Context, w http.ResponseWriter, r *http.Request) { return } - etagChan := Srv.Store.Post().GetEtag(id) + etagChan := Srv.Store.Post().GetEtag(id, true) if !HasPermissionToChannelContext(c, id, model.PERMISSION_CREATE_POST) { return @@ -1508,6 +1512,8 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) { go DeletePostFiles(post) go DeleteFlaggedPost(c.Session.UserId, post) + InvalidateCacheForChannelPosts(post.ChannelId) + result := make(map[string]string) result["id"] = postId w.Write([]byte(model.MapToJson(result))) @@ -1567,7 +1573,7 @@ func getPostsBeforeOrAfter(c *Context, w http.ResponseWriter, r *http.Request, b } // We can do better than this etag in this situation - etagChan := Srv.Store.Post().GetEtag(id) + etagChan := Srv.Store.Post().GetEtag(id, true) if !HasPermissionToChannelContext(c, id, model.PERMISSION_READ_CHANNEL) { return diff --git a/api/slackimport.go b/api/slackimport.go index 1225e7833..3fd0ec3f6 100644 --- a/api/slackimport.go +++ b/api/slackimport.go @@ -560,6 +560,8 @@ func SlackImport(fileData multipart.File, fileSize int64, teamID string) (*model deactivateSlackBotUser(botUser) } + InvalidateAllCaches() + log.WriteString(utils.T("api.slackimport.slack_import.notes")) log.WriteString("=======\r\n\r\n") diff --git a/api/web_hub.go b/api/web_hub.go index ce11d26b0..da40bf1c7 100644 --- a/api/web_hub.go +++ b/api/web_hub.go @@ -103,10 +103,30 @@ func PublishSkipClusterSend(message *model.WebSocketEvent) { } func InvalidateCacheForChannel(channelId string) { + InvalidateCacheForChannelSkipClusterSend(channelId) + + if cluster := einterfaces.GetClusterInterface(); cluster != nil { + cluster.InvalidateCacheForChannel(channelId) + } +} + +func InvalidateCacheForChannelSkipClusterSend(channelId string) { Srv.Store.User().InvalidateProfilesInChannelCache(channelId) Srv.Store.Channel().InvalidateMemberCount(channelId) } +func InvalidateCacheForChannelPosts(channelId string) { + InvalidateCacheForChannelPostsSkipClusterSend(channelId) + + if cluster := einterfaces.GetClusterInterface(); cluster != nil { + cluster.InvalidateCacheForChannelPosts(channelId) + } +} + +func InvalidateCacheForChannelPostsSkipClusterSend(channelId string) { + Srv.Store.Post().InvalidatePostEtagCache(channelId) +} + func InvalidateCacheForUser(userId string) { InvalidateCacheForUserSkipClusterSend(userId) diff --git a/einterfaces/cluster.go b/einterfaces/cluster.go index 43ce46280..0d7bf7e86 100644 --- a/einterfaces/cluster.go +++ b/einterfaces/cluster.go @@ -14,6 +14,8 @@ type ClusterInterface interface { GetClusterStats() ([]*model.ClusterStats, *model.AppError) RemoveAllSessionsForUserId(userId string) InvalidateCacheForUser(userId string) + InvalidateCacheForChannel(channelId string) + InvalidateCacheForChannelPosts(channelId string) Publish(event *model.WebSocketEvent) UpdateStatus(status *model.Status) GetLogs() ([]string, *model.AppError) diff --git a/store/sql_post_store.go b/store/sql_post_store.go index 900aeeeb6..44ae58b32 100644 --- a/store/sql_post_store.go +++ b/store/sql_post_store.go @@ -17,6 +17,17 @@ type SqlPostStore struct { *SqlStore } +const ( + POSTS_ETAG_CACHE_SIZE = 25000 + POSTS_ETAG_CACHE_SEC = 900 // 15 minutes +) + +var postEtagCache = utils.NewLru(CHANNEL_MEMBERS_COUNTS_CACHE_SIZE) + +func ClearPostCaches() { + postEtagCache.Purge() +} + func NewSqlPostStore(sqlStore *SqlStore) PostStore { s := &SqlPostStore{sqlStore} @@ -210,12 +221,25 @@ type etagPosts struct { UpdateAt int64 } -func (s SqlPostStore) GetEtag(channelId string) StoreChannel { +func (s SqlPostStore) InvalidatePostEtagCache(channelId string) { + postEtagCache.Remove(channelId) +} + +func (s SqlPostStore) GetEtag(channelId string, allowFromCache bool) StoreChannel { storeChannel := make(StoreChannel, 1) go func() { result := StoreResult{} + if allowFromCache { + if cacheItem, ok := postEtagCache.Get(channelId); ok { + result.Data = cacheItem.(string) + storeChannel <- result + close(storeChannel) + return + } + } + var et etagPosts err := s.GetReplica().SelectOne(&et, "SELECT Id, UpdateAt FROM Posts WHERE ChannelId = :ChannelId ORDER BY UpdateAt DESC LIMIT 1", map[string]interface{}{"ChannelId": channelId}) if err != nil { @@ -224,6 +248,8 @@ func (s SqlPostStore) GetEtag(channelId string) StoreChannel { result.Data = fmt.Sprintf("%v.%v.%v", model.CurrentVersion, et.Id, et.UpdateAt) } + postEtagCache.AddWithExpiresInSecs(channelId, result.Data.(string), POSTS_ETAG_CACHE_SEC) + storeChannel <- result close(storeChannel) }() diff --git a/store/sql_post_store_test.go b/store/sql_post_store_test.go index d685ea41e..30376fae5 100644 --- a/store/sql_post_store_test.go +++ b/store/sql_post_store_test.go @@ -37,14 +37,14 @@ func TestPostStoreGet(t *testing.T) { o1.UserId = model.NewId() o1.Message = "a" + model.NewId() + "b" - etag1 := (<-store.Post().GetEtag(o1.ChannelId)).Data.(string) + etag1 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string) if strings.Index(etag1, model.CurrentVersion+".0.") != 0 { t.Fatal("Invalid Etag") } o1 = (<-store.Post().Save(o1)).Data.(*model.Post) - etag2 := (<-store.Post().GetEtag(o1.ChannelId)).Data.(string) + etag2 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string) if strings.Index(etag2, model.CurrentVersion+"."+o1.Id) != 0 { t.Fatal("Invalid Etag") } @@ -62,6 +62,41 @@ func TestPostStoreGet(t *testing.T) { } } +func TestGetEtagCache(t *testing.T) { + Setup() + o1 := &model.Post{} + o1.ChannelId = model.NewId() + o1.UserId = model.NewId() + o1.Message = "a" + model.NewId() + "b" + + etag1 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string) + if strings.Index(etag1, model.CurrentVersion+".0.") != 0 { + t.Fatal("Invalid Etag") + } + + // This one should come from the cache + etag2 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string) + if strings.Index(etag2, model.CurrentVersion+".0.") != 0 { + t.Fatal("Invalid Etag") + } + + o1 = (<-store.Post().Save(o1)).Data.(*model.Post) + + // We have not invalidated the cache so this should be the same as above + etag3 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string) + if strings.Index(etag3, model.CurrentVersion+".0.") != 0 { + t.Fatal("Invalid Etag") + } + + store.Post().InvalidatePostEtagCache(o1.ChannelId) + + // Invalidated cache so we should get a good result + etag4 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string) + if strings.Index(etag4, model.CurrentVersion+"."+o1.Id) != 0 { + t.Fatal("Invalid Etag") + } +} + func TestPostStoreUpdate(t *testing.T) { Setup() @@ -164,7 +199,7 @@ func TestPostStoreDelete(t *testing.T) { o1.UserId = model.NewId() o1.Message = "a" + model.NewId() + "b" - etag1 := (<-store.Post().GetEtag(o1.ChannelId)).Data.(string) + etag1 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string) if strings.Index(etag1, model.CurrentVersion+".0.") != 0 { t.Fatal("Invalid Etag") } @@ -188,7 +223,7 @@ func TestPostStoreDelete(t *testing.T) { t.Fatal("Missing id should have failed") } - etag2 := (<-store.Post().GetEtag(o1.ChannelId)).Data.(string) + etag2 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string) if strings.Index(etag2, model.CurrentVersion+"."+o1.Id) != 0 { t.Fatal("Invalid Etag") } diff --git a/store/store.go b/store/store.go index 236fbbc13..b0cc09983 100644 --- a/store/store.go +++ b/store/store.go @@ -130,11 +130,12 @@ type PostStore interface { GetPostsBefore(channelId string, postId string, numPosts int, offset int) StoreChannel GetPostsAfter(channelId string, postId string, numPosts int, offset int) StoreChannel GetPostsSince(channelId string, time int64) StoreChannel - GetEtag(channelId string) StoreChannel + GetEtag(channelId string, allowFromCache bool) StoreChannel Search(teamId string, userId string, params *model.SearchParams) StoreChannel AnalyticsUserCountsWithPostsByDay(teamId string) StoreChannel AnalyticsPostCountsByDay(teamId string) StoreChannel AnalyticsPostCount(teamId string, mustHaveFile bool, mustHaveHashtag bool) StoreChannel + InvalidatePostEtagCache(channelId string) } type UserStore interface { -- cgit v1.2.3-1-g7c22