author	Jesse Hallam <jesse.hallam@gmail.com>	2018-09-13 13:47:17 -0400
committer	GitHub <noreply@github.com>	2018-09-13 13:47:17 -0400
commit	8b17bf9e42dd56ecd0fe8300da90bea5ee8684ef (patch)
tree	d9f3341228a2fe8f5994f56d524943f8702c5365 /store/sqlstore
parent	0a5f792d2d6ceaa6c9bdb3050acbc4050c0c02f5 (diff)
MM-11886: materialize channel search (#9349)
* materialize PublicChannels table

  Introduce triggers for each supported database that automatically maintain a subset of the Channels table containing only public channels. This improves the corresponding queries, which no longer need to filter out the ~99% of rows that are DM channels.

  This initial commit modifies the channel store directly for easier code review; the next commit wraps an experimental version around it to enable a kill switch in case there are unforeseen performance regressions.

  This addresses [MM-11886](https://mattermost.atlassian.net/browse/MM-11886) and [MM-11945](https://mattermost.atlassian.net/browse/MM-11945).

* extract the experimental public channels materialization

  Wrap the original channel store with an experimental version that leverages the materialized public channels, but can be disabled to fall back to the original implementation. This addresses MM-11947.

* s/ExperimentalPublicChannelsMaterialization/EnablePublicChannelsMaterialization/

* simplify error handling

* move the experimental config listener until after the store is initialized
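
The kill switch described above is toggled at runtime through the Enable/Disable methods added in channel_store_experimental.go below. The following is a minimal sketch of how an app-layer config listener might drive it; the listener registration and function names here are hypothetical and not part of this commit, while the store methods and the EnablePublicChannelsMaterialization setting do appear in the diff.

package app // hypothetical app-layer wiring, not part of this commit

import (
	"github.com/mattermost/mattermost-server/model"
	"github.com/mattermost/mattermost-server/store/sqlstore"
)

// registerMaterializationListener flips the experimental materialization whenever the
// config changes. addConfigListener is an assumed app-layer registration hook.
func registerMaterializationListener(channelStore *sqlstore.SqlChannelStoreExperimental, addConfigListener func(func(oldCfg, newCfg *model.Config))) {
	addConfigListener(func(oldCfg, newCfg *model.Config) {
		// Treat a nil setting as enabled, matching the supplier's default below.
		enabled := newCfg.SqlSettings.EnablePublicChannelsMaterialization == nil ||
			*newCfg.SqlSettings.EnablePublicChannelsMaterialization

		if enabled {
			channelStore.EnableExperimentalPublicChannelsMaterialization()
		} else {
			channelStore.DisableExperimentalPublicChannelsMaterialization()
		}
	})
}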
Diffstat (limited to 'store/sqlstore')
-rw-r--r--	store/sqlstore/channel_store.go	38
-rw-r--r--	store/sqlstore/channel_store_experimental.go	819
-rw-r--r--	store/sqlstore/channel_store_test.go	2
-rw-r--r--	store/sqlstore/store.go	1
-rw-r--r--	store/sqlstore/store_test.go	25
-rw-r--r--	store/sqlstore/supplier.go	65
-rw-r--r--	store/sqlstore/upgrade.go	8
7 files changed, 943 insertions, 15 deletions
diff --git a/store/sqlstore/channel_store.go b/store/sqlstore/channel_store.go
index 820fe1e9f..4103980c5 100644
--- a/store/sqlstore/channel_store.go
+++ b/store/sqlstore/channel_store.go
@@ -301,6 +301,21 @@ func (s SqlChannelStore) CreateIndexesIfNotExists() {
s.CreateFullTextIndexIfNotExists("idx_channel_search_txt", "Channels", "Name, DisplayName, Purpose")
}
+func (s SqlChannelStore) CreateTriggersIfNotExists() error {
+ // See SqlChannelStoreExperimental
+ return nil
+}
+
+func (s SqlChannelStore) MigratePublicChannels() error {
+ // See SqlChannelStoreExperimental
+ return nil
+}
+
+func (s SqlChannelStore) DropPublicChannels() error {
+ // See SqlChannelStoreExperimental
+ return nil
+}
+
func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if channel.DeleteAt != 0 {
@@ -804,12 +819,12 @@ func (s SqlChannelStore) GetTeamChannels(teamId string) store.StoreChannel {
_, err := s.GetReplica().Select(data, "SELECT * FROM Channels WHERE TeamId = :TeamId And Type != 'D' ORDER BY DisplayName", map[string]interface{}{"TeamId": teamId})
if err != nil {
- result.Err = model.NewAppError("SqlChannelStore.GetChannels", "store.sql_channel.get_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError)
+ result.Err = model.NewAppError("SqlChannelStore.GetTeamChannels", "store.sql_channel.get_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError)
return
}
if len(*data) == 0 {
- result.Err = model.NewAppError("SqlChannelStore.GetChannels", "store.sql_channel.get_channels.not_found.app_error", nil, "teamId="+teamId, http.StatusNotFound)
+ result.Err = model.NewAppError("SqlChannelStore.GetTeamChannels", "store.sql_channel.get_channels.not_found.app_error", nil, "teamId="+teamId, http.StatusNotFound)
return
}
@@ -962,16 +977,16 @@ var CHANNEL_MEMBERS_WITH_SCHEME_SELECT_QUERY = `
TeamScheme.DefaultChannelAdminRole TeamSchemeDefaultAdminRole,
ChannelScheme.DefaultChannelUserRole ChannelSchemeDefaultUserRole,
ChannelScheme.DefaultChannelAdminRole ChannelSchemeDefaultAdminRole
- FROM
+ FROM
ChannelMembers
- INNER JOIN
+ INNER JOIN
Channels ON ChannelMembers.ChannelId = Channels.Id
LEFT JOIN
Schemes ChannelScheme ON Channels.SchemeId = ChannelScheme.Id
LEFT JOIN
Teams ON Channels.TeamId = Teams.Id
LEFT JOIN
- Schemes TeamScheme ON Teams.SchemeId = TeamScheme.Id
+ Schemes TeamScheme ON Teams.SchemeId = TeamScheme.Id
`
func (s SqlChannelStore) SaveMember(member *model.ChannelMember) store.StoreChannel {
@@ -1988,3 +2003,16 @@ func (s SqlChannelStore) ResetLastPostAt() store.StoreChannel {
}
})
}
+
+func (s SqlChannelStore) EnableExperimentalPublicChannelsMaterialization() {
+ // See SqlChannelStoreExperimental
+}
+
+func (s SqlChannelStore) DisableExperimentalPublicChannelsMaterialization() {
+ // See SqlChannelStoreExperimental
+}
+
+func (s SqlChannelStore) IsExperimentalPublicChannelsMaterializationEnabled() bool {
+ // See SqlChannelStoreExperimental
+ return false
+}
diff --git a/store/sqlstore/channel_store_experimental.go b/store/sqlstore/channel_store_experimental.go
new file mode 100644
index 000000000..67576ddc1
--- /dev/null
+++ b/store/sqlstore/channel_store_experimental.go
@@ -0,0 +1,819 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package sqlstore
+
+import (
+ "fmt"
+ "net/http"
+ "sort"
+ "strconv"
+ "strings"
+ "sync/atomic"
+
+ "github.com/pkg/errors"
+
+ "github.com/mattermost/mattermost-server/einterfaces"
+ "github.com/mattermost/mattermost-server/mlog"
+ "github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/store"
+)
+
+// publicChannel is the subset of the Channels metadata that is materialized for public channels.
+type publicChannel struct {
+ Id string `json:"id"`
+ DeleteAt int64 `json:"delete_at"`
+ TeamId string `json:"team_id"`
+ DisplayName string `json:"display_name"`
+ Name string `json:"name"`
+ Header string `json:"header"`
+ Purpose string `json:"purpose"`
+}
+
+type SqlChannelStoreExperimental struct {
+ SqlChannelStore
+ experimentalPublicChannelsMaterializationDisabled *uint32
+}
+
+func NewSqlChannelStoreExperimental(sqlStore SqlStore, metrics einterfaces.MetricsInterface, enabled bool) store.ChannelStore {
+ s := &SqlChannelStoreExperimental{
+ SqlChannelStore: *NewSqlChannelStore(sqlStore, metrics).(*SqlChannelStore),
+ experimentalPublicChannelsMaterializationDisabled: new(uint32),
+ }
+
+ if enabled {
+ // Forcibly log, since the default state is enabled and we want this on startup.
+ mlog.Info("Enabling experimental public channels materialization")
+ s.EnableExperimentalPublicChannelsMaterialization()
+ } else {
+ s.DisableExperimentalPublicChannelsMaterialization()
+ }
+
+ if s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ for _, db := range sqlStore.GetAllConns() {
+ tablePublicChannels := db.AddTableWithName(publicChannel{}, "PublicChannels").SetKeys(false, "Id")
+ tablePublicChannels.ColMap("Id").SetMaxSize(26)
+ tablePublicChannels.ColMap("TeamId").SetMaxSize(26)
+ tablePublicChannels.ColMap("DisplayName").SetMaxSize(64)
+ tablePublicChannels.ColMap("Name").SetMaxSize(64)
+ tablePublicChannels.SetUniqueTogether("Name", "TeamId")
+ tablePublicChannels.ColMap("Header").SetMaxSize(1024)
+ tablePublicChannels.ColMap("Purpose").SetMaxSize(250)
+ }
+ }
+
+ return s
+}
+
+// MigratePublicChannels initializes the PublicChannels table with data created before the triggers
+// took over keeping it up to date.
+func (s SqlChannelStoreExperimental) MigratePublicChannels() error {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.MigratePublicChannels()
+ }
+
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ return err
+ }
+
+ if _, err := transaction.Exec(`
+ INSERT INTO PublicChannels
+ (Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
+ SELECT
+ c.Id, c.DeleteAt, c.TeamId, c.DisplayName, c.Name, c.Header, c.Purpose
+ FROM
+ Channels c
+ LEFT JOIN
+ PublicChannels pc ON (pc.Id = c.Id)
+ WHERE
+ c.Type = 'O'
+ AND pc.Id IS NULL
+ `); err != nil {
+ return err
+ }
+
+ if err := transaction.Commit(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
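
The anti-join above makes the backfill idempotent: only public channels missing from PublicChannels are inserted, so re-running the migration is harmless. Below is a hedged sanity check one might run after the migration; verifyPublicChannelsBackfill is a hypothetical helper reusing the file's existing imports, not part of this commit.

// Hypothetical helper: confirm every open channel has a materialized row.
func (s SqlChannelStoreExperimental) verifyPublicChannelsBackfill() error {
	channels, err := s.GetReplica().SelectInt(`SELECT COUNT(*) FROM Channels WHERE Type = 'O'`)
	if err != nil {
		return err
	}

	materialized, err := s.GetReplica().SelectInt(`SELECT COUNT(*) FROM PublicChannels`)
	if err != nil {
		return err
	}

	if channels != materialized {
		return fmt.Errorf("expected %d rows in PublicChannels, found %d", channels, materialized)
	}

	return nil
}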
+// DropPublicChannels removes the public channels table and all associated triggers.
+func (s SqlChannelStoreExperimental) DropPublicChannels() error {
+ // Only PostgreSQL will honour the transaction when executing the DDL changes below.
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ return err
+ }
+
+ if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels ON Channels
+ `); err != nil {
+ return err
+ }
+ if _, err := transaction.Exec(`
+ DROP FUNCTION IF EXISTS channels_copy_to_public_channels
+ `); err != nil {
+ return err
+ }
+ } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels_insert
+ `); err != nil {
+ return err
+ }
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels_update
+ `); err != nil {
+ return err
+ }
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels_delete
+ `); err != nil {
+ return err
+ }
+ } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE {
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels_insert
+ `); err != nil {
+ return err
+ }
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels_update_delete
+ `); err != nil {
+ return err
+ }
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels_update
+ `); err != nil {
+ return err
+ }
+ if _, err := transaction.Exec(`
+ DROP TRIGGER IF EXISTS trigger_channels_delete
+ `); err != nil {
+ return err
+ }
+ } else {
+ return errors.New("failed to create trigger because of missing driver")
+ }
+
+ if _, err := transaction.Exec(`
+ DROP TABLE IF EXISTS PublicChannels
+ `); err != nil {
+ return err
+ }
+
+ if err := transaction.Commit(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (s SqlChannelStoreExperimental) CreateIndexesIfNotExists() {
+ s.SqlChannelStore.CreateIndexesIfNotExists()
+
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return
+ }
+
+ s.CreateIndexIfNotExists("idx_publicchannels_team_id", "PublicChannels", "TeamId")
+ s.CreateIndexIfNotExists("idx_publicchannels_name", "PublicChannels", "Name")
+ s.CreateIndexIfNotExists("idx_publicchannels_delete_at", "PublicChannels", "DeleteAt")
+ if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
+ s.CreateIndexIfNotExists("idx_publicchannels_name_lower", "PublicChannels", "lower(Name)")
+ s.CreateIndexIfNotExists("idx_publicchannels_displayname_lower", "PublicChannels", "lower(DisplayName)")
+ }
+ s.CreateFullTextIndexIfNotExists("idx_publicchannels_search_txt", "PublicChannels", "Name, DisplayName, Purpose")
+}
+
+func (s SqlChannelStoreExperimental) CreateTriggersIfNotExists() error {
+ s.SqlChannelStore.CreateTriggersIfNotExists()
+
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return nil
+ }
+
+ if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
+ if !s.DoesTriggerExist("trigger_channels") {
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ return errors.Wrap(err, "failed to create trigger function")
+ }
+
+ if _, err := transaction.ExecNoTimeout(`
+ CREATE OR REPLACE FUNCTION channels_copy_to_public_channels() RETURNS TRIGGER
+ SECURITY DEFINER
+ LANGUAGE plpgsql
+ AS $$
+ DECLARE
+ counter int := 0;
+ BEGIN
+ IF (TG_OP = 'DELETE' AND OLD.Type = 'O') OR (TG_OP = 'UPDATE' AND NEW.Type != 'O') THEN
+ DELETE FROM
+ PublicChannels
+ WHERE
+ Id = OLD.Id;
+ ELSEIF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') AND NEW.Type = 'O' THEN
+ UPDATE
+ PublicChannels
+ SET
+ DeleteAt = NEW.DeleteAt,
+ TeamId = NEW.TeamId,
+ DisplayName = NEW.DisplayName,
+ Name = NEW.Name,
+ Header = NEW.Header,
+ Purpose = NEW.Purpose
+ WHERE
+ Id = NEW.Id;
+
+ -- There's a race condition here where the INSERT might fail, though this should only occur
+ -- if PublicChannels had been modified outside of the triggers. We could improve this with
+ -- the UPSERT functionality in Postgres 9.5+ once we support same.
+ IF NOT FOUND THEN
+ INSERT INTO
+ PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
+ VALUES
+ (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose);
+ END IF;
+ END IF;
+
+ RETURN NULL;
+ END
+ $$;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger function")
+ }
+
+ if _, err := transaction.ExecNoTimeout(`
+ CREATE TRIGGER
+ trigger_channels
+ AFTER INSERT OR UPDATE OR DELETE ON
+ Channels
+ FOR EACH ROW EXECUTE PROCEDURE
+ channels_copy_to_public_channels();
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger")
+ }
+
+ if err := transaction.Commit(); err != nil {
+ return errors.Wrap(err, "failed to create trigger function")
+ }
+ }
+ } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
+ // Note that DDL statements in MySQL (CREATE TABLE, CREATE TRIGGER, etc.) cannot
+ // be rolled back inside a transaction (unlike PostgreSQL), so there's no point in
+ // wrapping what follows inside a transaction.
+
+ if !s.DoesTriggerExist("trigger_channels_insert") {
+ if _, err := s.GetMaster().ExecNoTimeout(`
+ CREATE TRIGGER
+ trigger_channels_insert
+ AFTER INSERT ON
+ Channels
+ FOR EACH ROW
+ BEGIN
+ IF NEW.Type = 'O' THEN
+ INSERT INTO
+ PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
+ VALUES
+ (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose)
+ ON DUPLICATE KEY UPDATE
+ DeleteAt = NEW.DeleteAt,
+ TeamId = NEW.TeamId,
+ DisplayName = NEW.DisplayName,
+ Name = NEW.Name,
+ Header = NEW.Header,
+ Purpose = NEW.Purpose;
+ END IF;
+ END;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger_channels_insert trigger")
+ }
+ }
+
+ if !s.DoesTriggerExist("trigger_channels_update") {
+ if _, err := s.GetMaster().ExecNoTimeout(`
+ CREATE TRIGGER
+ trigger_channels_update
+ AFTER UPDATE ON
+ Channels
+ FOR EACH ROW
+ BEGIN
+ IF OLD.Type = 'O' AND NEW.Type != 'O' THEN
+ DELETE FROM
+ PublicChannels
+ WHERE
+ Id = NEW.Id;
+ ELSEIF NEW.Type = 'O' THEN
+ INSERT INTO
+ PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
+ VALUES
+ (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose)
+ ON DUPLICATE KEY UPDATE
+ DeleteAt = NEW.DeleteAt,
+ TeamId = NEW.TeamId,
+ DisplayName = NEW.DisplayName,
+ Name = NEW.Name,
+ Header = NEW.Header,
+ Purpose = NEW.Purpose;
+ END IF;
+ END;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger_channels_update trigger")
+ }
+ }
+
+ if !s.DoesTriggerExist("trigger_channels_delete") {
+ if _, err := s.GetMaster().ExecNoTimeout(`
+ CREATE TRIGGER
+ trigger_channels_delete
+ AFTER DELETE ON
+ Channels
+ FOR EACH ROW
+ BEGIN
+ IF OLD.Type = 'O' THEN
+ DELETE FROM
+ PublicChannels
+ WHERE
+ Id = OLD.Id;
+ END IF;
+ END;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger_channels_delete trigger")
+ }
+ }
+ } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE {
+ if _, err := s.GetMaster().ExecNoTimeout(`
+ CREATE TRIGGER IF NOT EXISTS
+ trigger_channels_insert
+ AFTER INSERT ON
+ Channels
+ FOR EACH ROW
+ WHEN NEW.Type = 'O'
+ BEGIN
+ -- Ideally, we'd leverage ON CONFLICT DO UPDATE below and make this INSERT resilient to pre-existing
+ -- data. However, the version of Sqlite we're compiling against doesn't support this. This isn't
+ -- critical, though, since we don't support Sqlite in production.
+ INSERT INTO
+ PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
+ VALUES
+ (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose);
+ END;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger_channels_insert trigger")
+ }
+
+ if _, err := s.GetMaster().ExecNoTimeout(`
+ CREATE TRIGGER IF NOT EXISTS
+ trigger_channels_update_delete
+ AFTER UPDATE ON
+ Channels
+ FOR EACH ROW
+ WHEN
+ OLD.Type = 'O'
+ AND NEW.Type != 'O'
+ BEGIN
+ DELETE FROM
+ PublicChannels
+ WHERE
+ Id = NEW.Id;
+ END;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger_channels_update_delete trigger")
+ }
+
+ if _, err := s.GetMaster().ExecNoTimeout(`
+ CREATE TRIGGER IF NOT EXISTS
+ trigger_channels_update
+ AFTER UPDATE ON
+ Channels
+ FOR EACH ROW
+ WHEN
+ OLD.Type != 'O'
+ AND NEW.Type = 'O'
+ BEGIN
+ -- See comments re: ON CONFLICT DO UPDATE above that would apply here as well.
+ UPDATE
+ PublicChannels
+ SET
+ DeleteAt = NEW.DeleteAt,
+ TeamId = NEW.TeamId,
+ DisplayName = NEW.DisplayName,
+ Name = NEW.Name,
+ Header = NEW.Header,
+ Purpose = NEW.Purpose
+ WHERE
+ Id = NEW.Id;
+ END;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger_channels_update trigger")
+ }
+
+ if _, err := s.GetMaster().ExecNoTimeout(`
+ CREATE TRIGGER IF NOT EXISTS
+ trigger_channels_delete
+ AFTER DELETE ON
+ Channels
+ FOR EACH ROW
+ WHEN
+ OLD.Type = 'O'
+ BEGIN
+ DELETE FROM
+ PublicChannels
+ WHERE
+ Id = OLD.Id;
+ END;
+ `); err != nil {
+ return errors.Wrap(err, "failed to create trigger_channels_delete trigger")
+ }
+ } else {
+ return errors.New("failed to create trigger because of missing driver")
+ }
+
+ return nil
+}
+
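
Although the mechanics differ per driver (one plpgsql trigger function on PostgreSQL, three triggers on MySQL, four on SQLite, with the SQLite coverage acknowledged as partial in the inline comments), the intended invariant is the same. A summary derived from the definitions above:

// Invariant maintained by the triggers (derived from the SQL above; illustrative only):
//
//   INSERT of a Type = 'O' channel      -> row materialized in PublicChannels (upserted on Postgres/MySQL)
//   UPDATE keeping Type = 'O'           -> PublicChannels row kept in sync (Postgres/MySQL)
//   UPDATE changing Type away from 'O'  -> PublicChannels row deleted
//   DELETE of a Type = 'O' channel      -> PublicChannels row deleted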
+func (s SqlChannelStoreExperimental) GetMoreChannels(teamId string, userId string, offset int, limit int) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.GetMoreChannels(teamId, userId, offset, limit)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ data := &model.ChannelList{}
+ _, err := s.GetReplica().Select(data, `
+ SELECT
+ Channels.*
+ FROM
+ Channels
+ JOIN
+ PublicChannels c ON (c.Id = Channels.Id)
+ WHERE
+ c.TeamId = :TeamId
+ AND c.DeleteAt = 0
+ AND c.Id NOT IN (
+ SELECT
+ c.Id
+ FROM
+ PublicChannels c
+ JOIN
+ ChannelMembers cm ON (cm.ChannelId = c.Id)
+ WHERE
+ c.TeamId = :TeamId
+ AND cm.UserId = :UserId
+ AND c.DeleteAt = 0
+ )
+ ORDER BY
+ c.DisplayName
+ LIMIT :Limit
+ OFFSET :Offset
+ `, map[string]interface{}{
+ "TeamId": teamId,
+ "UserId": userId,
+ "Limit": limit,
+ "Offset": offset,
+ })
+
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.GetMoreChannels", "store.sql_channel.get_more_channels.get.app_error", nil, "teamId="+teamId+", userId="+userId+", err="+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ result.Data = data
+ })
+}
+
+func (s SqlChannelStoreExperimental) GetPublicChannelsForTeam(teamId string, offset int, limit int) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.GetPublicChannelsForTeam(teamId, offset, limit)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ data := &model.ChannelList{}
+ _, err := s.GetReplica().Select(data, `
+ SELECT
+ Channels.*
+ FROM
+ Channels
+ JOIN
+ PublicChannels pc ON (pc.Id = Channels.Id)
+ WHERE
+ pc.TeamId = :TeamId
+ AND pc.DeleteAt = 0
+ ORDER BY pc.DisplayName
+ LIMIT :Limit
+ OFFSET :Offset
+ `, map[string]interface{}{
+ "TeamId": teamId,
+ "Limit": limit,
+ "Offset": offset,
+ })
+
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsForTeam", "store.sql_channel.get_public_channels.get.app_error", nil, "teamId="+teamId+", err="+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ result.Data = data
+ })
+}
+
+func (s SqlChannelStoreExperimental) GetPublicChannelsByIdsForTeam(teamId string, channelIds []string) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.GetPublicChannelsByIdsForTeam(teamId, channelIds)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ props := make(map[string]interface{})
+ props["teamId"] = teamId
+
+ idQuery := ""
+
+ for index, channelId := range channelIds {
+ if len(idQuery) > 0 {
+ idQuery += ", "
+ }
+
+ props["channelId"+strconv.Itoa(index)] = channelId
+ idQuery += ":channelId" + strconv.Itoa(index)
+ }
+
+ data := &model.ChannelList{}
+ _, err := s.GetReplica().Select(data, `
+ SELECT
+ Channels.*
+ FROM
+ Channels
+ JOIN
+ PublicChannels pc ON (pc.Id = Channels.Id)
+ WHERE
+ pc.TeamId = :teamId
+ AND pc.DeleteAt = 0
+ AND pc.Id IN (`+idQuery+`)
+ ORDER BY pc.DisplayName
+ `, props)
+
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsByIdsForTeam", "store.sql_channel.get_channels_by_ids.get.app_error", nil, err.Error(), http.StatusInternalServerError)
+ }
+
+ if len(*data) == 0 {
+ result.Err = model.NewAppError("SqlChannelStore.GetPublicChannelsByIdsForTeam", "store.sql_channel.get_channels_by_ids.not_found.app_error", nil, "", http.StatusNotFound)
+ }
+
+ result.Data = data
+ })
+}
+
+func (s SqlChannelStoreExperimental) AutocompleteInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.AutocompleteInTeam(teamId, term, includeDeleted)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ deleteFilter := "AND c.DeleteAt = 0"
+ if includeDeleted {
+ deleteFilter = ""
+ }
+
+ queryFormat := `
+ SELECT
+ Channels.*
+ FROM
+ Channels
+ JOIN
+ PublicChannels c ON (c.Id = Channels.Id)
+ WHERE
+ c.TeamId = :TeamId
+ ` + deleteFilter + `
+ %v
+ LIMIT 50
+ `
+
+ var channels model.ChannelList
+
+ if likeClause, likeTerm := s.buildLIKEClause(term); likeClause == "" {
+ if _, err := s.GetReplica().Select(&channels, fmt.Sprintf(queryFormat, ""), map[string]interface{}{"TeamId": teamId}); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.AutocompleteInTeam", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError)
+ }
+ } else {
+ // Using a UNION results in index_merge and fulltext queries and is much faster than the ref
+ // query you would get using an OR of the LIKE and full-text clauses.
+ fulltextClause, fulltextTerm := s.buildFulltextClause(term)
+ likeQuery := fmt.Sprintf(queryFormat, "AND "+likeClause)
+ fulltextQuery := fmt.Sprintf(queryFormat, "AND "+fulltextClause)
+ query := fmt.Sprintf("(%v) UNION (%v) LIMIT 50", likeQuery, fulltextQuery)
+
+ if _, err := s.GetReplica().Select(&channels, query, map[string]interface{}{"TeamId": teamId, "LikeTerm": likeTerm, "FulltextTerm": fulltextTerm}); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.AutocompleteInTeam", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError)
+ }
+ }
+
+ sort.Slice(channels, func(a, b int) bool {
+ return strings.ToLower(channels[a].DisplayName) < strings.ToLower(channels[b].DisplayName)
+ })
+ result.Data = &channels
+ })
+}
+
+func (s SqlChannelStoreExperimental) SearchInTeam(teamId string, term string, includeDeleted bool) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.SearchInTeam(teamId, term, includeDeleted)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ deleteFilter := "AND c.DeleteAt = 0"
+ if includeDeleted {
+ deleteFilter = ""
+ }
+
+ *result = s.performSearch(`
+ SELECT
+ Channels.*
+ FROM
+ Channels
+ JOIN
+ PublicChannels c ON (c.Id = Channels.Id)
+ WHERE
+ c.TeamId = :TeamId
+ `+deleteFilter+`
+ SEARCH_CLAUSE
+ ORDER BY c.DisplayName
+ LIMIT 100
+ `, term, map[string]interface{}{
+ "TeamId": teamId,
+ })
+ })
+}
+
+func (s SqlChannelStoreExperimental) SearchMore(userId string, teamId string, term string) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.SearchMore(userId, teamId, term)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ *result = s.performSearch(`
+ SELECT
+ Channels.*
+ FROM
+ Channels
+ JOIN
+ PublicChannels c ON (c.Id = Channels.Id)
+ WHERE
+ c.TeamId = :TeamId
+ AND c.DeleteAt = 0
+ AND c.Id NOT IN (
+ SELECT
+ c.Id
+ FROM
+ PublicChannels c
+ JOIN
+ ChannelMembers cm ON (cm.ChannelId = c.Id)
+ WHERE
+ c.TeamId = :TeamId
+ AND cm.UserId = :UserId
+ AND c.DeleteAt = 0
+ )
+ SEARCH_CLAUSE
+ ORDER BY c.DisplayName
+ LIMIT 100
+ `, term, map[string]interface{}{
+ "TeamId": teamId,
+ "UserId": userId,
+ })
+ })
+}
+
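
Callers are unchanged by the wrapper: the search methods still return a store.StoreChannel and are consumed as before. A hedged usage sketch follows, where ss stands for an already-constructed store.Store and imports are elided.

// Hypothetical caller, not part of this commit.
func logPublicChannelMatches(ss store.Store, userId, teamId string) {
	result := <-ss.Channel().SearchMore(userId, teamId, "town")
	if result.Err != nil {
		mlog.Error("channel search failed", mlog.Err(result.Err))
		return
	}

	// performSearch stores a *model.ChannelList in result.Data.
	for _, channel := range *result.Data.(*model.ChannelList) {
		mlog.Debug("matched channel: " + channel.DisplayName)
	}
}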
+func (s SqlChannelStoreExperimental) buildLIKEClause(term string) (likeClause, likeTerm string) {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.buildLIKEClause(term)
+ }
+
+ likeTerm = term
+ searchColumns := "c.Name, c.DisplayName, c.Purpose"
+
+ // These chars must be removed from the like query.
+ for _, c := range ignoreLikeSearchChar {
+ likeTerm = strings.Replace(likeTerm, c, "", -1)
+ }
+
+ // These chars must be escaped in the like query.
+ for _, c := range escapeLikeSearchChar {
+ likeTerm = strings.Replace(likeTerm, c, "*"+c, -1)
+ }
+
+ if likeTerm == "" {
+ return
+ }
+
+ // Prepare the LIKE portion of the query.
+ var searchFields []string
+ for _, field := range strings.Split(searchColumns, ", ") {
+ if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
+ searchFields = append(searchFields, fmt.Sprintf("lower(%s) LIKE lower(%s) escape '*'", field, ":LikeTerm"))
+ } else {
+ searchFields = append(searchFields, fmt.Sprintf("%s LIKE %s escape '*'", field, ":LikeTerm"))
+ }
+ }
+
+ likeClause = fmt.Sprintf("(%s)", strings.Join(searchFields, " OR "))
+ likeTerm += "%"
+ return
+}
+
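
A worked example of the LIKE preparation above, assuming the base store's ignoreLikeSearchChar and escapeLikeSearchChar sets (not shown in this diff) cover '*' and '%'/'_' respectively:

// Worked example for buildLIKEClause (illustrative, not part of the commit):
//
//   term     = "dev_ops*"
//   likeTerm = "dev*_ops%"   // '*' stripped, '_' escaped as '*_', trailing '%' appended
//
//   PostgreSQL likeClause:
//     (lower(c.Name) LIKE lower(:LikeTerm) escape '*' OR lower(c.DisplayName) ... OR lower(c.Purpose) ...)
//   MySQL likeClause:
//     (c.Name LIKE :LikeTerm escape '*' OR c.DisplayName LIKE :LikeTerm escape '*' OR c.Purpose LIKE :LikeTerm escape '*')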
+func (s SqlChannelStoreExperimental) buildFulltextClause(term string) (fulltextClause, fulltextTerm string) {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.buildFulltextClause(term)
+ }
+
+ // Copy the terms as we will need to prepare them differently for each search type.
+ fulltextTerm = term
+
+ searchColumns := "c.Name, c.DisplayName, c.Purpose"
+
+ // These chars must be treated as spaces in the fulltext query.
+ for _, c := range spaceFulltextSearchChar {
+ fulltextTerm = strings.Replace(fulltextTerm, c, " ", -1)
+ }
+
+ // Prepare the FULLTEXT portion of the query.
+ if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
+ fulltextTerm = strings.Replace(fulltextTerm, "|", "", -1)
+
+ splitTerm := strings.Fields(fulltextTerm)
+ for i, t := range strings.Fields(fulltextTerm) {
+ if i == len(splitTerm)-1 {
+ splitTerm[i] = t + ":*"
+ } else {
+ splitTerm[i] = t + ":* &"
+ }
+ }
+
+ fulltextTerm = strings.Join(splitTerm, " ")
+
+ fulltextClause = fmt.Sprintf("((%s) @@ to_tsquery(:FulltextTerm))", convertMySQLFullTextColumnsToPostgres(searchColumns))
+ } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
+ splitTerm := strings.Fields(fulltextTerm)
+ for i, t := range strings.Fields(fulltextTerm) {
+ splitTerm[i] = "+" + t + "*"
+ }
+
+ fulltextTerm = strings.Join(splitTerm, " ")
+
+ fulltextClause = fmt.Sprintf("MATCH(%s) AGAINST (:FulltextTerm IN BOOLEAN MODE)", searchColumns)
+ }
+
+ return
+}
+
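
And a worked example of the full-text preparation, assuming spaceFulltextSearchChar matches the set used by the base store:

// Worked example for buildFulltextClause (illustrative, not part of the commit):
//
//   term = "town squ"
//
//   PostgreSQL: fulltextTerm   = "town:* & squ:*"
//               fulltextClause = "((...) @@ to_tsquery(:FulltextTerm))"   // columns rewritten by convertMySQLFullTextColumnsToPostgres
//
//   MySQL:      fulltextTerm   = "+town* +squ*"
//               fulltextClause = "MATCH(c.Name, c.DisplayName, c.Purpose) AGAINST (:FulltextTerm IN BOOLEAN MODE)"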
+func (s SqlChannelStoreExperimental) performSearch(searchQuery string, term string, parameters map[string]interface{}) store.StoreResult {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.performSearch(searchQuery, term, parameters)
+ }
+
+ result := store.StoreResult{}
+
+ likeClause, likeTerm := s.buildLIKEClause(term)
+ if likeTerm == "" {
+ // If the likeTerm is empty after preparing, then don't bother searching.
+ searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "", 1)
+ } else {
+ parameters["LikeTerm"] = likeTerm
+ fulltextClause, fulltextTerm := s.buildFulltextClause(term)
+ parameters["FulltextTerm"] = fulltextTerm
+ searchQuery = strings.Replace(searchQuery, "SEARCH_CLAUSE", "AND ("+likeClause+" OR "+fulltextClause+")", 1)
+ }
+
+ var channels model.ChannelList
+
+ if _, err := s.GetReplica().Select(&channels, searchQuery, parameters); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.Search", "store.sql_channel.search.app_error", nil, "term="+term+", "+", "+err.Error(), http.StatusInternalServerError)
+ return result
+ }
+
+ result.Data = &channels
+ return result
+}
+
+func (s SqlChannelStoreExperimental) EnableExperimentalPublicChannelsMaterialization() {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ mlog.Info("Enabling experimental public channels materialization")
+ }
+
+ atomic.StoreUint32(s.experimentalPublicChannelsMaterializationDisabled, 0)
+}
+
+func (s SqlChannelStoreExperimental) DisableExperimentalPublicChannelsMaterialization() {
+ if s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ mlog.Info("Disabling experimental public channels materialization")
+ }
+
+ atomic.StoreUint32(s.experimentalPublicChannelsMaterializationDisabled, 1)
+}
+
+func (s SqlChannelStoreExperimental) IsExperimentalPublicChannelsMaterializationEnabled() bool {
+ return atomic.LoadUint32(s.experimentalPublicChannelsMaterializationDisabled) == 0
+}
diff --git a/store/sqlstore/channel_store_test.go b/store/sqlstore/channel_store_test.go
index 0e8b4191a..5eb84afcd 100644
--- a/store/sqlstore/channel_store_test.go
+++ b/store/sqlstore/channel_store_test.go
@@ -14,7 +14,7 @@ import (
)
func TestChannelStore(t *testing.T) {
- StoreTest(t, storetest.TestChannelStore)
+ StoreTestWithSqlSupplier(t, storetest.TestChannelStore)
}
func TestChannelStoreInternalDataTypes(t *testing.T) {
diff --git a/store/sqlstore/store.go b/store/sqlstore/store.go
index 500f98235..df912028b 100644
--- a/store/sqlstore/store.go
+++ b/store/sqlstore/store.go
@@ -51,6 +51,7 @@ type SqlStore interface {
MarkSystemRanUnitTests()
DoesTableExist(tablename string) bool
DoesColumnExist(tableName string, columName string) bool
+ DoesTriggerExist(triggerName string) bool
CreateColumnIfNotExists(tableName string, columnName string, mySqlColType string, postgresColType string, defaultValue string) bool
CreateColumnIfNotExistsNoDefault(tableName string, columnName string, mySqlColType string, postgresColType string) bool
RemoveColumnIfExists(tableName string, columnName string) bool
diff --git a/store/sqlstore/store_test.go b/store/sqlstore/store_test.go
index 58065d65d..55002aee2 100644
--- a/store/sqlstore/store_test.go
+++ b/store/sqlstore/store_test.go
@@ -16,10 +16,11 @@ import (
)
var storeTypes = []*struct {
- Name string
- Func func() (*storetest.RunningContainer, *model.SqlSettings, error)
- Container *storetest.RunningContainer
- Store store.Store
+ Name string
+ Func func() (*storetest.RunningContainer, *model.SqlSettings, error)
+ Container *storetest.RunningContainer
+ SqlSupplier *SqlSupplier
+ Store store.Store
}{
{
Name: "MySQL",
@@ -44,6 +45,19 @@ func StoreTest(t *testing.T, f func(*testing.T, store.Store)) {
}
}
+func StoreTestWithSqlSupplier(t *testing.T, f func(*testing.T, store.Store, storetest.SqlSupplier)) {
+ defer func() {
+ if err := recover(); err != nil {
+ tearDownStores()
+ panic(err)
+ }
+ }()
+ for _, st := range storeTypes {
+ st := st
+ t.Run(st.Name, func(t *testing.T) { f(t, st.Store, st.SqlSupplier) })
+ }
+}
+
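
The extra storetest.SqlSupplier argument lets a store test reach beneath the public store API, for example to assert that the triggers materialized a row. A hedged sketch of such a test follows; it assumes storetest.SqlSupplier exposes GetMaster (as SqlStore does), that -1 disables the per-team channel limit in Save, and it elides imports.

// Hypothetical store test exercising the materialization (sketch only).
func testPublicChannelsMaterialization(t *testing.T, ss store.Store, s storetest.SqlSupplier) {
	channel := &model.Channel{
		TeamId:      model.NewId(),
		DisplayName: "Town Square",
		Name:        "town-square-" + model.NewId(),
		Type:        model.CHANNEL_OPEN,
	}
	store.Must(ss.Channel().Save(channel, -1))

	// The trigger, not application code, should have written the PublicChannels row.
	count, err := s.GetMaster().SelectInt(
		`SELECT COUNT(*) FROM PublicChannels WHERE Id = :Id`,
		map[string]interface{}{"Id": channel.Id})
	if err != nil {
		t.Fatal(err)
	}
	if count != 1 {
		t.Fatalf("expected a materialized row for channel %s", channel.Id)
	}
}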
func initStores() {
defer func() {
if err := recover(); err != nil {
@@ -64,7 +78,8 @@ func initStores() {
return
}
st.Container = container
- st.Store = store.NewLayeredStore(NewSqlSupplier(*settings, nil), nil, nil)
+ st.SqlSupplier = NewSqlSupplier(*settings, nil)
+ st.Store = store.NewLayeredStore(st.SqlSupplier, nil, nil)
st.Store.MarkSystemRanUnitTests()
}()
}
diff --git a/store/sqlstore/supplier.go b/store/sqlstore/supplier.go
index 6c49d91fb..d1d7564f7 100644
--- a/store/sqlstore/supplier.go
+++ b/store/sqlstore/supplier.go
@@ -33,6 +33,7 @@ const (
)
const (
+ EXIT_GENERIC_FAILURE = 1
EXIT_CREATE_TABLE = 100
EXIT_DB_OPEN = 101
EXIT_PING = 102
@@ -116,8 +117,13 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter
supplier.initConnection()
+ enableExperimentalPublicChannelsMaterialization := true
+ if settings.EnablePublicChannelsMaterialization != nil && !*settings.EnablePublicChannelsMaterialization {
+ enableExperimentalPublicChannelsMaterialization = false
+ }
+
supplier.oldStores.team = NewSqlTeamStore(supplier)
- supplier.oldStores.channel = NewSqlChannelStore(supplier, metrics)
+ supplier.oldStores.channel = NewSqlChannelStoreExperimental(supplier, metrics, enableExperimentalPublicChannelsMaterialization)
supplier.oldStores.post = NewSqlPostStore(supplier, metrics)
supplier.oldStores.user = NewSqlUserStore(supplier, metrics)
supplier.oldStores.audit = NewSqlAuditStore(supplier)
@@ -151,10 +157,19 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter
os.Exit(EXIT_CREATE_TABLE)
}
+ // This store's triggers should exist before the migration is run to ensure the
+ // corresponding tables stay in sync. Whether or not a trigger should be created before
+ // or after a migration is likely to be decided on a case-by-case basis.
+ if err := supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateTriggersIfNotExists(); err != nil {
+ mlog.Critical("Error creating triggers", mlog.Err(err))
+ time.Sleep(time.Second)
+ os.Exit(EXIT_GENERIC_FAILURE)
+ }
+
UpgradeDatabase(supplier)
supplier.oldStores.team.(*SqlTeamStore).CreateIndexesIfNotExists()
- supplier.oldStores.channel.(*SqlChannelStore).CreateIndexesIfNotExists()
+ supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateIndexesIfNotExists()
supplier.oldStores.post.(*SqlPostStore).CreateIndexesIfNotExists()
supplier.oldStores.user.(*SqlUserStore).CreateIndexesIfNotExists()
supplier.oldStores.audit.(*SqlAuditStore).CreateIndexesIfNotExists()
@@ -461,6 +476,52 @@ func (ss *SqlSupplier) DoesColumnExist(tableName string, columnName string) bool
}
}
+func (ss *SqlSupplier) DoesTriggerExist(triggerName string) bool {
+ if ss.DriverName() == model.DATABASE_DRIVER_POSTGRES {
+ count, err := ss.GetMaster().SelectInt(`
+ SELECT
+ COUNT(0)
+ FROM
+ pg_trigger
+ WHERE
+ tgname = $1
+ `, triggerName)
+
+ if err != nil {
+ mlog.Critical(fmt.Sprintf("Failed to check if trigger exists %v", err))
+ time.Sleep(time.Second)
+ os.Exit(EXIT_GENERIC_FAILURE)
+ }
+
+ return count > 0
+
+ } else if ss.DriverName() == model.DATABASE_DRIVER_MYSQL {
+ count, err := ss.GetMaster().SelectInt(`
+ SELECT
+ COUNT(0)
+ FROM
+ information_schema.triggers
+ WHERE
+ trigger_schema = DATABASE()
+ AND trigger_name = ?
+ `, triggerName)
+
+ if err != nil {
+ mlog.Critical(fmt.Sprintf("Failed to check if trigger exists %v", err))
+ time.Sleep(time.Second)
+ os.Exit(EXIT_GENERIC_FAILURE)
+ }
+
+ return count > 0
+
+ } else {
+ mlog.Critical("Failed to check if column exists because of missing driver")
+ time.Sleep(time.Second)
+ os.Exit(EXIT_GENERIC_FAILURE)
+ return false
+ }
+}
+
func (ss *SqlSupplier) CreateColumnIfNotExists(tableName string, columnName string, mySqlColType string, postgresColType string, defaultValue string) bool {
if ss.DoesColumnExist(tableName, columnName) {
diff --git a/store/sqlstore/upgrade.go b/store/sqlstore/upgrade.go
index 5f74dbfb1..a8be96172 100644
--- a/store/sqlstore/upgrade.go
+++ b/store/sqlstore/upgrade.go
@@ -489,7 +489,6 @@ func UpgradeDatabaseToVersion53(sqlStore SqlStore) {
if shouldPerformUpgrade(sqlStore, VERSION_5_2_0, VERSION_5_3_0) {
saveSchemaVersion(sqlStore, VERSION_5_3_0)
}
-
}
func UpgradeDatabaseToVersion54(sqlStore SqlStore) {
@@ -497,6 +496,11 @@ func UpgradeDatabaseToVersion54(sqlStore SqlStore) {
// if shouldPerformUpgrade(sqlStore, VERSION_5_3_0, VERSION_5_4_0) {
sqlStore.AlterColumnTypeIfExists("OutgoingWebhooks", "Description", "varchar(500)", "varchar(500)")
sqlStore.AlterColumnTypeIfExists("IncomingWebhooks", "Description", "varchar(500)", "varchar(500)")
+
+ if err := sqlStore.Channel().MigratePublicChannels(); err != nil {
+ mlog.Critical("Failed to migrate PublicChannels table", mlog.Err(err))
+ time.Sleep(time.Second)
+ os.Exit(EXIT_GENERIC_FAILURE)
+ }
// saveSchemaVersion(sqlStore, VERSION_5_4_0)
- // }
}