summaryrefslogtreecommitdiffstats
path: root/store
diff options
context:
space:
mode:
authorJesse Hallam <jesse.hallam@gmail.com>2018-09-17 16:25:19 -0400
committerChristopher Speller <crspeller@gmail.com>2018-09-17 13:25:19 -0700
commit6c826e765f2ea38a68628f548c6de068019e3281 (patch)
treebc57008392c23705eb4773e3f7a3320dcba9f734 /store
parent6c2a5555b85bd15106df5f4f631bb6e945a187b8 (diff)
downloadchat-6c826e765f2ea38a68628f548c6de068019e3281.tar.gz
chat-6c826e765f2ea38a68628f548c6de068019e3281.tar.bz2
chat-6c826e765f2ea38a68628f548c6de068019e3281.zip
materialize PublicChannels without triggers (#9424)
Creating triggers requires SUPERUSER privileges, and is especially painful on RDS. Pivot to maintaining this denormalized table in code.
Diffstat (limited to 'store')
-rw-r--r--store/sqlstore/channel_store.go247
-rw-r--r--store/sqlstore/channel_store_experimental.go546
-rw-r--r--store/sqlstore/supplier.go9
3 files changed, 456 insertions, 346 deletions
diff --git a/store/sqlstore/channel_store.go b/store/sqlstore/channel_store.go
index c0c1d2c8a..b4d3bd8fa 100644
--- a/store/sqlstore/channel_store.go
+++ b/store/sqlstore/channel_store.go
@@ -301,11 +301,6 @@ func (s SqlChannelStore) CreateIndexesIfNotExists() {
s.CreateFullTextIndexIfNotExists("idx_channel_search_txt", "Channels", "Name, DisplayName, Purpose")
}
-func (s SqlChannelStore) CreateTriggersIfNotExists() error {
- // See SqlChannelStoreExperimental
- return nil
-}
-
func (s SqlChannelStore) MigratePublicChannels() error {
// See SqlChannelStoreExperimental
return nil
@@ -316,6 +311,9 @@ func (s SqlChannelStore) DropPublicChannels() error {
return nil
}
+// Save writes the (non-direct) channel channel to the database.
+//
+// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if channel.DeleteAt != 0 {
@@ -474,42 +472,68 @@ func (s SqlChannelStore) saveChannelT(transaction *gorp.Transaction, channel *mo
return result
}
+// Update writes the updated channel to the database.
+//
+// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Update(channel *model.Channel) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
- channel.PreUpdate()
-
- if channel.DeleteAt != 0 {
- result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.archived_channel.app_error", nil, "", http.StatusBadRequest)
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
- if result.Err = channel.IsValid(); result.Err != nil {
+ *result = s.updateChannelT(transaction, channel)
+ if result.Err != nil {
+ transaction.Rollback()
return
}
- count, err := s.GetMaster().Update(channel)
- if err != nil {
- if IsUniqueConstraintError(err, []string{"Name", "channels_name_teamid_key"}) {
- dupChannel := model.Channel{}
- s.GetReplica().SelectOne(&dupChannel, "SELECT * FROM Channels WHERE TeamId = :TeamId AND Name= :Name AND DeleteAt > 0", map[string]interface{}{"TeamId": channel.TeamId, "Name": channel.Name})
- if dupChannel.DeleteAt > 0 {
- result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.previously.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
- return
- }
- result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.exists.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
- return
- }
- result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.updating.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusInternalServerError)
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
+ })
+}
- if count != 1 {
- result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.app_error", nil, "id="+channel.Id, http.StatusInternalServerError)
- return
+func (s SqlChannelStore) updateChannelT(transaction *gorp.Transaction, channel *model.Channel) store.StoreResult {
+ result := store.StoreResult{}
+
+ channel.PreUpdate()
+
+ if channel.DeleteAt != 0 {
+ result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.archived_channel.app_error", nil, "", http.StatusBadRequest)
+ return result
+ }
+
+ if result.Err = channel.IsValid(); result.Err != nil {
+ return result
+ }
+
+ count, err := transaction.Update(channel)
+ if err != nil {
+ if IsUniqueConstraintError(err, []string{"Name", "channels_name_teamid_key"}) {
+ dupChannel := model.Channel{}
+ s.GetReplica().SelectOne(&dupChannel, "SELECT * FROM Channels WHERE TeamId = :TeamId AND Name= :Name AND DeleteAt > 0", map[string]interface{}{"TeamId": channel.TeamId, "Name": channel.Name})
+ if dupChannel.DeleteAt > 0 {
+ result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.previously.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
+ return result
+ }
+ result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.exists.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
+ return result
}
+ result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.updating.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusInternalServerError)
+ return result
+ }
- result.Data = channel
- })
+ if count != 1 {
+ result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.app_error", nil, "id="+channel.Id, http.StatusInternalServerError)
+ return result
+ }
+
+ result.Data = channel
+
+ return result
}
func (s SqlChannelStore) GetChannelUnread(channelId, userId string) store.StoreChannel {
@@ -617,39 +641,126 @@ func (s SqlChannelStore) get(id string, master bool, allowFromCache bool) store.
})
}
+// Delete records the given deleted timestamp to the channel in question.
+//
+// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Delete(channelId string, time int64) store.StoreChannel {
return s.SetDeleteAt(channelId, time, time)
}
+// Restore reverts a previous deleted timestamp from the channel in question.
+//
+// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Restore(channelId string, time int64) store.StoreChannel {
return s.SetDeleteAt(channelId, 0, time)
}
-func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt int64, updateAt int64) store.StoreChannel {
+// SetDeleteAt records the given deleted and updated timestamp to the channel in question.
+//
+// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
+func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt, updateAt int64) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
- _, err := s.GetMaster().Exec("Update Channels SET DeleteAt = :DeleteAt, UpdateAt = :UpdateAt WHERE Id = :ChannelId", map[string]interface{}{"DeleteAt": deleteAt, "UpdateAt": updateAt, "ChannelId": channelId})
+ transaction, err := s.GetMaster().Begin()
if err != nil {
- result.Err = model.NewAppError("SqlChannelStore.Delete", "store.sql_channel.delete.channel.app_error", nil, "id="+channelId+", err="+err.Error(), http.StatusInternalServerError)
+ result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.setDeleteAtT(transaction, channelId, deleteAt, updateAt)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
})
}
+func (s SqlChannelStore) setDeleteAtT(transaction *gorp.Transaction, channelId string, deleteAt, updateAt int64) store.StoreResult {
+ result := store.StoreResult{}
+
+ _, err := transaction.Exec("Update Channels SET DeleteAt = :DeleteAt, UpdateAt = :UpdateAt WHERE Id = :ChannelId", map[string]interface{}{"DeleteAt": deleteAt, "UpdateAt": updateAt, "ChannelId": channelId})
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.Delete", "store.sql_channel.delete.channel.app_error", nil, "id="+channelId+", err="+err.Error(), http.StatusInternalServerError)
+ return result
+ }
+
+ return result
+}
+
+// PermanentDeleteByTeam removes all channels for the given team from the database.
+//
+// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) PermanentDeleteByTeam(teamId string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
- if _, err := s.GetMaster().Exec("DELETE FROM Channels WHERE TeamId = :TeamId", map[string]interface{}{"TeamId": teamId}); err != nil {
- result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.app_error", nil, "teamId="+teamId+", "+err.Error(), http.StatusInternalServerError)
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.permanentDeleteByTeamtT(transaction, teamId)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
})
}
+func (s SqlChannelStore) permanentDeleteByTeamtT(transaction *gorp.Transaction, teamId string) store.StoreResult {
+ result := store.StoreResult{}
+
+ if _, err := transaction.Exec("DELETE FROM Channels WHERE TeamId = :TeamId", map[string]interface{}{"TeamId": teamId}); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.app_error", nil, "teamId="+teamId+", "+err.Error(), http.StatusInternalServerError)
+ return result
+ }
+
+ return result
+}
+
+// PermanentDelete removes the given channel from the database.
+//
+// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) PermanentDelete(channelId string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
- if _, err := s.GetMaster().Exec("DELETE FROM Channels WHERE Id = :ChannelId", map[string]interface{}{"ChannelId": channelId}); err != nil {
- result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError)
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.permanentDeleteT(transaction, channelId)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
})
}
+func (s SqlChannelStore) permanentDeleteT(transaction *gorp.Transaction, channelId string) store.StoreResult {
+ result := store.StoreResult{}
+
+ if _, err := transaction.Exec("DELETE FROM Channels WHERE Id = :ChannelId", map[string]interface{}{"ChannelId": channelId}); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError)
+ return result
+ }
+
+ return result
+}
+
func (s SqlChannelStore) PermanentDeleteMembersByChannel(channelId string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
_, err := s.GetMaster().Exec("DELETE FROM ChannelMembers WHERE ChannelId = :ChannelId", map[string]interface{}{"ChannelId": channelId})
@@ -1914,12 +2025,36 @@ func (s SqlChannelStore) MigrateChannelMembers(fromChannelId string, fromUserId
func (s SqlChannelStore) ResetAllChannelSchemes() store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
- if _, err := s.GetMaster().Exec("UPDATE Channels SET SchemeId=''"); err != nil {
- result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.app_error", nil, err.Error(), http.StatusInternalServerError)
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.resetAllChannelSchemesT(transaction)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
})
}
+func (s SqlChannelStore) resetAllChannelSchemesT(transaction *gorp.Transaction) store.StoreResult {
+ result := store.StoreResult{}
+
+ if _, err := transaction.Exec("UPDATE Channels SET SchemeId=''"); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return result
+ }
+
+ return result
+}
+
func (s SqlChannelStore) ClearAllCustomRoleAssignments() store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
builtInRoles := model.MakeDefaultRoles()
@@ -1991,19 +2126,43 @@ func (s SqlChannelStore) ClearAllCustomRoleAssignments() store.StoreChannel {
func (s SqlChannelStore) ResetLastPostAt() store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
- var query string
- if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
- query = "UPDATE Channels SET LastPostAt = COALESCE((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
- } else {
- query = "UPDATE Channels SET LastPostAt = IFNULL((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.resetLastPostAtT(transaction)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
}
- if _, err := s.GetMaster().Exec(query); err != nil {
- result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.app_error", nil, err.Error(), http.StatusInternalServerError)
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
})
}
+func (s SqlChannelStore) resetLastPostAtT(transaction *gorp.Transaction) store.StoreResult {
+ result := store.StoreResult{}
+
+ var query string
+ if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
+ query = "UPDATE Channels SET LastPostAt = COALESCE((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
+ } else {
+ query = "UPDATE Channels SET LastPostAt = IFNULL((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
+ }
+
+ if _, err := transaction.Exec(query); err != nil {
+ result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return result
+ }
+
+ return result
+}
+
func (s SqlChannelStore) EnableExperimentalPublicChannelsMaterialization() {
// See SqlChannelStoreExperimental
}
diff --git a/store/sqlstore/channel_store_experimental.go b/store/sqlstore/channel_store_experimental.go
index 67576ddc1..7cf142aba 100644
--- a/store/sqlstore/channel_store_experimental.go
+++ b/store/sqlstore/channel_store_experimental.go
@@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
+ "github.com/mattermost/gorp"
"github.com/mattermost/mattermost-server/einterfaces"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
@@ -65,8 +66,8 @@ func NewSqlChannelStoreExperimental(sqlStore SqlStore, metrics einterfaces.Metri
return s
}
-// migratePublicChannels initializes the PublicChannels table with data created before the triggers
-// took over keeping it up-to-date.
+// migratePublicChannels initializes the PublicChannels table with data created before this version
+// of the Mattermost server kept it up-to-date.
func (s SqlChannelStoreExperimental) MigratePublicChannels() error {
if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return s.SqlChannelStore.MigratePublicChannels()
@@ -100,74 +101,13 @@ func (s SqlChannelStoreExperimental) MigratePublicChannels() error {
return nil
}
-// DropPublicChannels removes the public channels table and all associated triggers.
+// DropPublicChannels removes the public channels table.
func (s SqlChannelStoreExperimental) DropPublicChannels() error {
- // Only PostgreSQL will honour the transaction when executing the DDL changes below.
- transaction, err := s.GetMaster().Begin()
- if err != nil {
- return err
- }
-
- if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels ON Channels
- `); err != nil {
- return err
- }
- if _, err := transaction.Exec(`
- DROP FUNCTION IF EXISTS channels_copy_to_public_channels
- `); err != nil {
- return err
- }
- } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels_insert
- `); err != nil {
- return err
- }
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels_update
- `); err != nil {
- return err
- }
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels_delete
- `); err != nil {
- return err
- }
- } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE {
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels_insert
- `); err != nil {
- return err
- }
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels_update_delete
- `); err != nil {
- return err
- }
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels_update
- `); err != nil {
- return err
- }
- if _, err := transaction.Exec(`
- DROP TRIGGER IF EXISTS trigger_channels_delete
- `); err != nil {
- return err
- }
- } else {
- return errors.New("failed to create trigger because of missing driver")
- }
-
- if _, err := transaction.Exec(`
+ _, err := s.GetMaster().Exec(`
DROP TABLE IF EXISTS PublicChannels
- `); err != nil {
- return err
- }
-
- if err := transaction.Commit(); err != nil {
- return err
+ `)
+ if err != nil {
+ return errors.Wrap(err, "failed to drop public channels table")
}
return nil
@@ -190,251 +130,271 @@ func (s SqlChannelStoreExperimental) CreateIndexesIfNotExists() {
s.CreateFullTextIndexIfNotExists("idx_publicchannels_search_txt", "PublicChannels", "Name, DisplayName, Purpose")
}
-func (s SqlChannelStoreExperimental) CreateTriggersIfNotExists() error {
- s.SqlChannelStore.CreateTriggersIfNotExists()
+func (s SqlChannelStoreExperimental) upsertPublicChannelT(transaction *gorp.Transaction, channel *model.Channel) error {
+ publicChannel := &publicChannel{
+ Id: channel.Id,
+ DeleteAt: channel.DeleteAt,
+ TeamId: channel.TeamId,
+ DisplayName: channel.DisplayName,
+ Name: channel.Name,
+ Header: channel.Header,
+ Purpose: channel.Purpose,
+ }
+
+ if channel.Type != model.CHANNEL_OPEN {
+ if _, err := transaction.Delete(publicChannel); err != nil {
+ return errors.Wrap(err, "failed to delete public channel")
+ }
- if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
return nil
}
- if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
- if !s.DoesTriggerExist("trigger_channels") {
- transaction, err := s.GetMaster().Begin()
- if err != nil {
- return errors.Wrap(err, "failed to create trigger function")
- }
+ if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
+ // Leverage native upsert for MySQL, since RowsAffected returns 0 if the row exists
+ // but no changes were made, breaking the update-then-insert paradigm below when
+ // the row already exists. (Postgres 9.4 doesn't support native upsert.)
+ if _, err := transaction.Exec(`
+ INSERT INTO
+ PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
+ VALUES
+ (:Id, :DeleteAt, :TeamId, :DisplayName, :Name, :Header, :Purpose)
+ ON DUPLICATE KEY UPDATE
+ DeleteAt = :DeleteAt,
+ TeamId = :TeamId,
+ DisplayName = :DisplayName,
+ Name = :Name,
+ Header = :Header,
+ Purpose = :Purpose;
+ `, map[string]interface{}{
+ "Id": publicChannel.Id,
+ "DeleteAt": publicChannel.DeleteAt,
+ "TeamId": publicChannel.TeamId,
+ "DisplayName": publicChannel.DisplayName,
+ "Name": publicChannel.Name,
+ "Header": publicChannel.Header,
+ "Purpose": publicChannel.Purpose,
+ }); err != nil {
+ return errors.Wrap(err, "failed to insert public channel")
+ }
+ } else {
+ count, err := transaction.Update(publicChannel)
+ if err != nil {
+ return errors.Wrap(err, "failed to update public channel")
+ }
+ if count > 0 {
+ return nil
+ }
- if _, err := transaction.ExecNoTimeout(`
- CREATE OR REPLACE FUNCTION channels_copy_to_public_channels() RETURNS TRIGGER
- SECURITY DEFINER
- LANGUAGE plpgsql
- AS $$
- DECLARE
- counter int := 0;
- BEGIN
- IF (TG_OP = 'DELETE' AND OLD.Type = 'O') OR (TG_OP = 'UPDATE' AND NEW.Type != 'O') THEN
- DELETE FROM
- PublicChannels
- WHERE
- Id = OLD.Id;
- ELSEIF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') AND NEW.Type = 'O' THEN
- UPDATE
- PublicChannels
- SET
- DeleteAt = NEW.DeleteAt,
- TeamId = NEW.TeamId,
- DisplayName = NEW.DisplayName,
- Name = NEW.Name,
- Header = NEW.Header,
- Purpose = NEW.Purpose
- WHERE
- Id = NEW.Id;
-
- -- There's a race condition here where the INSERT might fail, though this should only occur
- -- if PublicChannels had been modified outside of the triggers. We could improve this with
- -- the UPSERT functionality in Postgres 9.5+ once we support same.
- IF NOT FOUND THEN
- INSERT INTO
- PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
- VALUES
- (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose);
- END IF;
- END IF;
-
- RETURN NULL;
- END
- $$;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger function")
- }
+ if err := transaction.Insert(publicChannel); err != nil {
+ return errors.Wrap(err, "failed to insert public channel")
+ }
+ }
- if _, err := transaction.ExecNoTimeout(`
- CREATE TRIGGER
- trigger_channels
- AFTER INSERT OR UPDATE OR DELETE ON
- Channels
- FOR EACH ROW EXECUTE PROCEDURE
- channels_copy_to_public_channels();
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger")
- }
+ return nil
+}
- if err := transaction.Commit(); err != nil {
- return errors.Wrap(err, "failed to create trigger function")
- }
+func (s SqlChannelStoreExperimental) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.Save(channel, maxChannelsPerTeam)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ if channel.DeleteAt != 0 {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.archived_channel.app_error", nil, "", http.StatusBadRequest)
+ return
}
- } else if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
- // Note that DDL statements in MySQL (CREATE TABLE, CREATE TRIGGER, etc.) cannot
- // be rolled back inside a transaction (unlike PostgreSQL), so there's no point in
- // wrapping what follows inside a transaction.
-
- if !s.DoesTriggerExist("trigger_channels_insert") {
- if _, err := s.GetMaster().ExecNoTimeout(`
- CREATE TRIGGER
- trigger_channels_insert
- AFTER INSERT ON
- Channels
- FOR EACH ROW
- BEGIN
- IF NEW.Type = 'O' THEN
- INSERT INTO
- PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
- VALUES
- (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose)
- ON DUPLICATE KEY UPDATE
- DeleteAt = NEW.DeleteAt,
- TeamId = NEW.TeamId,
- DisplayName = NEW.DisplayName,
- Name = NEW.Name,
- Header = NEW.Header,
- Purpose = NEW.Purpose;
- END IF;
- END;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger_channels_insert trigger")
- }
+
+ if channel.Type == model.CHANNEL_DIRECT {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.direct_channel.app_error", nil, "", http.StatusBadRequest)
+ return
}
- if !s.DoesTriggerExist("trigger_channels_update") {
- if _, err := s.GetMaster().ExecNoTimeout(`
- CREATE TRIGGER
- trigger_channels_update
- AFTER UPDATE ON
- Channels
- FOR EACH ROW
- BEGIN
- IF OLD.Type = 'O' AND NEW.Type != 'O' THEN
- DELETE FROM
- PublicChannels
- WHERE
- Id = NEW.Id;
- ELSEIF NEW.Type = 'O' THEN
- INSERT INTO
- PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
- VALUES
- (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose)
- ON DUPLICATE KEY UPDATE
- DeleteAt = NEW.DeleteAt,
- TeamId = NEW.TeamId,
- DisplayName = NEW.DisplayName,
- Name = NEW.Name,
- Header = NEW.Header,
- Purpose = NEW.Purpose;
- END IF;
- END;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger_channels_update trigger")
- }
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
- if !s.DoesTriggerExist("trigger_channels_delete") {
- if _, err := s.GetMaster().ExecNoTimeout(`
- CREATE TRIGGER
- trigger_channels_delete
- AFTER DELETE ON
- Channels
- FOR EACH ROW
- BEGIN
- IF OLD.Type = 'O' THEN
- DELETE FROM
- PublicChannels
- WHERE
- Id = OLD.Id;
- END IF;
- END;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger_channels_delete trigger")
- }
+ *result = s.saveChannelT(transaction, channel, maxChannelsPerTeam)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
}
- } else if s.DriverName() == model.DATABASE_DRIVER_SQLITE {
- if _, err := s.GetMaster().ExecNoTimeout(`
- CREATE TRIGGER IF NOT EXISTS
- trigger_channels_insert
- AFTER INSERT ON
- Channels
- FOR EACH ROW
- WHEN NEW.Type = 'O'
- BEGIN
- -- Ideally, we'd leverage ON CONFLICT DO UPDATE below and make this INSERT resilient to pre-existing
- -- data. However, the version of Sqlite we're compiling against doesn't support this. This isn't
- -- critical, though, since we don't support Sqlite in production.
- INSERT INTO
- PublicChannels(Id, DeleteAt, TeamId, DisplayName, Name, Header, Purpose)
- VALUES
- (NEW.Id, NEW.DeleteAt, NEW.TeamId, NEW.DisplayName, NEW.Name, NEW.Header, NEW.Purpose);
- END;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger_channels_insert trigger")
- }
-
- if _, err := s.GetMaster().ExecNoTimeout(`
- CREATE TRIGGER IF NOT EXISTS
- trigger_channels_update_delete
- AFTER UPDATE ON
- Channels
- FOR EACH ROW
- WHEN
- OLD.Type = 'O'
- AND NEW.Type != 'O'
- BEGIN
- DELETE FROM
- PublicChannels
- WHERE
- Id = NEW.Id;
- END;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger_channels_update_delete trigger")
+
+ // Additionally propagate the write to the PublicChannels table.
+ if err := s.upsertPublicChannelT(transaction, result.Data.(*model.Channel)); err != nil {
+ transaction.Rollback()
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.upsert_public_channel.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
- if _, err := s.GetMaster().ExecNoTimeout(`
- CREATE TRIGGER IF NOT EXISTS
- trigger_channels_update
- AFTER UPDATE ON
- Channels
- FOR EACH ROW
- WHEN
- OLD.Type != 'O'
- AND NEW.Type = 'O'
- BEGIN
- -- See comments re: ON CONFLICT DO UPDATE above that would apply here as well.
- UPDATE
- PublicChannels
- SET
- DeleteAt = NEW.DeleteAt,
- TeamId = NEW.TeamId,
- DisplayName = NEW.DisplayName,
- Name = NEW.Name,
- Header = NEW.Header,
- Purpose = NEW.Purpose
- WHERE
- Id = NEW.Id;
- END;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger_channels_update trigger")
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Save", "store.sql_channel.save.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
+ })
+}
- if _, err := s.GetMaster().ExecNoTimeout(`
- CREATE TRIGGER IF NOT EXISTS
- trigger_channels_delete
- AFTER UPDATE ON
- Channels
- FOR EACH ROW
- WHEN
- OLD.Type = 'O'
- BEGIN
- DELETE FROM
- PublicChannels
- WHERE
- Id = OLD.Id;
- END;
- `); err != nil {
- return errors.Wrap(err, "failed to create trigger_channels_delete trigger")
+func (s SqlChannelStoreExperimental) Update(channel *model.Channel) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.Update(channel)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Update", "store.sql_channel.update.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
}
- } else {
- return errors.New("failed to create trigger because of missing driver")
+
+ *result = s.updateChannelT(transaction, channel)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ // Additionally propagate the write to the PublicChannels table.
+ if err := s.upsertPublicChannelT(transaction, result.Data.(*model.Channel)); err != nil {
+ transaction.Rollback()
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Update", "store.sql_channel.update.upsert_public_channel.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.Update", "store.sql_channel.update.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ })
+}
+
+func (s SqlChannelStoreExperimental) Delete(channelId string, time int64) store.StoreChannel {
+ // Call the experimental version first.
+ return s.SetDeleteAt(channelId, time, time)
+}
+
+func (s SqlChannelStoreExperimental) Restore(channelId string, time int64) store.StoreChannel {
+ // Call the experimental version first.
+ return s.SetDeleteAt(channelId, 0, time)
+}
+
+func (s SqlChannelStoreExperimental) SetDeleteAt(channelId string, deleteAt, updateAt int64) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.SetDeleteAt(channelId, deleteAt, updateAt)
}
- return nil
+ return store.Do(func(result *store.StoreResult) {
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.SetDeleteAt", "store.sql_channel.set_delete_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.setDeleteAtT(transaction, channelId, deleteAt, updateAt)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ // Additionally propagate the write to the PublicChannels table.
+ if _, err := transaction.Exec(`
+ UPDATE
+ PublicChannels
+ SET
+ DeleteAt = :DeleteAt
+ WHERE
+ Id = :ChannelId
+ `, map[string]interface{}{
+ "DeleteAt": deleteAt,
+ "ChannelId": channelId,
+ }); err != nil {
+ transaction.Rollback()
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.SetDeleteAt", "store.sql_channel.set_delete_at.update_public_channel.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.SetDeleteAt", "store.sql_channel.set_delete_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ })
+}
+
+func (s SqlChannelStoreExperimental) PermanentDeleteByTeam(teamId string) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.PermanentDeleteByTeam(teamId)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.permanentDeleteByTeamtT(transaction, teamId)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ // Additionally propagate the deletions to the PublicChannels table.
+ if _, err := transaction.Exec(`
+ DELETE FROM
+ PublicChannels
+ WHERE
+ TeamId = :TeamId
+ `, map[string]interface{}{
+ "TeamId": teamId,
+ }); err != nil {
+ transaction.Rollback()
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDeleteByTeamt", "store.sql_channel.permanent_delete_by_team.delete_public_channels.app_error", nil, "team_id="+teamId+", "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ })
+}
+
+func (s SqlChannelStoreExperimental) PermanentDelete(channelId string) store.StoreChannel {
+ if !s.IsExperimentalPublicChannelsMaterializationEnabled() {
+ return s.SqlChannelStore.PermanentDelete(channelId)
+ }
+
+ return store.Do(func(result *store.StoreResult) {
+ transaction, err := s.GetMaster().Begin()
+ if err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDelete", "store.sql_channel.permanent_delete.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ *result = s.permanentDeleteT(transaction, channelId)
+ if result.Err != nil {
+ transaction.Rollback()
+ return
+ }
+
+ // Additionally propagate the deletion to the PublicChannels table.
+ if _, err := transaction.Exec(`
+ DELETE FROM
+ PublicChannels
+ WHERE
+ Id = :ChannelId
+ `, map[string]interface{}{
+ "ChannelId": channelId,
+ }); err != nil {
+ transaction.Rollback()
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDelete", "store.sql_channel.permanent_delete.delete_public_channel.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ if err := transaction.Commit(); err != nil {
+ result.Err = model.NewAppError("SqlChannelStoreExperimental.PermanentDelete", "store.sql_channel.permanent_delete.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ })
}
func (s SqlChannelStoreExperimental) GetMoreChannels(teamId string, userId string, offset int, limit int) store.StoreChannel {
diff --git a/store/sqlstore/supplier.go b/store/sqlstore/supplier.go
index d1d7564f7..11216dd25 100644
--- a/store/sqlstore/supplier.go
+++ b/store/sqlstore/supplier.go
@@ -157,15 +157,6 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter
os.Exit(EXIT_CREATE_TABLE)
}
- // This store's triggers should exist before the migration is run to ensure the
- // corresponding tables stay in sync. Whether or not a trigger should be created before
- // or after a migration is likely to be decided on a case-by-case basis.
- if err := supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateTriggersIfNotExists(); err != nil {
- mlog.Critical("Error creating triggers", mlog.Err(err))
- time.Sleep(time.Second)
- os.Exit(EXIT_GENERIC_FAILURE)
- }
-
UpgradeDatabase(supplier)
supplier.oldStores.team.(*SqlTeamStore).CreateIndexesIfNotExists()