summaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorJonathan <jonfritz@gmail.com>2017-11-30 09:07:04 -0500
committerGitHub <noreply@github.com>2017-11-30 09:07:04 -0500
commit375c0632fab03e3fb54865e320585888499c076d (patch)
treece6cba679337a82370d5a730c428bef21a9964bf /app
parentd0d9ba4a7e43301697d1c9f495930e0c0179fdc3 (diff)
downloadchat-375c0632fab03e3fb54865e320585888499c076d.tar.gz
chat-375c0632fab03e3fb54865e320585888499c076d.tar.bz2
chat-375c0632fab03e3fb54865e320585888499c076d.zip
PLT-7503: Create Message Export Scheduled Task and CLI Command (#7612)
* Created message export scheduled task * Added CLI command to immediately kick off an export job * Added email addresses for users joining and leaving the channel to the export * Added support for both MySQL and PostgreSQL * Fixing gofmt error * Added a new ChannelMemberHistory store and associated tests * Updating the ChannelMemberHistory channel as users create/join/leave channels * Added user email to the message export object so it can be included in the actiance export xml * Don't fail to log a leave event if a corresponding join event wasn't logged * Adding copyright notices * Adding message export settings to daily diagnostics report * Added System Console integration for message export * Cleaned up TODOs * Made batch size configurable * Added export from timestamp to CLI command * Made ChannelMemberHistory table updates best effort * Added a context-based timeout option to the message export CLI * Minor PR updates/improvements * Removed unnecessary fields from MessageExport object to reduce query overhead * Removed JSON functions from the message export query in an effort to optimize performance * Changed the way that channel member history queries and purges work to better account for edge cases * Fixing a test I missed with the last refactor * Added file copy functionality to file backend, improved config validation, added default config values * Fixed file copy tests * More concise use of the testing libraries * Fixed context leak error * Changed default export path to correctly place an 'export' directory under the 'data' directory * Can't delete records from a read replica * Fixed copy file tests * Start job workers when license is applied, if configured to do so * Suggestions from the PR * Moar unit tests * Fixed test imports
Diffstat (limited to 'app')
-rw-r--r--app/app.go19
-rw-r--r--app/channel.go36
-rw-r--r--app/channel_test.go166
-rw-r--r--app/diagnostics.go8
-rw-r--r--app/diagnostics_test.go1
-rw-r--r--app/license.go10
6 files changed, 235 insertions, 5 deletions
diff --git a/app/app.go b/app/app.go
index 7bd4c561b..fd313c9c9 100644
--- a/app/app.go
+++ b/app/app.go
@@ -48,6 +48,7 @@ type App struct {
Elasticsearch einterfaces.ElasticsearchInterface
Emoji einterfaces.EmojiInterface
Ldap einterfaces.LdapInterface
+ MessageExport einterfaces.MessageExportInterface
Metrics einterfaces.MetricsInterface
Mfa einterfaces.MfaInterface
Saml einterfaces.SamlInterface
@@ -198,6 +199,12 @@ func RegisterJobsDataRetentionJobInterface(f func(*App) ejobs.DataRetentionJobIn
jobsDataRetentionJobInterface = f
}
+var jobsMessageExportJobInterface func(*App) ejobs.MessageExportJobInterface
+
+func RegisterJobsMessageExportJobInterface(f func(*App) ejobs.MessageExportJobInterface) {
+ jobsMessageExportJobInterface = f
+}
+
var jobsElasticsearchAggregatorInterface func(*App) ejobs.ElasticsearchAggregatorInterface
func RegisterJobsElasticsearchAggregatorInterface(f func(*App) ejobs.ElasticsearchAggregatorInterface) {
@@ -222,6 +229,12 @@ func RegisterLdapInterface(f func(*App) einterfaces.LdapInterface) {
ldapInterface = f
}
+var messageExportInterface func(*App) einterfaces.MessageExportInterface
+
+func RegisterMessageExportInterface(f func(*App) einterfaces.MessageExportInterface) {
+ messageExportInterface = f
+}
+
var metricsInterface func(*App) einterfaces.MetricsInterface
func RegisterMetricsInterface(f func(*App) einterfaces.MetricsInterface) {
@@ -267,6 +280,9 @@ func (a *App) initEnterprise() {
}
})
}
+ if messageExportInterface != nil {
+ a.MessageExport = messageExportInterface(a)
+ }
if metricsInterface != nil {
a.Metrics = metricsInterface(a)
}
@@ -289,6 +305,9 @@ func (a *App) initJobs() {
if jobsDataRetentionJobInterface != nil {
a.Jobs.DataRetentionJob = jobsDataRetentionJobInterface(a)
}
+ if jobsMessageExportJobInterface != nil {
+ a.Jobs.MessageExportJob = jobsMessageExportJobInterface(a)
+ }
if jobsElasticsearchAggregatorInterface != nil {
a.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a)
}
diff --git a/app/channel.go b/app/channel.go
index 16c5dd084..caaacea06 100644
--- a/app/channel.go
+++ b/app/channel.go
@@ -49,12 +49,19 @@ func (a *App) JoinDefaultChannels(teamId string, user *model.User, channelRole s
} else {
townSquare := result.Data.(*model.Channel)
- cm := &model.ChannelMember{ChannelId: townSquare.Id, UserId: user.Id,
- Roles: channelRole, NotifyProps: model.GetDefaultChannelNotifyProps()}
+ cm := &model.ChannelMember{
+ ChannelId: townSquare.Id,
+ UserId: user.Id,
+ Roles: channelRole,
+ NotifyProps: model.GetDefaultChannelNotifyProps(),
+ }
if cmResult := <-a.Srv.Store.Channel().SaveMember(cm); cmResult.Err != nil {
err = cmResult.Err
}
+ if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, townSquare.Id, model.GetMillis()); result.Err != nil {
+ l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err)
+ }
if requestor == nil {
if err := a.postJoinChannelMessage(user, townSquare); err != nil {
@@ -74,12 +81,19 @@ func (a *App) JoinDefaultChannels(teamId string, user *model.User, channelRole s
} else {
offTopic := result.Data.(*model.Channel)
- cm := &model.ChannelMember{ChannelId: offTopic.Id, UserId: user.Id,
- Roles: channelRole, NotifyProps: model.GetDefaultChannelNotifyProps()}
+ cm := &model.ChannelMember{
+ ChannelId: offTopic.Id,
+ UserId: user.Id,
+ Roles: channelRole,
+ NotifyProps: model.GetDefaultChannelNotifyProps(),
+ }
if cmResult := <-a.Srv.Store.Channel().SaveMember(cm); cmResult.Err != nil {
err = cmResult.Err
}
+ if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, offTopic.Id, model.GetMillis()); result.Err != nil {
+ l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err)
+ }
if requestor == nil {
if err := a.postJoinChannelMessage(user, offTopic); err != nil {
@@ -158,6 +172,9 @@ func (a *App) CreateChannel(channel *model.Channel, addMember bool) (*model.Chan
if cmresult := <-a.Srv.Store.Channel().SaveMember(cm); cmresult.Err != nil {
return nil, cmresult.Err
}
+ if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(channel.CreatorId, sc.Id, model.GetMillis()); result.Err != nil {
+ l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err)
+ }
a.InvalidateCacheForUser(channel.CreatorId)
}
@@ -302,6 +319,9 @@ func (a *App) createGroupChannel(userIds []string, creatorId string) (*model.Cha
if result := <-a.Srv.Store.Channel().SaveMember(cm); result.Err != nil {
return nil, result.Err
}
+ if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()); result.Err != nil {
+ l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err)
+ }
}
return channel, nil
@@ -520,9 +540,12 @@ func (a *App) addUserToChannel(user *model.User, channel *model.Channel, teamMem
l4g.Error("Failed to add member user_id=%v channel_id=%v err=%v", user.Id, channel.Id, result.Err)
return nil, model.NewAppError("AddUserToChannel", "api.channel.add_user.to.channel.failed.app_error", nil, "", http.StatusInternalServerError)
}
-
a.WaitForChannelMembership(channel.Id, user.Id)
+ if result := <-a.Srv.Store.ChannelMemberHistory().LogJoinEvent(user.Id, channel.Id, model.GetMillis()); result.Err != nil {
+ l4g.Warn("Failed to update ChannelMemberHistory table %v", result.Err)
+ }
+
a.InvalidateCacheForUser(user.Id)
a.InvalidateCacheForChannelMembers(channel.Id)
@@ -1069,6 +1092,9 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string,
if cmresult := <-a.Srv.Store.Channel().RemoveMember(channel.Id, userIdToRemove); cmresult.Err != nil {
return cmresult.Err
}
+ if cmhResult := <-a.Srv.Store.ChannelMemberHistory().LogLeaveEvent(userIdToRemove, channel.Id, model.GetMillis()); cmhResult.Err != nil {
+ return cmhResult.Err
+ }
a.InvalidateCacheForUser(userIdToRemove)
a.InvalidateCacheForChannelMembers(channel.Id)
diff --git a/app/channel_test.go b/app/channel_test.go
index 374b20657..d44af467d 100644
--- a/app/channel_test.go
+++ b/app/channel_test.go
@@ -1,9 +1,14 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
package app
import (
"testing"
"github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/store"
+ "github.com/stretchr/testify/assert"
)
func TestPermanentDeleteChannel(t *testing.T) {
@@ -104,3 +109,164 @@ func TestMoveChannel(t *testing.T) {
t.Fatal(err)
}
}
+
+func TestJoinDefaultChannelsTownSquare(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ // figure out the initial number of users in town square
+ townSquareChannelId := store.Must(th.App.Srv.Store.Channel().GetByName(th.BasicTeam.Id, "town-square", true)).(*model.Channel).Id
+ initialNumTownSquareUsers := len(store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, townSquareChannelId)).([]*model.ChannelMemberHistory))
+
+ // create a new user that joins the default channels
+ user := th.CreateUser()
+ th.App.JoinDefaultChannels(th.BasicTeam.Id, user, model.CHANNEL_USER_ROLE_ID, "")
+
+ // there should be a ChannelMemberHistory record for the user
+ histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, townSquareChannelId)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, initialNumTownSquareUsers+1)
+
+ found := false
+ for _, history := range histories {
+ if user.Id == history.UserId && townSquareChannelId == history.ChannelId {
+ found = true
+ break
+ }
+ }
+ assert.True(t, found)
+}
+
+func TestJoinDefaultChannelsOffTopic(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ // figure out the initial number of users in off-topic
+ offTopicChannelId := store.Must(th.App.Srv.Store.Channel().GetByName(th.BasicTeam.Id, "off-topic", true)).(*model.Channel).Id
+ initialNumTownSquareUsers := len(store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, offTopicChannelId)).([]*model.ChannelMemberHistory))
+
+ // create a new user that joins the default channels
+ user := th.CreateUser()
+ th.App.JoinDefaultChannels(th.BasicTeam.Id, user, model.CHANNEL_USER_ROLE_ID, "")
+
+ // there should be a ChannelMemberHistory record for the user
+ histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, offTopicChannelId)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, initialNumTownSquareUsers+1)
+
+ found := false
+ for _, history := range histories {
+ if user.Id == history.UserId && offTopicChannelId == history.ChannelId {
+ found = true
+ break
+ }
+ }
+ assert.True(t, found)
+}
+
+func TestCreateChannelPublic(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ // creates a public channel and adds basic user to it
+ publicChannel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN)
+
+ // there should be a ChannelMemberHistory record for the user
+ histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, 1)
+ assert.Equal(t, th.BasicUser.Id, histories[0].UserId)
+ assert.Equal(t, publicChannel.Id, histories[0].ChannelId)
+}
+
+func TestCreateChannelPrivate(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ // creates a private channel and adds basic user to it
+ privateChannel := th.createChannel(th.BasicTeam, model.CHANNEL_PRIVATE)
+
+ // there should be a ChannelMemberHistory record for the user
+ histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, privateChannel.Id)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, 1)
+ assert.Equal(t, th.BasicUser.Id, histories[0].UserId)
+ assert.Equal(t, privateChannel.Id, histories[0].ChannelId)
+}
+
+func TestCreateGroupChannel(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ user1 := th.CreateUser()
+ user2 := th.CreateUser()
+
+ groupUserIds := make([]string, 0)
+ groupUserIds = append(groupUserIds, user1.Id)
+ groupUserIds = append(groupUserIds, user2.Id)
+ groupUserIds = append(groupUserIds, th.BasicUser.Id)
+
+ if channel, err := th.App.CreateGroupChannel(groupUserIds, th.BasicUser.Id); err != nil {
+ t.Fatal("Failed to create group channel. Error: " + err.Message)
+ } else {
+ // there should be a ChannelMemberHistory record for each user
+ histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, channel.Id)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, 3)
+
+ channelMemberHistoryUserIds := make([]string, 0)
+ for _, history := range histories {
+ assert.Equal(t, channel.Id, history.ChannelId)
+ channelMemberHistoryUserIds = append(channelMemberHistoryUserIds, history.UserId)
+ }
+ assert.Equal(t, groupUserIds, channelMemberHistoryUserIds)
+ }
+}
+
+func TestAddUserToChannel(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ // create a user and add it to a channel
+ user := th.CreateUser()
+ if _, err := th.App.AddTeamMember(th.BasicTeam.Id, user.Id); err != nil {
+ t.Fatal("Failed to add user to team. Error: " + err.Message)
+ }
+
+ groupUserIds := make([]string, 0)
+ groupUserIds = append(groupUserIds, th.BasicUser.Id)
+ groupUserIds = append(groupUserIds, user.Id)
+
+ channel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN)
+ if _, err := th.App.AddUserToChannel(user, channel); err != nil {
+ t.Fatal("Failed to add user to channel. Error: " + err.Message)
+ }
+
+ // there should be a ChannelMemberHistory record for the user
+ histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, channel.Id)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, 2)
+ channelMemberHistoryUserIds := make([]string, 0)
+ for _, history := range histories {
+ assert.Equal(t, channel.Id, history.ChannelId)
+ channelMemberHistoryUserIds = append(channelMemberHistoryUserIds, history.UserId)
+ }
+ assert.Equal(t, groupUserIds, channelMemberHistoryUserIds)
+}
+
+func TestRemoveUserFromChannel(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ // a user creates a channel
+ publicChannel := th.createChannel(th.BasicTeam, model.CHANNEL_OPEN)
+ histories := store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, 1)
+ assert.Equal(t, th.BasicUser.Id, histories[0].UserId)
+ assert.Equal(t, publicChannel.Id, histories[0].ChannelId)
+ assert.Nil(t, histories[0].LeaveTime)
+
+ // the user leaves that channel
+ if err := th.App.LeaveChannel(publicChannel.Id, th.BasicUser.Id); err != nil {
+ t.Fatal("Failed to remove user from channel. Error: " + err.Message)
+ }
+ histories = store.Must(th.App.Srv.Store.ChannelMemberHistory().GetUsersInChannelDuring(model.GetMillis()-100, model.GetMillis()+100, publicChannel.Id)).([]*model.ChannelMemberHistory)
+ assert.Len(t, histories, 1)
+ assert.Equal(t, th.BasicUser.Id, histories[0].UserId)
+ assert.Equal(t, publicChannel.Id, histories[0].ChannelId)
+ assert.NotNil(t, histories[0].LeaveTime)
+}
diff --git a/app/diagnostics.go b/app/diagnostics.go
index 513cf11f5..3f37337ab 100644
--- a/app/diagnostics.go
+++ b/app/diagnostics.go
@@ -44,6 +44,7 @@ const (
TRACK_CONFIG_ELASTICSEARCH = "config_elasticsearch"
TRACK_CONFIG_PLUGIN = "config_plugin"
TRACK_CONFIG_DATA_RETENTION = "config_data_retention"
+ TRACK_CONFIG_MESSAGE_EXPORT = "config_message_export"
TRACK_ACTIVITY = "activity"
TRACK_LICENSE = "license"
@@ -470,6 +471,13 @@ func (a *App) trackConfig() {
"file_retention_days": *cfg.DataRetentionSettings.FileRetentionDays,
"deletion_job_start_time": *cfg.DataRetentionSettings.DeletionJobStartTime,
})
+
+ SendDiagnostic(TRACK_CONFIG_MESSAGE_EXPORT, map[string]interface{}{
+ "enable_message_export": *cfg.MessageExportSettings.EnableExport,
+ "daily_run_time": *cfg.MessageExportSettings.DailyRunTime,
+ "default_export_from_timestamp": *cfg.MessageExportSettings.ExportFromTimestamp,
+ "batch_size": *cfg.MessageExportSettings.BatchSize,
+ })
}
func trackLicense() {
diff --git a/app/diagnostics_test.go b/app/diagnostics_test.go
index 25bc75265..9b884fd43 100644
--- a/app/diagnostics_test.go
+++ b/app/diagnostics_test.go
@@ -135,6 +135,7 @@ func TestDiagnostics(t *testing.T) {
TRACK_CONFIG_PLUGIN,
TRACK_ACTIVITY,
TRACK_SERVER,
+ TRACK_CONFIG_MESSAGE_EXPORT,
TRACK_PLUGINS,
} {
if !strings.Contains(info, item) {
diff --git a/app/license.go b/app/license.go
index 18836c571..cacc71524 100644
--- a/app/license.go
+++ b/app/license.go
@@ -89,6 +89,16 @@ func (a *App) SaveLicense(licenseBytes []byte) (*model.License, *model.AppError)
a.ReloadConfig()
a.InvalidateAllCaches()
+ // start job server if necessary - this handles the edge case where a license file is uploaded, but the job server
+ // doesn't start until the server is restarted, which prevents the 'run job now' buttons in system console from
+ // functioning as expected
+ if *a.Config().JobSettings.RunJobs {
+ a.Jobs.StartWorkers()
+ }
+ if *a.Config().JobSettings.RunScheduler {
+ a.Jobs.StartSchedulers()
+ }
+
return license, nil
}