From 937b6480d534b3051cadf4a892a9aa210bec579a Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Thu, 25 Oct 2018 13:25:27 -0400 Subject: MM-12342: merge the experimental channel store (#9681) * MM-12342: merge the experimental channel store * gofmt after upgrading to go 1.11 --- store/sqlstore/channel_store.go | 390 +++++++++++++++++++++++++++++----------- 1 file changed, 281 insertions(+), 109 deletions(-) (limited to 'store/sqlstore/channel_store.go') diff --git a/store/sqlstore/channel_store.go b/store/sqlstore/channel_store.go index 63b9fbb42..5947a8f82 100644 --- a/store/sqlstore/channel_store.go +++ b/store/sqlstore/channel_store.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/mattermost/gorp" + "github.com/pkg/errors" "github.com/mattermost/mattermost-server/einterfaces" "github.com/mattermost/mattermost-server/mlog" @@ -232,6 +233,17 @@ func (db allChannelMembers) ToMapStringString() map[string]string { return result } +// publicChannel is a subset of the metadata corresponding to public channels only. +type publicChannel struct { + Id string `json:"id"` + DeleteAt int64 `json:"delete_at"` + TeamId string `json:"team_id"` + DisplayName string `json:"display_name"` + Name string `json:"name"` + Header string `json:"header"` + Purpose string `json:"purpose"` +} + var channelMemberCountsCache = utils.NewLru(CHANNEL_MEMBERS_COUNTS_CACHE_SIZE) var allChannelMembersForUserCache = utils.NewLru(ALL_CHANNEL_MEMBERS_FOR_USER_CACHE_SIZE) var allChannelMembersNotifyPropsForChannelCache = utils.NewLru(ALL_CHANNEL_MEMBERS_NOTIFY_PROPS_FOR_CHANNEL_CACHE_SIZE) @@ -278,6 +290,15 @@ func NewSqlChannelStore(sqlStore SqlStore, metrics einterfaces.MetricsInterface) tablem.ColMap("UserId").SetMaxSize(26) tablem.ColMap("Roles").SetMaxSize(64) tablem.ColMap("NotifyProps").SetMaxSize(2000) + + tablePublicChannels := db.AddTableWithName(publicChannel{}, "PublicChannels").SetKeys(false, "Id") + tablePublicChannels.ColMap("Id").SetMaxSize(26) + tablePublicChannels.ColMap("TeamId").SetMaxSize(26) + tablePublicChannels.ColMap("DisplayName").SetMaxSize(64) + tablePublicChannels.ColMap("Name").SetMaxSize(64) + tablePublicChannels.SetUniqueTogether("Name", "TeamId") + tablePublicChannels.ColMap("Header").SetMaxSize(1024) + tablePublicChannels.ColMap("Purpose").SetMaxSize(250) } return s @@ -299,21 +320,112 @@ func (s SqlChannelStore) CreateIndexesIfNotExists() { s.CreateIndexIfNotExists("idx_channelmembers_user_id", "ChannelMembers", "UserId") s.CreateFullTextIndexIfNotExists("idx_channel_search_txt", "Channels", "Name, DisplayName, Purpose") + + s.CreateIndexIfNotExists("idx_publicchannels_team_id", "PublicChannels", "TeamId") + s.CreateIndexIfNotExists("idx_publicchannels_name", "PublicChannels", "Name") + s.CreateIndexIfNotExists("idx_publicchannels_delete_at", "PublicChannels", "DeleteAt") + if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { + s.CreateIndexIfNotExists("idx_publicchannels_name_lower", "PublicChannels", "lower(Name)") + s.CreateIndexIfNotExists("idx_publicchannels_displayname_lower", "PublicChannels", "lower(DisplayName)") + } + s.CreateFullTextIndexIfNotExists("idx_publicchannels_search_txt", "PublicChannels", "Name, DisplayName, Purpose") } +// MigratePublicChannels initializes the PublicChannels table with data created before this version +// of the Mattermost server kept it up-to-date. func (s SqlChannelStore) MigratePublicChannels() error { - // See SqlChannelStoreExperimental + transaction, err := s.GetMaster().Begin() + if err != nil { + return err + } + + if _, err := transaction.Exec(` + INSERT INTO PublicChannels + (Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + SELECT + c.Id, c.DeleteAt, c.TeamId, c.DisplayName, c.Name, c.Header, c.Purpose + FROM + Channels c + LEFT JOIN + PublicChannels pc ON (pc.Id = c.Id) + WHERE + c.Type = 'O' + AND pc.Id IS NULL + `); err != nil { + return err + } + + if err := transaction.Commit(); err != nil { + return err + } + return nil } -func (s SqlChannelStore) DropPublicChannels() error { - // See SqlChannelStoreExperimental +func (s SqlChannelStore) upsertPublicChannelT(transaction *gorp.Transaction, channel *model.Channel) error { + publicChannel := &publicChannel{ + Id: channel.Id, + DeleteAt: channel.DeleteAt, + TeamId: channel.TeamId, + DisplayName: channel.DisplayName, + Name: channel.Name, + Header: channel.Header, + Purpose: channel.Purpose, + } + + if channel.Type != model.CHANNEL_OPEN { + if _, err := transaction.Delete(publicChannel); err != nil { + return errors.Wrap(err, "failed to delete public channel") + } + + return nil + } + + if s.DriverName() == model.DATABASE_DRIVER_MYSQL { + // Leverage native upsert for MySQL, since RowsAffected returns 0 if the row exists + // but no changes were made, breaking the update-then-insert paradigm below when + // the row already exists. (Postgres 9.4 doesn't support native upsert.) + if _, err := transaction.Exec(` + INSERT INTO + PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + VALUES + (:Id, :DeleteAt, :TeamId, :DisplayName, :Name, :Header, :Purpose) + ON DUPLICATE KEY UPDATE + DeleteAt = :DeleteAt, + TeamId = :TeamId, + DisplayName = :DisplayName, + Name = :Name, + Header = :Header, + Purpose = :Purpose; + `, map[string]interface{}{ + "Id": publicChannel.Id, + "DeleteAt": publicChannel.DeleteAt, + "TeamId": publicChannel.TeamId, + "DisplayName": publicChannel.DisplayName, + "Name": publicChannel.Name, + "Header": publicChannel.Header, + "Purpose": publicChannel.Purpose, + }); err != nil { + return errors.Wrap(err, "failed to insert public channel") + } + } else { + count, err := transaction.Update(publicChannel) + if err != nil { + return errors.Wrap(err, "failed to update public channel") + } + if count > 0 { + return nil + } + + if err := transaction.Insert(publicChannel); err != nil { + return errors.Wrap(err, "failed to insert public channel") + } + } + return nil } // Save writes the (non-direct) channel channel to the database. -// -// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel { return store.Do(func(result *store.StoreResult) { if channel.DeleteAt != 0 { @@ -338,6 +450,13 @@ func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) return } + // Additionally propagate the write to the PublicChannels table. + if err := s.upsertPublicChannelT(transaction, result.Data.(*model.Channel)); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStore.Save", "store.sql_channel.save.upsert_public_channel.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + if err := transaction.Commit(); err != nil { result.Err = model.NewAppError("SqlChannelStore.Save", "store.sql_channel.save.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) return @@ -473,8 +592,6 @@ func (s SqlChannelStore) saveChannelT(transaction *gorp.Transaction, channel *mo } // Update writes the updated channel to the database. -// -// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Update(channel *model.Channel) store.StoreChannel { return store.Do(func(result *store.StoreResult) { transaction, err := s.GetMaster().Begin() @@ -489,11 +606,19 @@ func (s SqlChannelStore) Update(channel *model.Channel) store.StoreChannel { return } + // Additionally propagate the write to the PublicChannels table. + if err := s.upsertPublicChannelT(transaction, result.Data.(*model.Channel)); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.upsert_public_channel.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + if err := transaction.Commit(); err != nil { result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) return } }) + } func (s SqlChannelStore) updateChannelT(transaction *gorp.Transaction, channel *model.Channel) store.StoreResult { @@ -642,22 +767,16 @@ func (s SqlChannelStore) get(id string, master bool, allowFromCache bool) store. } // Delete records the given deleted timestamp to the channel in question. -// -// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Delete(channelId string, time int64) store.StoreChannel { return s.SetDeleteAt(channelId, time, time) } // Restore reverts a previous deleted timestamp from the channel in question. -// -// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Restore(channelId string, time int64) store.StoreChannel { return s.SetDeleteAt(channelId, 0, time) } // SetDeleteAt records the given deleted and updated timestamp to the channel in question. -// -// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt, updateAt int64) store.StoreChannel { return store.Do(func(result *store.StoreResult) { defer s.InvalidateChannel(channelId) @@ -674,6 +793,23 @@ func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt, updateAt int64) return } + // Additionally propagate the write to the PublicChannels table. + if _, err := transaction.Exec(` + UPDATE + PublicChannels + SET + DeleteAt = :DeleteAt + WHERE + Id = :ChannelId + `, map[string]interface{}{ + "DeleteAt": deleteAt, + "ChannelId": channelId, + }); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.update_public_channel.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError) + return + } + if err := transaction.Commit(); err != nil { result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) return @@ -694,8 +830,6 @@ func (s SqlChannelStore) setDeleteAtT(transaction *gorp.Transaction, channelId s } // PermanentDeleteByTeam removes all channels for the given team from the database. -// -// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) PermanentDeleteByTeam(teamId string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { transaction, err := s.GetMaster().Begin() @@ -710,6 +844,20 @@ func (s SqlChannelStore) PermanentDeleteByTeam(teamId string) store.StoreChannel return } + // Additionally propagate the deletions to the PublicChannels table. + if _, err := transaction.Exec(` + DELETE FROM + PublicChannels + WHERE + TeamId = :TeamId + `, map[string]interface{}{ + "TeamId": teamId, + }); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeamt", "store.sql_channel.permanent_delete_by_team.delete_public_channels.app_error", nil, "team_id="+teamId+", "+err.Error(), http.StatusInternalServerError) + return + } + if err := transaction.Commit(); err != nil { result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) return @@ -729,8 +877,6 @@ func (s SqlChannelStore) permanentDeleteByTeamtT(transaction *gorp.Transaction, } // PermanentDelete removes the given channel from the database. -// -// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) PermanentDelete(channelId string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { transaction, err := s.GetMaster().Begin() @@ -745,6 +891,20 @@ func (s SqlChannelStore) PermanentDelete(channelId string) store.StoreChannel { return } + // Additionally propagate the deletion to the PublicChannels table. + if _, err := transaction.Exec(` + DELETE FROM + PublicChannels + WHERE + Id = :ChannelId + `, map[string]interface{}{ + "ChannelId": channelId, + }); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.delete_public_channel.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError) + return + } + if err := transaction.Commit(); err != nil { result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) return @@ -798,29 +958,38 @@ func (s SqlChannelStore) GetChannels(teamId string, userId string, includeDelete func (s SqlChannelStore) GetMoreChannels(teamId string, userId string, offset int, limit int) store.StoreChannel { return store.Do(func(result *store.StoreResult) { data := &model.ChannelList{} - _, err := s.GetReplica().Select(data, - `SELECT - * + _, err := s.GetReplica().Select(data, ` + SELECT + Channels.* FROM Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) WHERE - TeamId = :TeamId1 - AND Type IN ('O') - AND DeleteAt = 0 - AND Id NOT IN (SELECT - Channels.Id - FROM - Channels, - ChannelMembers - WHERE - Id = ChannelId - AND TeamId = :TeamId2 - AND UserId = :UserId - AND DeleteAt = 0) - ORDER BY DisplayName + c.TeamId = :TeamId + AND c.DeleteAt = 0 + AND c.Id NOT IN ( + SELECT + c.Id + FROM + PublicChannels c + JOIN + ChannelMembers cm ON (cm.ChannelId = c.Id) + WHERE + c.TeamId = :TeamId + AND cm.UserId = :UserId + AND c.DeleteAt = 0 + ) + ORDER BY + c.DisplayName LIMIT :Limit - OFFSET :Offset`, - map[string]interface{}{"TeamId1": teamId, "TeamId2": teamId, "UserId": userId, "Limit": limit, "Offset": offset}) + OFFSET :Offset + `, map[string]interface{}{ + "TeamId": teamId, + "UserId": userId, + "Limit": limit, + "Offset": offset, + }) if err != nil { result.Err = model.NewAppError("SqlChannelStore.GetMoreChannels", "store.sql_channel.get_more_channels.get.app_error", nil, "teamId="+teamId+", userId="+userId+", err="+err.Error(), http.StatusInternalServerError) @@ -834,19 +1003,24 @@ func (s SqlChannelStore) GetMoreChannels(teamId string, userId string, offset in func (s SqlChannelStore) GetPublicChannelsForTeam(teamId string, offset int, limit int) store.StoreChannel { return store.Do(func(result *store.StoreResult) { data := &model.ChannelList{} - _, err := s.GetReplica().Select(data, - `SELECT - * + _, err := s.GetReplica().Select(data, ` + SELECT + Channels.* FROM Channels + JOIN + PublicChannels pc ON (pc.Id = Channels.Id) WHERE - TeamId = :TeamId - AND Type = 'O' - AND DeleteAt = 0 - ORDER BY DisplayName + pc.TeamId = :TeamId + AND pc.DeleteAt = 0 + ORDER BY pc.DisplayName LIMIT :Limit - OFFSET :Offset`, - map[string]interface{}{"TeamId": teamId, "Limit": limit, "Offset": offset}) + OFFSET :Offset + `, map[string]interface{}{ + "TeamId": teamId, + "Limit": limit, + "Offset": offset, + }) if err != nil { result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsForTeam", "store.sql_channel.get_public_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError) @@ -874,18 +1048,19 @@ func (s SqlChannelStore) GetPublicChannelsByIdsForTeam(teamId string, channelIds } data := &model.ChannelList{} - _, err := s.GetReplica().Select(data, - `SELECT - * + _, err := s.GetReplica().Select(data, ` + SELECT + Channels.* FROM Channels + JOIN + PublicChannels pc ON (pc.Id = Channels.Id) WHERE - TeamId = :teamId - AND Type = 'O' - AND DeleteAt = 0 - AND Id IN (`+idQuery+`) - ORDER BY DisplayName`, - props) + pc.TeamId = :teamId + AND pc.DeleteAt = 0 + AND pc.Id IN (`+idQuery+`) + ORDER BY pc.DisplayName + `, props) if err != nil { result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsByIdsForTeam", "store.sql_channel.get_channels_by_ids.get.app_error", nil, err.Error(), http.StatusInternalServerError) @@ -1721,33 +1896,35 @@ func (s SqlChannelStore) GetMembersForUser(teamId string, userId string) store.S func (s SqlChannelStore) AutocompleteInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel { return store.Do(func(result *store.StoreResult) { - deleteFilter := "AND DeleteAt = 0" + deleteFilter := "AND c.DeleteAt = 0" if includeDeleted { deleteFilter = "" } queryFormat := ` SELECT - * + Channels.* FROM - Channels + Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) WHERE - TeamId = :TeamId - AND Type = 'O' - ` + deleteFilter + ` - %v - LIMIT 50` + c.TeamId = :TeamId + ` + deleteFilter + ` + %v + LIMIT 50 + ` var channels model.ChannelList - if likeClause, likeTerm := s.buildLIKEClause(term, "Name, DisplayName, Purpose"); likeClause == "" { + if likeClause, likeTerm := s.buildLIKEClause(term, "c.Name, c.DisplayName, c.Purpose"); likeClause == "" { if _, err := s.GetReplica().Select(&channels, fmt.Sprintf(queryFormat, ""), map[string]interface{}{"TeamId": teamId}); err != nil { result.Err = model.NewAppError("SqlChannelStore.AutocompleteInTeam", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError) } } else { // Using a UNION results in index_merge and fulltext queries and is much faster than the ref // query you would get using an OR of the LIKE and full-text clauses. - fulltextClause, fulltextTerm := s.buildFulltextClause(term, "Name, DisplayName, Purpose") + fulltextClause, fulltextTerm := s.buildFulltextClause(term, "c.Name, c.DisplayName, c.Purpose") likeQuery := fmt.Sprintf(queryFormat, "AND "+likeClause) fulltextQuery := fmt.Sprintf(queryFormat, "AND "+fulltextClause) query := fmt.Sprintf("(%v) UNION (%v) LIMIT 50", likeQuery, fulltextQuery) @@ -1863,53 +2040,61 @@ func (s SqlChannelStore) autocompleteInTeamForSearchDirectMessages(userId string func (s SqlChannelStore) SearchInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel { return store.Do(func(result *store.StoreResult) { - deleteFilter := "AND DeleteAt = 0" + deleteFilter := "AND c.DeleteAt = 0" if includeDeleted { deleteFilter = "" } - searchQuery := ` + + *result = s.performSearch(` SELECT - * + Channels.* FROM - Channels + Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) WHERE - TeamId = :TeamId - AND Type = 'O' - ` + deleteFilter + ` - SEARCH_CLAUSE - ORDER BY DisplayName - LIMIT 100` - - *result = s.performSearch(searchQuery, term, map[string]interface{}{"TeamId": teamId}) + c.TeamId = :TeamId + `+deleteFilter+` + SEARCH_CLAUSE + ORDER BY c.DisplayName + LIMIT 100 + `, term, map[string]interface{}{ + "TeamId": teamId, + }) }) } func (s SqlChannelStore) SearchMore(userId string, teamId string, term string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { - searchQuery := ` + *result = s.performSearch(` SELECT - * + Channels.* FROM Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) WHERE - TeamId = :TeamId - AND Type = 'O' - AND DeleteAt = 0 - AND Id NOT IN (SELECT - Channels.Id + c.TeamId = :TeamId + AND c.DeleteAt = 0 + AND c.Id NOT IN ( + SELECT + c.Id FROM - Channels, - ChannelMembers + PublicChannels c + JOIN + ChannelMembers cm ON (cm.ChannelId = c.Id) WHERE - Id = ChannelId - AND TeamId = :TeamId - AND UserId = :UserId - AND DeleteAt = 0) - SEARCH_CLAUSE - ORDER BY DisplayName - LIMIT 100` - - *result = s.performSearch(searchQuery, term, map[string]interface{}{"TeamId": teamId, "UserId": userId}) + c.TeamId = :TeamId + AND cm.UserId = :UserId + AND c.DeleteAt = 0 + ) + SEARCH_CLAUSE + ORDER BY c.DisplayName + LIMIT 100 + `, term, map[string]interface{}{ + "TeamId": teamId, + "UserId": userId, + }) }) } @@ -1987,13 +2172,13 @@ func (s SqlChannelStore) buildFulltextClause(term string, searchColumns string) func (s SqlChannelStore) performSearch(searchQuery string, term string, parameters map[string]interface{}) store.StoreResult { result := store.StoreResult{} - likeClause, likeTerm := s.buildLIKEClause(term, "Name, DisplayName, Purpose") + likeClause, likeTerm := s.buildLIKEClause(term, "c.Name, c.DisplayName, c.Purpose") if likeTerm == "" { // If the likeTerm is empty after preparing, then don't bother searching. searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "", 1) } else { parameters["LikeTerm"] = likeTerm - fulltextClause, fulltextTerm := s.buildFulltextClause(term, "Name, DisplayName, Purpose") + fulltextClause, fulltextTerm := s.buildFulltextClause(term, "c.Name, c.DisplayName, c.Purpose") parameters["FulltextTerm"] = fulltextTerm searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "AND ("+likeClause+" OR "+fulltextClause+")", 1) } @@ -2259,19 +2444,6 @@ func (s SqlChannelStore) resetLastPostAtT(transaction *gorp.Transaction) store.S return result } -func (s SqlChannelStore) EnableExperimentalPublicChannelsMaterialization() { - // See SqlChannelStoreExperimental -} - -func (s SqlChannelStore) DisableExperimentalPublicChannelsMaterialization() { - // See SqlChannelStoreExperimental -} - -func (s SqlChannelStore) IsExperimentalPublicChannelsMaterializationEnabled() bool { - // See SqlChannelStoreExperimental - return false -} - func (s SqlChannelStore) GetAllChannelsForExportAfter(limit int, afterId string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { var data []*model.ChannelForExport -- cgit v1.2.3-1-g7c22