From 6c826e765f2ea38a68628f548c6de068019e3281 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Mon, 17 Sep 2018 16:25:19 -0400 Subject: materialize PublicChannels without triggers (#9424) Creating triggers requires SUPERUSER privileges, and is especially painful on RDS. Pivot to maintaining this denormalized table in code. --- store/sqlstore/channel_store.go | 247 +++++++++--- store/sqlstore/channel_store_experimental.go | 546 +++++++++++++-------------- store/sqlstore/supplier.go | 9 - 3 files changed, 456 insertions(+), 346 deletions(-) (limited to 'store') diff --git a/store/sqlstore/channel_store.go b/store/sqlstore/channel_store.go index c0c1d2c8a..b4d3bd8fa 100644 --- a/store/sqlstore/channel_store.go +++ b/store/sqlstore/channel_store.go @@ -301,11 +301,6 @@ func (s SqlChannelStore) CreateIndexesIfNotExists() { s.CreateFullTextIndexIfNotExists("idx_channel_search_txt", "Channels", "Name, DisplayName, Purpose") } -func (s SqlChannelStore) CreateTriggersIfNotExists() error { - // See SqlChannelStoreExperimental - return nil -} - func (s SqlChannelStore) MigratePublicChannels() error { // See SqlChannelStoreExperimental return nil @@ -316,6 +311,9 @@ func (s SqlChannelStore) DropPublicChannels() error { return nil } +// Save writes the (non-direct) channel channel to the database. +// +// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel { return store.Do(func(result *store.StoreResult) { if channel.DeleteAt != 0 { @@ -474,42 +472,68 @@ func (s SqlChannelStore) saveChannelT(transaction *gorp.Transaction, channel *mo return result } +// Update writes the updated channel to the database. +// +// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Update(channel *model.Channel) store.StoreChannel { return store.Do(func(result *store.StoreResult) { - channel.PreUpdate() - - if channel.DeleteAt != 0 { - result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.archived_channel.app_error", nil, "", http.StatusBadRequest) + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) return } - if result.Err = channel.IsValid(); result.Err != nil { + *result = s.updateChannelT(transaction, channel) + if result.Err != nil { + transaction.Rollback() return } - count, err := s.GetMaster().Update(channel) - if err != nil { - if IsUniqueConstraintError(err, []string{"Name", "channels_name_teamid_key"}) { - dupChannel := model.Channel{} - s.GetReplica().SelectOne(&dupChannel, "SELECT * FROM Channels WHERE TeamId = :TeamId AND Name= :Name AND DeleteAt > 0", map[string]interface{}{"TeamId": channel.TeamId, "Name": channel.Name}) - if dupChannel.DeleteAt > 0 { - result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.previously.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest) - return - } - result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.exists.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest) - return - } - result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.updating.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusInternalServerError) + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) return } + }) +} - if count != 1 { - result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.app_error", nil, "id="+channel.Id, http.StatusInternalServerError) - return +func (s SqlChannelStore) updateChannelT(transaction *gorp.Transaction, channel *model.Channel) store.StoreResult { + result := store.StoreResult{} + + channel.PreUpdate() + + if channel.DeleteAt != 0 { + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.archived_channel.app_error", nil, "", http.StatusBadRequest) + return result + } + + if result.Err = channel.IsValid(); result.Err != nil { + return result + } + + count, err := transaction.Update(channel) + if err != nil { + if IsUniqueConstraintError(err, []string{"Name", "channels_name_teamid_key"}) { + dupChannel := model.Channel{} + s.GetReplica().SelectOne(&dupChannel, "SELECT * FROM Channels WHERE TeamId = :TeamId AND Name= :Name AND DeleteAt > 0", map[string]interface{}{"TeamId": channel.TeamId, "Name": channel.Name}) + if dupChannel.DeleteAt > 0 { + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.previously.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest) + return result + } + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.exists.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest) + return result } + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.updating.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusInternalServerError) + return result + } - result.Data = channel - }) + if count != 1 { + result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.app_error", nil, "id="+channel.Id, http.StatusInternalServerError) + return result + } + + result.Data = channel + + return result } func (s SqlChannelStore) GetChannelUnread(channelId, userId string) store.StoreChannel { @@ -617,39 +641,126 @@ func (s SqlChannelStore) get(id string, master bool, allowFromCache bool) store. }) } +// Delete records the given deleted timestamp to the channel in question. +// +// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Delete(channelId string, time int64) store.StoreChannel { return s.SetDeleteAt(channelId, time, time) } +// Restore reverts a previous deleted timestamp from the channel in question. +// +// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) Restore(channelId string, time int64) store.StoreChannel { return s.SetDeleteAt(channelId, 0, time) } -func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt int64, updateAt int64) store.StoreChannel { +// SetDeleteAt records the given deleted and updated timestamp to the channel in question. +// +// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. +func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt, updateAt int64) store.StoreChannel { return store.Do(func(result *store.StoreResult) { - _, err := s.GetMaster().Exec("Update Channels SET DeleteAt = :DeleteAt, UpdateAt = :UpdateAt WHERE Id = :ChannelId", map[string]interface{}{"DeleteAt": deleteAt, "UpdateAt": updateAt, "ChannelId": channelId}) + transaction, err := s.GetMaster().Begin() if err != nil { - result.Err = model.NewAppError("SqlChannelStore.Delete", "store.sql_channel.delete.channel.app_error", nil, "id="+channelId+", err="+err.Error(), http.StatusInternalServerError) + result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.setDeleteAtT(transaction, channelId, deleteAt, updateAt) + if result.Err != nil { + transaction.Rollback() + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } }) } +func (s SqlChannelStore) setDeleteAtT(transaction *gorp.Transaction, channelId string, deleteAt, updateAt int64) store.StoreResult { + result := store.StoreResult{} + + _, err := transaction.Exec("Update Channels SET DeleteAt = :DeleteAt, UpdateAt = :UpdateAt WHERE Id = :ChannelId", map[string]interface{}{"DeleteAt": deleteAt, "UpdateAt": updateAt, "ChannelId": channelId}) + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.Delete", "store.sql_channel.delete.channel.app_error", nil, "id="+channelId+", err="+err.Error(), http.StatusInternalServerError) + return result + } + + return result +} + +// PermanentDeleteByTeam removes all channels for the given team from the database. +// +// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) PermanentDeleteByTeam(teamId string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { - if _, err := s.GetMaster().Exec("DELETE FROM Channels WHERE TeamId = :TeamId", map[string]interface{}{"TeamId": teamId}); err != nil { - result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.app_error", nil, "teamId="+teamId+", "+err.Error(), http.StatusInternalServerError) + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.permanentDeleteByTeamtT(transaction, teamId) + if result.Err != nil { + transaction.Rollback() + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } }) } +func (s SqlChannelStore) permanentDeleteByTeamtT(transaction *gorp.Transaction, teamId string) store.StoreResult { + result := store.StoreResult{} + + if _, err := transaction.Exec("DELETE FROM Channels WHERE TeamId = :TeamId", map[string]interface{}{"TeamId": teamId}); err != nil { + result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.app_error", nil, "teamId="+teamId+", "+err.Error(), http.StatusInternalServerError) + return result + } + + return result +} + +// PermanentDelete removes the given channel from the database. +// +// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table. func (s SqlChannelStore) PermanentDelete(channelId string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { - if _, err := s.GetMaster().Exec("DELETE FROM Channels WHERE Id = :ChannelId", map[string]interface{}{"ChannelId": channelId}); err != nil { - result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError) + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.permanentDeleteT(transaction, channelId) + if result.Err != nil { + transaction.Rollback() + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } }) } +func (s SqlChannelStore) permanentDeleteT(transaction *gorp.Transaction, channelId string) store.StoreResult { + result := store.StoreResult{} + + if _, err := transaction.Exec("DELETE FROM Channels WHERE Id = :ChannelId", map[string]interface{}{"ChannelId": channelId}); err != nil { + result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError) + return result + } + + return result +} + func (s SqlChannelStore) PermanentDeleteMembersByChannel(channelId string) store.StoreChannel { return store.Do(func(result *store.StoreResult) { _, err := s.GetMaster().Exec("DELETE FROM ChannelMembers WHERE ChannelId = :ChannelId", map[string]interface{}{"ChannelId": channelId}) @@ -1914,12 +2025,36 @@ func (s SqlChannelStore) MigrateChannelMembers(fromChannelId string, fromUserId func (s SqlChannelStore) ResetAllChannelSchemes() store.StoreChannel { return store.Do(func(result *store.StoreResult) { - if _, err := s.GetMaster().Exec("UPDATE Channels SET SchemeId=''"); err != nil { - result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.app_error", nil, err.Error(), http.StatusInternalServerError) + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.resetAllChannelSchemesT(transaction) + if result.Err != nil { + transaction.Rollback() + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } }) } +func (s SqlChannelStore) resetAllChannelSchemesT(transaction *gorp.Transaction) store.StoreResult { + result := store.StoreResult{} + + if _, err := transaction.Exec("UPDATE Channels SET SchemeId=''"); err != nil { + result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.app_error", nil, err.Error(), http.StatusInternalServerError) + return result + } + + return result +} + func (s SqlChannelStore) ClearAllCustomRoleAssignments() store.StoreChannel { return store.Do(func(result *store.StoreResult) { builtInRoles := model.MakeDefaultRoles() @@ -1991,19 +2126,43 @@ func (s SqlChannelStore) ClearAllCustomRoleAssignments() store.StoreChannel { func (s SqlChannelStore) ResetLastPostAt() store.StoreChannel { return store.Do(func(result *store.StoreResult) { - var query string - if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { - query = "UPDATE Channels SET LastPostAt = COALESCE((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);" - } else { - query = "UPDATE Channels SET LastPostAt = IFNULL((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);" + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.resetLastPostAtT(transaction) + if result.Err != nil { + transaction.Rollback() + return } - if _, err := s.GetMaster().Exec(query); err != nil { - result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.app_error", nil, err.Error(), http.StatusInternalServerError) + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } }) } +func (s SqlChannelStore) resetLastPostAtT(transaction *gorp.Transaction) store.StoreResult { + result := store.StoreResult{} + + var query string + if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { + query = "UPDATE Channels SET LastPostAt = COALESCE((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);" + } else { + query = "UPDATE Channels SET LastPostAt = IFNULL((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);" + } + + if _, err := transaction.Exec(query); err != nil { + result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.app_error", nil, err.Error(), http.StatusInternalServerError) + return result + } + + return result +} + func (s SqlChannelStore) EnableExperimentalPublicChannelsMaterialization() { // See SqlChannelStoreExperimental } diff --git a/store/sqlstore/channel_store_experimental.go b/store/sqlstore/channel_store_experimental.go index 67576ddc1..7cf142aba 100644 --- a/store/sqlstore/channel_store_experimental.go +++ b/store/sqlstore/channel_store_experimental.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" + "github.com/mattermost/gorp" "github.com/mattermost/mattermost-server/einterfaces" "github.com/mattermost/mattermost-server/mlog" "github.com/mattermost/mattermost-server/model" @@ -65,8 +66,8 @@ func NewSqlChannelStoreExperimental(sqlStore SqlStore, metrics einterfaces.Metri return s } -// migratePublicChannels initializes the PublicChannels table with data created before the triggers -// took over keeping it up-to-date. +// migratePublicChannels initializes the PublicChannels table with data created before this version +// of the Mattermost server kept it up-to-date. func (s SqlChannelStoreExperimental) MigratePublicChannels() error { if !s.IsExperimentalPublicChannelsMaterializationEnabled() { return s.SqlChannelStore.MigratePublicChannels() @@ -100,74 +101,13 @@ func (s SqlChannelStoreExperimental) MigratePublicChannels() error { return nil } -// DropPublicChannels removes the public channels table and all associated triggers. +// DropPublicChannels removes the public channels table. func (s SqlChannelStoreExperimental) DropPublicChannels() error { - // Only PostgreSQL will honour the transaction when executing the DDL changes below. - transaction, err := s.GetMaster().Begin() - if err != nil { - return err - } - - if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels ON Channels - `); err != nil { - return err - } - if _, err := transaction.Exec(` - DROP FUNCTION IF EXISTS channels_copy_to_public_channels - `); err != nil { - return err - } - } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL { - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels_insert - `); err != nil { - return err - } - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels_update - `); err != nil { - return err - } - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels_delete - `); err != nil { - return err - } - } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE { - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels_insert - `); err != nil { - return err - } - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels_update_delete - `); err != nil { - return err - } - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels_update - `); err != nil { - return err - } - if _, err := transaction.Exec(` - DROP TRIGGER IF EXISTS trigger_channels_delete - `); err != nil { - return err - } - } else { - return errors.New("failed to create trigger because of missing driver") - } - - if _, err := transaction.Exec(` + _, err := s.GetMaster().Exec(` DROP TABLE IF EXISTS PublicChannels - `); err != nil { - return err - } - - if err := transaction.Commit(); err != nil { - return err + `) + if err != nil { + return errors.Wrap(err, "failed to drop public channels table") } return nil @@ -190,251 +130,271 @@ func (s SqlChannelStoreExperimental) CreateIndexesIfNotExists() { s.CreateFullTextIndexIfNotExists("idx_publicchannels_search_txt", "PublicChannels", "Name, DisplayName, Purpose") } -func (s SqlChannelStoreExperimental) CreateTriggersIfNotExists() error { - s.SqlChannelStore.CreateTriggersIfNotExists() +func (s SqlChannelStoreExperimental) upsertPublicChannelT(transaction *gorp.Transaction, channel *model.Channel) error { + publicChannel := &publicChannel{ + Id: channel.Id, + DeleteAt: channel.DeleteAt, + TeamId: channel.TeamId, + DisplayName: channel.DisplayName, + Name: channel.Name, + Header: channel.Header, + Purpose: channel.Purpose, + } + + if channel.Type != model.CHANNEL_OPEN { + if _, err := transaction.Delete(publicChannel); err != nil { + return errors.Wrap(err, "failed to delete public channel") + } - if !s.IsExperimentalPublicChannelsMaterializationEnabled() { return nil } - if s.DriverName() == model.DATABASE_DRIVER_POSTGRES { - if !s.DoesTriggerExist("trigger_channels") { - transaction, err := s.GetMaster().Begin() - if err != nil { - return errors.Wrap(err, "failed to create trigger function") - } + if s.DriverName() == model.DATABASE_DRIVER_MYSQL { + // Leverage native upsert for MySQL, since RowsAffected returns 0 if the row exists + // but no changes were made, breaking the update-then-insert paradigm below when + // the row already exists. (Postgres 9.4 doesn't support native upsert.) + if _, err := transaction.Exec(` + INSERT INTO + PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) + VALUES + (:Id, :DeleteAt, :TeamId, :DisplayName, :Name, :Header, :Purpose) + ON DUPLICATE KEY UPDATE + DeleteAt = :DeleteAt, + TeamId = :TeamId, + DisplayName = :DisplayName, + Name = :Name, + Header = :Header, + Purpose = :Purpose; + `, map[string]interface{}{ + "Id": publicChannel.Id, + "DeleteAt": publicChannel.DeleteAt, + "TeamId": publicChannel.TeamId, + "DisplayName": publicChannel.DisplayName, + "Name": publicChannel.Name, + "Header": publicChannel.Header, + "Purpose": publicChannel.Purpose, + }); err != nil { + return errors.Wrap(err, "failed to insert public channel") + } + } else { + count, err := transaction.Update(publicChannel) + if err != nil { + return errors.Wrap(err, "failed to update public channel") + } + if count > 0 { + return nil + } - if _, err := transaction.ExecNoTimeout(` - CREATE OR REPLACE FUNCTION channels_copy_to_public_channels() RETURNS TRIGGER - SECURITY DEFINER - LANGUAGE plpgsql - AS $$ - DECLARE - counter int := 0; - BEGIN - IF (TG_OP = 'DELETE' AND OLD.Type = 'O') OR (TG_OP = 'UPDATE' AND NEW.Type != 'O') THEN - DELETE FROM - PublicChannels - WHERE - Id = OLD.Id; - ELSEIF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') AND NEW.Type = 'O' THEN - UPDATE - PublicChannels - SET - DeleteAt = NEW.DeleteAt, - TeamId = NEW.TeamId, - DisplayName = NEW.DisplayName, - Name = NEW.Name, - Header = NEW.Header, - Purpose = NEW.Purpose - WHERE - Id = NEW.Id; - - -- There's a race condition here where the INSERT might fail, though this should only occur - -- if PublicChannels had been modified outside of the triggers. We could improve this with - -- the UPSERT functionality in Postgres 9.5+ once we support same. - IF NOT FOUND THEN - INSERT INTO - PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) - VALUES - (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose); - END IF; - END IF; - - RETURN NULL; - END - $$; - `); err != nil { - return errors.Wrap(err, "failed to create trigger function") - } + if err := transaction.Insert(publicChannel); err != nil { + return errors.Wrap(err, "failed to insert public channel") + } + } - if _, err := transaction.ExecNoTimeout(` - CREATE TRIGGER - trigger_channels - AFTER INSERT OR UPDATE OR DELETE ON - Channels - FOR EACH ROW EXECUTE PROCEDURE - channels_copy_to_public_channels(); - `); err != nil { - return errors.Wrap(err, "failed to create trigger") - } + return nil +} - if err := transaction.Commit(); err != nil { - return errors.Wrap(err, "failed to create trigger function") - } +func (s SqlChannelStoreExperimental) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.Save(channel, maxChannelsPerTeam) + } + + return store.Do(func(result *store.StoreResult) { + if channel.DeleteAt != 0 { + result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.archived_channel.app_error", nil, "", http.StatusBadRequest) + return } - } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL { - // Note that DDL statements in MySQL (CREATE TABLE, CREATE TRIGGER, etc.) cannot - // be rolled back inside a transaction (unlike PostgreSQL), so there's no point in - // wrapping what follows inside a transaction. - - if !s.DoesTriggerExist("trigger_channels_insert") { - if _, err := s.GetMaster().ExecNoTimeout(` - CREATE TRIGGER - trigger_channels_insert - AFTER INSERT ON - Channels - FOR EACH ROW - BEGIN - IF NEW.Type = 'O' THEN - INSERT INTO - PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) - VALUES - (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose) - ON DUPLICATE KEY UPDATE - DeleteAt = NEW.DeleteAt, - TeamId = NEW.TeamId, - DisplayName = NEW.DisplayName, - Name = NEW.Name, - Header = NEW.Header, - Purpose = NEW.Purpose; - END IF; - END; - `); err != nil { - return errors.Wrap(err, "failed to create trigger_channels_insert trigger") - } + + if channel.Type == model.CHANNEL_DIRECT { + result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.direct_channel.app_error", nil, "", http.StatusBadRequest) + return } - if !s.DoesTriggerExist("trigger_channels_update") { - if _, err := s.GetMaster().ExecNoTimeout(` - CREATE TRIGGER - trigger_channels_update - AFTER UPDATE ON - Channels - FOR EACH ROW - BEGIN - IF OLD.Type = 'O' AND NEW.Type != 'O' THEN - DELETE FROM - PublicChannels - WHERE - Id = NEW.Id; - ELSEIF NEW.Type = 'O' THEN - INSERT INTO - PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) - VALUES - (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose) - ON DUPLICATE KEY UPDATE - DeleteAt = NEW.DeleteAt, - TeamId = NEW.TeamId, - DisplayName = NEW.DisplayName, - Name = NEW.Name, - Header = NEW.Header, - Purpose = NEW.Purpose; - END IF; - END; - `); err != nil { - return errors.Wrap(err, "failed to create trigger_channels_update trigger") - } + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } - if !s.DoesTriggerExist("trigger_channels_delete") { - if _, err := s.GetMaster().ExecNoTimeout(` - CREATE TRIGGER - trigger_channels_delete - AFTER DELETE ON - Channels - FOR EACH ROW - BEGIN - IF OLD.Type = 'O' THEN - DELETE FROM - PublicChannels - WHERE - Id = OLD.Id; - END IF; - END; - `); err != nil { - return errors.Wrap(err, "failed to create trigger_channels_delete trigger") - } + *result = s.saveChannelT(transaction, channel, maxChannelsPerTeam) + if result.Err != nil { + transaction.Rollback() + return } - } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE { - if _, err := s.GetMaster().ExecNoTimeout(` - CREATE TRIGGER IF NOT EXISTS - trigger_channels_insert - AFTER INSERT ON - Channels - FOR EACH ROW - WHEN NEW.Type = 'O' - BEGIN - -- Ideally, we'd leverage ON CONFLICT DO UPDATE below and make this INSERT resilient to pre-existing - -- data. However, the version of Sqlite we're compiling against doesn't support this. This isn't - -- critical, though, since we don't support Sqlite in production. - INSERT INTO - PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose) - VALUES - (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose); - END; - `); err != nil { - return errors.Wrap(err, "failed to create trigger_channels_insert trigger") - } - - if _, err := s.GetMaster().ExecNoTimeout(` - CREATE TRIGGER IF NOT EXISTS - trigger_channels_update_delete - AFTER UPDATE ON - Channels - FOR EACH ROW - WHEN - OLD.Type = 'O' - AND NEW.Type != 'O' - BEGIN - DELETE FROM - PublicChannels - WHERE - Id = NEW.Id; - END; - `); err != nil { - return errors.Wrap(err, "failed to create trigger_channels_update_delete trigger") + + // Additionally propagate the write to the PublicChannels table. + if err := s.upsertPublicChannelT(transaction, result.Data.(*model.Channel)); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.upsert_public_channel.app_error", nil, err.Error(), http.StatusInternalServerError) + return } - if _, err := s.GetMaster().ExecNoTimeout(` - CREATE TRIGGER IF NOT EXISTS - trigger_channels_update - AFTER UPDATE ON - Channels - FOR EACH ROW - WHEN - OLD.Type != 'O' - AND NEW.Type = 'O' - BEGIN - -- See comments re: ON CONFLICT DO UPDATE above that would apply here as well. - UPDATE - PublicChannels - SET - DeleteAt = NEW.DeleteAt, - TeamId = NEW.TeamId, - DisplayName = NEW.DisplayName, - Name = NEW.Name, - Header = NEW.Header, - Purpose = NEW.Purpose - WHERE - Id = NEW.Id; - END; - `); err != nil { - return errors.Wrap(err, "failed to create trigger_channels_update trigger") + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } + }) +} - if _, err := s.GetMaster().ExecNoTimeout(` - CREATE TRIGGER IF NOT EXISTS - trigger_channels_delete - AFTER UPDATE ON - Channels - FOR EACH ROW - WHEN - OLD.Type = 'O' - BEGIN - DELETE FROM - PublicChannels - WHERE - Id = OLD.Id; - END; - `); err != nil { - return errors.Wrap(err, "failed to create trigger_channels_delete trigger") +func (s SqlChannelStoreExperimental) Update(channel *model.Channel) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.Update(channel) + } + + return store.Do(func(result *store.StoreResult) { + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.Update", "store.sql_channel.update.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return } - } else { - return errors.New("failed to create trigger because of missing driver") + + *result = s.updateChannelT(transaction, channel) + if result.Err != nil { + transaction.Rollback() + return + } + + // Additionally propagate the write to the PublicChannels table. + if err := s.upsertPublicChannelT(transaction, result.Data.(*model.Channel)); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStoreExperimental.Update", "store.sql_channel.update.upsert_public_channel.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.Update", "store.sql_channel.update.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + }) +} + +func (s SqlChannelStoreExperimental) Delete(channelId string, time int64) store.StoreChannel { + // Call the experimental version first. + return s.SetDeleteAt(channelId, time, time) +} + +func (s SqlChannelStoreExperimental) Restore(channelId string, time int64) store.StoreChannel { + // Call the experimental version first. + return s.SetDeleteAt(channelId, 0, time) +} + +func (s SqlChannelStoreExperimental) SetDeleteAt(channelId string, deleteAt, updateAt int64) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.SetDeleteAt(channelId, deleteAt, updateAt) } - return nil + return store.Do(func(result *store.StoreResult) { + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.SetDeleteAt", "store.sql_channel.set_delete_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.setDeleteAtT(transaction, channelId, deleteAt, updateAt) + if result.Err != nil { + transaction.Rollback() + return + } + + // Additionally propagate the write to the PublicChannels table. + if _, err := transaction.Exec(` + UPDATE + PublicChannels + SET + DeleteAt = :DeleteAt + WHERE + Id = :ChannelId + `, map[string]interface{}{ + "DeleteAt": deleteAt, + "ChannelId": channelId, + }); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStoreExperimental.SetDeleteAt", "store.sql_channel.set_delete_at.update_public_channel.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError) + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.SetDeleteAt", "store.sql_channel.set_delete_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + }) +} + +func (s SqlChannelStoreExperimental) PermanentDeleteByTeam(teamId string) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.PermanentDeleteByTeam(teamId) + } + + return store.Do(func(result *store.StoreResult) { + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.permanentDeleteByTeamtT(transaction, teamId) + if result.Err != nil { + transaction.Rollback() + return + } + + // Additionally propagate the deletions to the PublicChannels table. + if _, err := transaction.Exec(` + DELETE FROM + PublicChannels + WHERE + TeamId = :TeamId + `, map[string]interface{}{ + "TeamId": teamId, + }); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDeleteByTeamt", "store.sql_channel.permanent_delete_by_team.delete_public_channels.app_error", nil, "team_id="+teamId+", "+err.Error(), http.StatusInternalServerError) + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + }) +} + +func (s SqlChannelStoreExperimental) PermanentDelete(channelId string) store.StoreChannel { + if !s.IsExperimentalPublicChannelsMaterializationEnabled() { + return s.SqlChannelStore.PermanentDelete(channelId) + } + + return store.Do(func(result *store.StoreResult) { + transaction, err := s.GetMaster().Begin() + if err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDelete", "store.sql_channel.permanent_delete.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + *result = s.permanentDeleteT(transaction, channelId) + if result.Err != nil { + transaction.Rollback() + return + } + + // Additionally propagate the deletion to the PublicChannels table. + if _, err := transaction.Exec(` + DELETE FROM + PublicChannels + WHERE + Id = :ChannelId + `, map[string]interface{}{ + "ChannelId": channelId, + }); err != nil { + transaction.Rollback() + result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDelete", "store.sql_channel.permanent_delete.delete_public_channel.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError) + return + } + + if err := transaction.Commit(); err != nil { + result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDelete", "store.sql_channel.permanent_delete.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + }) } func (s SqlChannelStoreExperimental) GetMoreChannels(teamId string, userId string, offset int, limit int) store.StoreChannel { diff --git a/store/sqlstore/supplier.go b/store/sqlstore/supplier.go index d1d7564f7..11216dd25 100644 --- a/store/sqlstore/supplier.go +++ b/store/sqlstore/supplier.go @@ -157,15 +157,6 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter os.Exit(EXIT_CREATE_TABLE) } - // This store's triggers should exist before the migration is run to ensure the - // corresponding tables stay in sync. Whether or not a trigger should be created before - // or after a migration is likely to be decided on a case-by-case basis. - if err := supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateTriggersIfNotExists(); err != nil { - mlog.Critical("Error creating triggers", mlog.Err(err)) - time.Sleep(time.Second) - os.Exit(EXIT_GENERIC_FAILURE) - } - UpgradeDatabase(supplier) supplier.oldStores.team.(*SqlTeamStore).CreateIndexesIfNotExists() -- cgit v1.2.3-1-g7c22