diff options
Diffstat (limited to 'store/sqlstore')
-rw-r--r-- | store/sqlstore/channel_store.go | 38 | ||||
-rw-r--r-- | store/sqlstore/channel_store_experimental.go | 819 | ||||
-rw-r--r-- | store/sqlstore/channel_store_test.go | 2 | ||||
-rw-r--r-- | store/sqlstore/store.go | 1 | ||||
-rw-r--r-- | store/sqlstore/store_test.go | 25 | ||||
-rw-r--r-- | store/sqlstore/supplier.go | 65 | ||||
-rw-r--r-- | store/sqlstore/upgrade.go | 8 |
7 files changed, 943 insertions, 15 deletions
diff --git a/store/sqlstore/channel_store.go b/store/sqlstore/channel_store.go index 820fe1e9f..4103980c5 100644 --- a/store/sqlstore/channel_store.go +++ b/store/sqlstore/channel_store.go @@ -301,6 +301,21 @@ func (s SqlChannelStore) CreateIndexesIfNotExists() { s.CreateFullTextIndexIfNotExists("idx_channel_search_txt", "Channels", "Name, DisplayName, Purpose") } +func (s SqlChannelStore) CreateTriggersIfNotExists() error { + // See SqlChannelStoreExperimental + return nil +} + +func (s SqlChannelStore) MigratePublicChannels() error { + // See SqlChannelStoreExperimental + return nil +} + +func (s SqlChannelStore) DropPublicChannels() error { + // See SqlChannelStoreExperimental + return nil +} + func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel { return store.Do(func(result *store.StoreResult) { if channel.DeleteAt != 0 { @@ -804,12 +819,12 @@ func (s SqlChannelStore) GetTeamChannels(teamId string) store.StoreChannel { _, err := s.GetReplica().Select(data, "SELECT * FROM Channels WHERE TeamId = :TeamId And Type != 'D' ORDER BY DisplayName", map[string]interface{}{"TeamId": teamId}) if err != nil { - result.Err = model.NewAppError("SqlChannelStore.GetChannels", "store.sql_channel.get_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError) + result.Err = model.NewAppError("SqlChannelStore.GetTeamChannels", "store.sql_channel.get_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError) return } if len(*data) == 0 { - result.Err = model.NewAppError("SqlChannelStore.GetChannels", "store.sql_channel.get_channels.not_found.app_error", nil, "teamId="+teamId, http.StatusNotFound) + result.Err = model.NewAppError("SqlChannelStore.GetTeamChannels", "store.sql_channel.get_channels.not_found.app_error", nil, "teamId="+teamId, http.StatusNotFound) return } @@ -962,16 +977,16 @@ var CHANNEL_MEMBERS_WITH_SCHEME_SELECT_QUERY = ` TeamScheme.DefaultChannelAdminRole TeamSchemeDefaultAdminRole, ChannelScheme.DefaultChannelUserRole ChannelSchemeDefaultUserRole, ChannelScheme.DefaultChannelAdminRole ChannelSchemeDefaultAdminRole - FROM + FROM ChannelMembers - INNER JOIN + INNER JOIN Channels ON ChannelMembers.ChannelId = Channels.Id LEFT JOIN Schemes ChannelScheme ON Channels.SchemeId = ChannelScheme.Id LEFT JOIN Teams ON Channels.TeamId = Teams.Id LEFT JOIN - Schemes TeamScheme ON Teams.SchemeId = TeamScheme.Id + Schemes TeamScheme ON Teams.SchemeId = TeamScheme.Id ` func (s SqlChannelStore) SaveMember(member *model.ChannelMember) store.StoreChannel { @@ -1988,3 +2003,16 @@ func (s SqlChannelStore) ResetLastPostAt() store.StoreChannel { } }) } + +func (s SqlChannelStore) EnableExperimentalPublicChannelsMaterialization() { + // See SqlChannelStoreExperimental +} + +func (s SqlChannelStore) DisableExperimentalPublicChannelsMaterialization() { + // See SqlChannelStoreExperimental +} + +func (s SqlChannelStore) IsExperimentalPublicChannelsMaterializationEnabled() bool { + // See SqlChannelStoreExperimental + return false +} diff --git a/store/sqlstore/channel_store_experimental.go b/store/sqlstore/channel_store_experimental.go new file mode 100644 index 000000000..67576ddc1 --- /dev/null +++ b/store/sqlstore/channel_store_experimental.go @@ -0,0 +1,819 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package sqlstore + +import ( + "fmt" + "net/http" + "sort" + "strconv" + "strings" + "sync/atomic" + + "github.com/pkg/errors" + + "github.com/mattermost/mattermost-server/einterfaces" + "github.com/mattermost/mattermost-server/mlog" + "github.com/mattermost/mattermost-server/model" + "github.com/mattermost/mattermost-server/store" +) + +// publicChannel is a subset of the metadata corresponding to public channels only. +type publicChannel struct { + Id string `json:"id"` + DeleteAt int64 `json:"delete_at"` + TeamId string `json:"team_id"` + DisplayName string `json:"display_name"` + Name string `json:"name"` + Header string `json:"header"` + Purpose string `json:"purpose"` +} + +type SqlChannelStoreExperimental struct { + SqlChannelStore + experimentalPublicChannelsMaterializationDisabled *uint32 +} + +func NewSqlChannelStoreExperimental(sqlStore SqlStore, metrics einterfaces.MetricsInterface, enabled bool) store.ChannelStore { + s := &SqlChannelStoreExperimental{ + SqlChannelStore: *NewSqlChannelStore(sqlStore, metrics).(*SqlChannelStore), + experimentalPublicChannelsMaterializationDisabled: new(uint32), + } + + if enabled { + // Forcibly log, since the default state is enabled and we want this on startup. + mlog.Info("Enabling experimental public channels materialization") + s.EnableExperimentalPublicChannelsMaterialization() + } else { + s.DisableExperimentalPublicChannelsMaterialization() + } + + if s.IsExperimentalPublicChannelsMaterializationEnabled() { + for _, db := range sqlStore.GetAllConns() { + tablePublicChannels := db.AddTableWithName(publicChannel{}, "PublicChannels").SetKeys(false, "Id") + tablePublicChannels.ColMap("Id").SetMaxSize(26) + tablePublicChannels.ColMap("TeamId").SetMaxSize(26) + tablePublicChannels.ColMap("DisplayName").SetMaxSize(64) + tablePublicChannels.ColMap("Name").SetMaxSize(64) + tablePublicChannels.SetUniqueTogether("Name", "TeamId") + tablePublicChannels.ColMap("Header").SetMaxSize(1024) + tablePublicChannels.ColMap("Purpose").SetMaxSize(250) + } + } + + return s +} + +// migratePublicChannels initializes the PublicChannels table with data created before the triggers +// took over keeping it up-to-date. +func (s SqlChannelStoreExperimental) MigratePublicChannels() error { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.MigratePublicChannels() + } + + transaction, err := s.GetMaster().Begin() + if err != nil { + return err + } + + if _, err := transaction.Exec(` + INSERT INTO PublicChannels + (Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + SELECT + c.Id, c.DeleteAt, c.TeamId, c.DisplayName, c.Name, c.Header, c.Purpose + FROM + Channels c + LEFT JOIN + PublicChannels pc ON (pc.Id = c.Id) + WHERE + c.Type = 'O' + AND pc.Id IS NULL + `); err != nil { + return err + } + + if err := transaction.Commit(); err != nil { + return err + } + + return nil +} + +// DropPublicChannels removes the public channels table and all associated triggers. +func (s SqlChannelStoreExperimental) DropPublicChannels() error { + // Only PostgreSQL will honour the transaction when executing the DDL changes below. + transaction, err := s.GetMaster().Begin() + if err != nil { + return err + } + + if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels ON Channels + `); err != nil { + return err + } + if _, err := transaction.Exec(` + DROP FUNCTION IF EXISTS channels_copy_to_public_channels + `); err != nil { + return err + } + } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL { + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels_insert + `); err != nil { + return err + } + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels_update + `); err != nil { + return err + } + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels_delete + `); err != nil { + return err + } + } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE { + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels_insert + `); err != nil { + return err + } + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels_update_delete + `); err != nil { + return err + } + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels_update + `); err != nil { + return err + } + if _, err := transaction.Exec(` + DROP TRIGGER IF EXISTS trigger_channels_delete + `); err != nil { + return err + } + } else { + return errors.New("failed to create trigger because of missing driver") + } + + if _, err := transaction.Exec(` + DROP TABLE IF EXISTS PublicChannels + `); err != nil { + return err + } + + if err := transaction.Commit(); err != nil { + return err + } + + return nil +} + +func (s SqlChannelStoreExperimental) CreateIndexesIfNotExists() { + s.SqlChannelStore.CreateIndexesIfNotExists() + + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return + } + + s.CreateIndexIfNotExists("idx_publicchannels_team_id", "PublicChannels", "TeamId") + s.CreateIndexIfNotExists("idx_publicchannels_name", "PublicChannels", "Name") + s.CreateIndexIfNotExists("idx_publicchannels_delete_at", "PublicChannels", "DeleteAt") + if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { + s.CreateIndexIfNotExists("idx_publicchannels_name_lower", "PublicChannels", "lower(Name)") + s.CreateIndexIfNotExists("idx_publicchannels_displayname_lower", "PublicChannels", "lower(DisplayName)") + } + s.CreateFullTextIndexIfNotExists("idx_publicchannels_search_txt", "PublicChannels", "Name, DisplayName, Purpose") +} + +func (s SqlChannelStoreExperimental) CreateTriggersIfNotExists() error { + s.SqlChannelStore.CreateTriggersIfNotExists() + + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return nil + } + + if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { + if !s.DoesTriggerExist("trigger_channels") { + transaction, err := s.GetMaster().Begin() + if err != nil { + return errors.Wrap(err, "failed to create trigger function") + } + + if _, err := transaction.ExecNoTimeout(` + CREATE OR REPLACE FUNCTION channels_copy_to_public_channels() RETURNS TRIGGER + SECURITY DEFINER + LANGUAGE plpgsql + AS $$ + DECLARE + counter int := 0; + BEGIN + IF (TG_OP = 'DELETE' AND OLD.Type = 'O') OR (TG_OP = 'UPDATE' AND NEW.Type != 'O') THEN + DELETE FROM + PublicChannels + WHERE + Id = OLD.Id; + ELSEIF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') AND NEW.Type = 'O' THEN + UPDATE + PublicChannels + SET + DeleteAt = NEW.DeleteAt, + TeamId = NEW.TeamId, + DisplayName = NEW.DisplayName, + Name = NEW.Name, + Header = NEW.Header, + Purpose = NEW.Purpose + WHERE + Id = NEW.Id; + + -- There's a race condition here where the INSERT might fail, though this should only occur + -- if PublicChannels had been modified outside of the triggers. We could improve this with + -- the UPSERT functionality in Postgres 9.5+ once we support same. + IF NOT FOUND THEN + INSERT INTO + PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + VALUES + (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose); + END IF; + END IF; + + RETURN NULL; + END + $$; + `); err != nil { + return errors.Wrap(err, "failed to create trigger function") + } + + if _, err := transaction.ExecNoTimeout(` + CREATE TRIGGER + trigger_channels + AFTER INSERT OR UPDATE OR DELETE ON + Channels + FOR EACH ROW EXECUTE PROCEDURE + channels_copy_to_public_channels(); + `); err != nil { + return errors.Wrap(err, "failed to create trigger") + } + + if err := transaction.Commit(); err != nil { + return errors.Wrap(err, "failed to create trigger function") + } + } + } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL { + // Note that DDL statements in MySQL (CREATE TABLE, CREATE TRIGGER, etc.) cannot + // be rolled back inside a transaction (unlike PostgreSQL), so there's no point in + // wrapping what follows inside a transaction. + + if !s.DoesTriggerExist("trigger_channels_insert") { + if _, err := s.GetMaster().ExecNoTimeout(` + CREATE TRIGGER + trigger_channels_insert + AFTER INSERT ON + Channels + FOR EACH ROW + BEGIN + IF NEW.Type = 'O' THEN + INSERT INTO + PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + VALUES + (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose) + ON DUPLICATE KEY UPDATE + DeleteAt = NEW.DeleteAt, + TeamId = NEW.TeamId, + DisplayName = NEW.DisplayName, + Name = NEW.Name, + Header = NEW.Header, + Purpose = NEW.Purpose; + END IF; + END; + `); err != nil { + return errors.Wrap(err, "failed to create trigger_channels_insert trigger") + } + } + + if !s.DoesTriggerExist("trigger_channels_update") { + if _, err := s.GetMaster().ExecNoTimeout(` + CREATE TRIGGER + trigger_channels_update + AFTER UPDATE ON + Channels + FOR EACH ROW + BEGIN + IF OLD.Type = 'O' AND NEW.Type != 'O' THEN + DELETE FROM + PublicChannels + WHERE + Id = NEW.Id; + ELSEIF NEW.Type = 'O' THEN + INSERT INTO + PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + VALUES + (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose) + ON DUPLICATE KEY UPDATE + DeleteAt = NEW.DeleteAt, + TeamId = NEW.TeamId, + DisplayName = NEW.DisplayName, + Name = NEW.Name, + Header = NEW.Header, + Purpose = NEW.Purpose; + END IF; + END; + `); err != nil { + return errors.Wrap(err, "failed to create trigger_channels_update trigger") + } + } + + if !s.DoesTriggerExist("trigger_channels_delete") { + if _, err := s.GetMaster().ExecNoTimeout(` + CREATE TRIGGER + trigger_channels_delete + AFTER DELETE ON + Channels + FOR EACH ROW + BEGIN + IF OLD.Type = 'O' THEN + DELETE FROM + PublicChannels + WHERE + Id = OLD.Id; + END IF; + END; + `); err != nil { + return errors.Wrap(err, "failed to create trigger_channels_delete trigger") + } + } + } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE { + if _, err := s.GetMaster().ExecNoTimeout(` + CREATE TRIGGER IF NOT EXISTS + trigger_channels_insert + AFTER INSERT ON + Channels + FOR EACH ROW + WHEN NEW.Type = 'O' + BEGIN + -- Ideally, we'd leverage ON CONFLICT DO UPDATE below and make this INSERT resilient to pre-existing + -- data. However, the version of Sqlite we're compiling against doesn't support this. This isn't + -- critical, though, since we don't support Sqlite in production. + INSERT INTO + PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + VALUES + (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose); + END; + `); err != nil { + return errors.Wrap(err, "failed to create trigger_channels_insert trigger") + } + + if _, err := s.GetMaster().ExecNoTimeout(` + CREATE TRIGGER IF NOT EXISTS + trigger_channels_update_delete + AFTER UPDATE ON + Channels + FOR EACH ROW + WHEN + OLD.Type = 'O' + AND NEW.Type != 'O' + BEGIN + DELETE FROM + PublicChannels + WHERE + Id = NEW.Id; + END; + `); err != nil { + return errors.Wrap(err, "failed to create trigger_channels_update_delete trigger") + } + + if _, err := s.GetMaster().ExecNoTimeout(` + CREATE TRIGGER IF NOT EXISTS + trigger_channels_update + AFTER UPDATE ON + Channels + FOR EACH ROW + WHEN + OLD.Type != 'O' + AND NEW.Type = 'O' + BEGIN + -- See comments re: ON CONFLICT DO UPDATE above that would apply here as well. + UPDATE + PublicChannels + SET + DeleteAt = NEW.DeleteAt, + TeamId = NEW.TeamId, + DisplayName = NEW.DisplayName, + Name = NEW.Name, + Header = NEW.Header, + Purpose = NEW.Purpose + WHERE + Id = NEW.Id; + END; + `); err != nil { + return errors.Wrap(err, "failed to create trigger_channels_update trigger") + } + + if _, err := s.GetMaster().ExecNoTimeout(` + CREATE TRIGGER IF NOT EXISTS + trigger_channels_delete + AFTER UPDATE ON + Channels + FOR EACH ROW + WHEN + OLD.Type = 'O' + BEGIN + DELETE FROM + PublicChannels + WHERE + Id = OLD.Id; + END; + `); err != nil { + return errors.Wrap(err, "failed to create trigger_channels_delete trigger") + } + } else { + return errors.New("failed to create trigger because of missing driver") + } + + return nil +} + +func (s SqlChannelStoreExperimental) GetMoreChannels(teamId string, userId string, offset int, limit int) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.GetMoreChannels(teamId, userId, offset, limit) + } + + return store.Do(func(result *store.StoreResult) { + data := &model.ChannelList{} + _, err := s.GetReplica().Select(data, ` + SELECT + Channels.* + FROM + Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) + WHERE + c.TeamId = :TeamId + AND c.DeleteAt = 0 + AND c.Id NOT IN ( + SELECT + c.Id + FROM + PublicChannels c + JOIN + ChannelMembers cm ON (cm.ChannelId = c.Id) + WHERE + c.TeamId = :TeamId + AND cm.UserId = :UserId + AND c.DeleteAt = 0 + ) + ORDER BY + c.DisplayName + LIMIT :Limit + OFFSET :Offset + `, map[string]interface{}{ + "TeamId": teamId, + "UserId": userId, + "Limit": limit, + "Offset": offset, + }) + + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.GetMoreChannels", "store.sql_channel.get_more_channels.get.app_error", nil, "teamId="+teamId+", userId="+userId+", err="+err.Error(), http.StatusInternalServerError) + return + } + + result.Data = data + }) +} + +func (s SqlChannelStoreExperimental) GetPublicChannelsForTeam(teamId string, offset int, limit int) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.GetPublicChannelsForTeam(teamId, offset, limit) + } + + return store.Do(func(result *store.StoreResult) { + data := &model.ChannelList{} + _, err := s.GetReplica().Select(data, ` + SELECT + Channels.* + FROM + Channels + JOIN + PublicChannels pc ON (pc.Id = Channels.Id) + WHERE + pc.TeamId = :TeamId + AND pc.DeleteAt = 0 + ORDER BY pc.DisplayName + LIMIT :Limit + OFFSET :Offset + `, map[string]interface{}{ + "TeamId": teamId, + "Limit": limit, + "Offset": offset, + }) + + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsForTeam", "store.sql_channel.get_public_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError) + return + } + + result.Data = data + }) +} + +func (s SqlChannelStoreExperimental) GetPublicChannelsByIdsForTeam(teamId string, channelIds []string) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.GetPublicChannelsByIdsForTeam(teamId, channelIds) + } + + return store.Do(func(result *store.StoreResult) { + props := make(map[string]interface{}) + props["teamId"] = teamId + + idQuery := "" + + for index, channelId := range channelIds { + if len(idQuery) > 0 { + idQuery += ", " + } + + props["channelId"+strconv.Itoa(index)] = channelId + idQuery += ":channelId" + strconv.Itoa(index) + } + + data := &model.ChannelList{} + _, err := s.GetReplica().Select(data, ` + SELECT + Channels.* + FROM + Channels + JOIN + PublicChannels pc ON (pc.Id = Channels.Id) + WHERE + pc.TeamId = :teamId + AND pc.DeleteAt = 0 + AND pc.Id IN (`+idQuery+`) + ORDER BY pc.DisplayName + `, props) + + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsByIdsForTeam", "store.sql_channel.get_channels_by_ids.get.app_error", nil, err.Error(), http.StatusInternalServerError) + } + + if len(*data) == 0 { + result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsByIdsForTeam", "store.sql_channel.get_channels_by_ids.not_found.app_error", nil, "", http.StatusNotFound) + } + + result.Data = data + }) +} + +func (s SqlChannelStoreExperimental) AutocompleteInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.AutocompleteInTeam(teamId, term, includeDeleted) + } + + return store.Do(func(result *store.StoreResult) { + deleteFilter := "AND c.DeleteAt = 0" + if includeDeleted { + deleteFilter = "" + } + + queryFormat := ` + SELECT + Channels.* + FROM + Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) + WHERE + c.TeamId = :TeamId + ` + deleteFilter + ` + %v + LIMIT 50 + ` + + var channels model.ChannelList + + if likeClause, likeTerm := s.buildLIKEClause(term); likeClause == "" { + if _, err := s.GetReplica().Select(&channels, fmt.Sprintf(queryFormat, ""), map[string]interface{}{"TeamId": teamId}); err != nil { + result.Err = model.NewAppError("SqlChannelStore.AutocompleteInTeam", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError) + } + } else { + // Using a UNION results in index_merge and fulltext queries and is much faster than the ref + // query you would get using an OR of the LIKE and full-text clauses. + fulltextClause, fulltextTerm := s.buildFulltextClause(term) + likeQuery := fmt.Sprintf(queryFormat, "AND "+likeClause) + fulltextQuery := fmt.Sprintf(queryFormat, "AND "+fulltextClause) + query := fmt.Sprintf("(%v) UNION (%v) LIMIT 50", likeQuery, fulltextQuery) + + if _, err := s.GetReplica().Select(&channels, query, map[string]interface{}{"TeamId": teamId, "LikeTerm": likeTerm, "FulltextTerm": fulltextTerm}); err != nil { + result.Err = model.NewAppError("SqlChannelStore.AutocompleteInTeam", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError) + } + } + + sort.Slice(channels, func(a, b int) bool { + return strings.ToLower(channels[a].DisplayName) < strings.ToLower(channels[b].DisplayName) + }) + result.Data = &channels + }) +} + +func (s SqlChannelStoreExperimental) SearchInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.SearchInTeam(teamId, term, includeDeleted) + } + + return store.Do(func(result *store.StoreResult) { + deleteFilter := "AND c.DeleteAt = 0" + if includeDeleted { + deleteFilter = "" + } + + *result = s.performSearch(` + SELECT + Channels.* + FROM + Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) + WHERE + c.TeamId = :TeamId + `+deleteFilter+` + SEARCH_CLAUSE + ORDER BY c.DisplayName + LIMIT 100 + `, term, map[string]interface{}{ + "TeamId": teamId, + }) + }) +} + +func (s SqlChannelStoreExperimental) SearchMore(userId string, teamId string, term string) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.SearchMore(userId, teamId, term) + } + + return store.Do(func(result *store.StoreResult) { + *result = s.performSearch(` + SELECT + Channels.* + FROM + Channels + JOIN + PublicChannels c ON (c.Id = Channels.Id) + WHERE + c.TeamId = :TeamId + AND c.DeleteAt = 0 + AND c.Id NOT IN ( + SELECT + c.Id + FROM + PublicChannels c + JOIN + ChannelMembers cm ON (cm.ChannelId = c.Id) + WHERE + c.TeamId = :TeamId + AND cm.UserId = :UserId + AND c.DeleteAt = 0 + ) + SEARCH_CLAUSE + ORDER BY c.DisplayName + LIMIT 100 + `, term, map[string]interface{}{ + "TeamId": teamId, + "UserId": userId, + }) + }) +} + +func (s SqlChannelStoreExperimental) buildLIKEClause(term string) (likeClause, likeTerm string) { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.buildLIKEClause(term) + } + + likeTerm = term + searchColumns := "c.Name, c.DisplayName, c.Purpose" + + // These chars must be removed from the like query. + for _, c := range ignoreLikeSearchChar { + likeTerm = strings.Replace(likeTerm, c, "", -1) + } + + // These chars must be escaped in the like query. + for _, c := range escapeLikeSearchChar { + likeTerm = strings.Replace(likeTerm, c, "*"+c, -1) + } + + if likeTerm == "" { + return + } + + // Prepare the LIKE portion of the query. + var searchFields []string + for _, field := range strings.Split(searchColumns, ", ") { + if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { + searchFields = append(searchFields, fmt.Sprintf("lower(%s) LIKE lower(%s) escape '*'", field, ":LikeTerm")) + } else { + searchFields = append(searchFields, fmt.Sprintf("%s LIKE %s escape '*'", field, ":LikeTerm")) + } + } + + likeClause = fmt.Sprintf("(%s)", strings.Join(searchFields, " OR ")) + likeTerm += "%" + return +} + +func (s SqlChannelStoreExperimental) buildFulltextClause(term string) (fulltextClause, fulltextTerm string) { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.buildFulltextClause(term) + } + + // Copy the terms as we will need to prepare them differently for each search type. + fulltextTerm = term + + searchColumns := "c.Name, c.DisplayName, c.Purpose" + + // These chars must be treated as spaces in the fulltext query. + for _, c := range spaceFulltextSearchChar { + fulltextTerm = strings.Replace(fulltextTerm, c, " ", -1) + } + + // Prepare the FULLTEXT portion of the query. + if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { + fulltextTerm = strings.Replace(fulltextTerm, "|", "", -1) + + splitTerm := strings.Fields(fulltextTerm) + for i, t := range strings.Fields(fulltextTerm) { + if i == len(splitTerm)-1 { + splitTerm[i] = t + ":*" + } else { + splitTerm[i] = t + ":* &" + } + } + + fulltextTerm = strings.Join(splitTerm, " ") + + fulltextClause = fmt.Sprintf("((%s) @@ to_tsquery(:FulltextTerm))", convertMySQLFullTextColumnsToPostgres(searchColumns)) + } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL { + splitTerm := strings.Fields(fulltextTerm) + for i, t := range strings.Fields(fulltextTerm) { + splitTerm[i] = "+" + t + "*" + } + + fulltextTerm = strings.Join(splitTerm, " ") + + fulltextClause = fmt.Sprintf("MATCH(%s) AGAINST (:FulltextTerm IN BOOLEAN MODE)", searchColumns) + } + + return +} + +func (s SqlChannelStoreExperimental) performSearch(searchQuery string, term string, parameters map[string]interface{}) store.StoreResult { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.performSearch(searchQuery, term, parameters) + } + + result := store.StoreResult{} + + likeClause, likeTerm := s.buildLIKEClause(term) + if likeTerm == "" { + // If the likeTerm is empty after preparing, then don't bother searching. + searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "", 1) + } else { + parameters["LikeTerm"] = likeTerm + fulltextClause, fulltextTerm := s.buildFulltextClause(term) + parameters["FulltextTerm"] = fulltextTerm + searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "AND ("+likeClause+" OR "+fulltextClause+")", 1) + } + + var channels model.ChannelList + + if _, err := s.GetReplica().Select(&channels, searchQuery, parameters); err != nil { + result.Err = model.NewAppError("SqlChannelStore.Search", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError) + return result + } + + result.Data = &channels + return result +} + +func (s SqlChannelStoreExperimental) EnableExperimentalPublicChannelsMaterialization() { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + mlog.Info("Enabling experimental public channels materialization") + } + + atomic.StoreUint32(s.experimentalPublicChannelsMaterializationDisabled, 0) +} + +func (s SqlChannelStoreExperimental) DisableExperimentalPublicChannelsMaterialization() { + if s.IsExperimentalPublicChannelsMaterializationEnabled() { + mlog.Info("Disabling experimental public channels materialization") + } + + atomic.StoreUint32(s.experimentalPublicChannelsMaterializationDisabled, 1) +} + +func (s SqlChannelStoreExperimental) IsExperimentalPublicChannelsMaterializationEnabled() bool { + return atomic.LoadUint32(s.experimentalPublicChannelsMaterializationDisabled) == 0 +} diff --git a/store/sqlstore/channel_store_test.go b/store/sqlstore/channel_store_test.go index 0e8b4191a..5eb84afcd 100644 --- a/store/sqlstore/channel_store_test.go +++ b/store/sqlstore/channel_store_test.go @@ -14,7 +14,7 @@ import ( ) func TestChannelStore(t *testing.T) { - StoreTest(t, storetest.TestChannelStore) + StoreTestWithSqlSupplier(t, storetest.TestChannelStore) } func TestChannelStoreInternalDataTypes(t *testing.T) { diff --git a/store/sqlstore/store.go b/store/sqlstore/store.go index 500f98235..df912028b 100644 --- a/store/sqlstore/store.go +++ b/store/sqlstore/store.go @@ -51,6 +51,7 @@ type SqlStore interface { MarkSystemRanUnitTests() DoesTableExist(tablename string) bool DoesColumnExist(tableName string, columName string) bool + DoesTriggerExist(triggerName string) bool CreateColumnIfNotExists(tableName string, columnName string, mySqlColType string, postgresColType string, defaultValue string) bool CreateColumnIfNotExistsNoDefault(tableName string, columnName string, mySqlColType string, postgresColType string) bool RemoveColumnIfExists(tableName string, columnName string) bool diff --git a/store/sqlstore/store_test.go b/store/sqlstore/store_test.go index 58065d65d..55002aee2 100644 --- a/store/sqlstore/store_test.go +++ b/store/sqlstore/store_test.go @@ -16,10 +16,11 @@ import ( ) var storeTypes = []*struct { - Name string - Func func() (*storetest.RunningContainer, *model.SqlSettings, error) - Container *storetest.RunningContainer - Store store.Store + Name string + Func func() (*storetest.RunningContainer, *model.SqlSettings, error) + Container *storetest.RunningContainer + SqlSupplier *SqlSupplier + Store store.Store }{ { Name: "MySQL", @@ -44,6 +45,19 @@ func StoreTest(t *testing.T, f func(*testing.T, store.Store)) { } } +func StoreTestWithSqlSupplier(t *testing.T, f func(*testing.T, store.Store, storetest.SqlSupplier)) { + defer func() { + if err := recover(); err != nil { + tearDownStores() + panic(err) + } + }() + for _, st := range storeTypes { + st := st + t.Run(st.Name, func(t *testing.T) { f(t, st.Store, st.SqlSupplier) }) + } +} + func initStores() { defer func() { if err := recover(); err != nil { @@ -64,7 +78,8 @@ func initStores() { return } st.Container = container - st.Store = store.NewLayeredStore(NewSqlSupplier(*settings, nil), nil, nil) + st.SqlSupplier = NewSqlSupplier(*settings, nil) + st.Store = store.NewLayeredStore(st.SqlSupplier, nil, nil) st.Store.MarkSystemRanUnitTests() }() } diff --git a/store/sqlstore/supplier.go b/store/sqlstore/supplier.go index 6c49d91fb..d1d7564f7 100644 --- a/store/sqlstore/supplier.go +++ b/store/sqlstore/supplier.go @@ -33,6 +33,7 @@ const ( ) const ( + EXIT_GENERIC_FAILURE = 1 EXIT_CREATE_TABLE = 100 EXIT_DB_OPEN = 101 EXIT_PING = 102 @@ -116,8 +117,13 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter supplier.initConnection() + enableExperimentalPublicChannelsMaterialization := true + if settings.EnablePublicChannelsMaterialization != nil && !*settings.EnablePublicChannelsMaterialization { + enableExperimentalPublicChannelsMaterialization = false + } + supplier.oldStores.team = NewSqlTeamStore(supplier) - supplier.oldStores.channel = NewSqlChannelStore(supplier, metrics) + supplier.oldStores.channel = NewSqlChannelStoreExperimental(supplier, metrics, enableExperimentalPublicChannelsMaterialization) supplier.oldStores.post = NewSqlPostStore(supplier, metrics) supplier.oldStores.user = NewSqlUserStore(supplier, metrics) supplier.oldStores.audit = NewSqlAuditStore(supplier) @@ -151,10 +157,19 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter os.Exit(EXIT_CREATE_TABLE) } + // This store's triggers should exist before the migration is run to ensure the + // corresponding tables stay in sync. Whether or not a trigger should be created before + // or after a migration is likely to be decided on a case-by-case basis. + if err := supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateTriggersIfNotExists(); err != nil { + mlog.Critical("Error creating triggers", mlog.Err(err)) + time.Sleep(time.Second) + os.Exit(EXIT_GENERIC_FAILURE) + } + UpgradeDatabase(supplier) supplier.oldStores.team.(*SqlTeamStore).CreateIndexesIfNotExists() - supplier.oldStores.channel.(*SqlChannelStore).CreateIndexesIfNotExists() + supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateIndexesIfNotExists() supplier.oldStores.post.(*SqlPostStore).CreateIndexesIfNotExists() supplier.oldStores.user.(*SqlUserStore).CreateIndexesIfNotExists() supplier.oldStores.audit.(*SqlAuditStore).CreateIndexesIfNotExists() @@ -461,6 +476,52 @@ func (ss *SqlSupplier) DoesColumnExist(tableName string, columnName string) bool } } +func (ss *SqlSupplier) DoesTriggerExist(triggerName string) bool { + if ss.DriverName() == model.DATABASE_DRIVER_POSTGRES { + count, err := ss.GetMaster().SelectInt(` + SELECT + COUNT(0) + FROM + pg_trigger + WHERE + tgname = $1 + `, triggerName) + + if err != nil { + mlog.Critical(fmt.Sprintf("Failed to check if trigger exists %v", err)) + time.Sleep(time.Second) + os.Exit(EXIT_GENERIC_FAILURE) + } + + return count > 0 + + } else if ss.DriverName() == model.DATABASE_DRIVER_MYSQL { + count, err := ss.GetMaster().SelectInt(` + SELECT + COUNT(0) + FROM + information_schema.triggers + WHERE + trigger_schema = DATABASE() + AND trigger_name = ? + `, triggerName) + + if err != nil { + mlog.Critical(fmt.Sprintf("Failed to check if trigger exists %v", err)) + time.Sleep(time.Second) + os.Exit(EXIT_GENERIC_FAILURE) + } + + return count > 0 + + } else { + mlog.Critical("Failed to check if column exists because of missing driver") + time.Sleep(time.Second) + os.Exit(EXIT_GENERIC_FAILURE) + return false + } +} + func (ss *SqlSupplier) CreateColumnIfNotExists(tableName string, columnName string, mySqlColType string, postgresColType string, defaultValue string) bool { if ss.DoesColumnExist(tableName, columnName) { diff --git a/store/sqlstore/upgrade.go b/store/sqlstore/upgrade.go index 5f74dbfb1..a8be96172 100644 --- a/store/sqlstore/upgrade.go +++ b/store/sqlstore/upgrade.go @@ -489,7 +489,6 @@ func UpgradeDatabaseToVersion53(sqlStore SqlStore) { if shouldPerformUpgrade(sqlStore, VERSION_5_2_0, VERSION_5_3_0) { saveSchemaVersion(sqlStore, VERSION_5_3_0) } - } func UpgradeDatabaseToVersion54(sqlStore SqlStore) { @@ -497,6 +496,11 @@ func UpgradeDatabaseToVersion54(sqlStore SqlStore) { // if shouldPerformUpgrade(sqlStore, VERSION_5_3_0, VERSION_5_4_0) { sqlStore.AlterColumnTypeIfExists("OutgoingWebhooks", "Description", "varchar(500)", "varchar(500)") sqlStore.AlterColumnTypeIfExists("IncomingWebhooks", "Description", "varchar(500)", "varchar(500)") + + if err := sqlStore.Channel().MigratePublicChannels(); err != nil { + mlog.Critical("Failed to migrate PublicChannels table", mlog.Err(err)) + time.Sleep(time.Second) + os.Exit(EXIT_GENERIC_FAILURE) + } // saveSchemaVersion(sqlStore, VERSION_5_4_0) - // } } |