diff options
34 files changed, 1331 insertions, 151 deletions
diff --git a/.gitignore b/.gitignore index c70239a2b..66e2b5d58 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ logs node_modules /dist /webapp/dist +jobserver npm-debug.log mattermost.mattermost-license config/mattermost.mattermost-license @@ -16,8 +17,7 @@ config/active.dat config/config.json # Enteprise imports file -imports.go -cmd/mattermost/imports.go +imports/imports.go # Build Targets .prebuild @@ -310,7 +310,8 @@ cover: prepare-enterprise: ifeq ($(BUILD_ENTERPRISE_READY),true) @echo Enterprise build selected, preparing - cp $(BUILD_ENTERPRISE_DIR)/imports.go cmd/platform/ + mkdir -p imports/ + cp $(BUILD_ENTERPRISE_DIR)/imports/imports.go imports/ rm -f enterprise ln -s $(BUILD_ENTERPRISE_DIR) enterprise endif @@ -334,6 +335,19 @@ build-client: cd $(BUILD_WEBAPP_DIR) && $(MAKE) build +build-job-server: build-job-server-linux build-job-server-mac build-job-server-windows + +build-job-server-linux: .prebuild prepare-enterprise + @echo Build mattermost job server for Linux amd64 + env GOOS=linux GOARCH=amd64 $(GO) build $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver + +build-job-server-osx: .prebuild prepare-enterprise + @echo Build mattermost job server for OSX amd64 + env GOOS=darwin GOARCH=amd64 $(GO) build $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver + +build-job-server-windows: .prebuild prepare-enterprise + @echo Build mattermost job server for Windows amd64 + env GOOS=windows GOARCH=amd64 $(GO) build $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver package: build build-client @ echo Packaging mattermost @@ -469,6 +483,10 @@ restart-server: | stop-server run-server restart-client: | stop-client run-client +run-job-server: + @echo Running job server for development + $(GO) run $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver/jobserver.go + clean: stop-docker @echo Cleaning @@ -515,12 +533,13 @@ govet: $(GO) vet $(GOFLAGS) ./web || exit 1 ifeq ($(BUILD_ENTERPRISE_READY),true) - $(GO) vet $(GOFLAGS) ./enterprise || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/account_migration || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/brand || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/cluster || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/compliance || exit 1 + $(GO) vet $(GOFLAGS) ./enterprise/data_retention || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/emoji || exit 1 + $(GO) vet $(GOFLAGS) ./enterprise/imports || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/ldap || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/metrics || exit 1 $(GO) vet $(GOFLAGS) ./enterprise/mfa || exit 1 diff --git a/api4/api.go b/api4/api.go index 4ed636593..a636581d7 100644 --- a/api4/api.go +++ b/api4/api.go @@ -81,6 +81,8 @@ type Routes struct { System *mux.Router // 'api/v4/system' + Jobs *mux.Router // 'api/v4/jobs' + Preferences *mux.Router // 'api/v4/users/{user_id:[A-Za-z0-9]+}/preferences' License *mux.Router // 'api/v4/license' @@ -168,6 +170,7 @@ func InitApi(full bool) { BaseRoutes.License = BaseRoutes.ApiRoot.PathPrefix("/license").Subrouter() BaseRoutes.Public = BaseRoutes.ApiRoot.PathPrefix("/public").Subrouter() BaseRoutes.Reactions = BaseRoutes.ApiRoot.PathPrefix("/reactions").Subrouter() + BaseRoutes.Jobs = BaseRoutes.ApiRoot.PathPrefix("/jobs").Subrouter() BaseRoutes.Emojis = BaseRoutes.ApiRoot.PathPrefix("/emoji").Subrouter() BaseRoutes.Emoji = BaseRoutes.Emojis.PathPrefix("/{emoji_id:[A-Za-z0-9]+}").Subrouter() @@ -191,6 +194,7 @@ func InitApi(full bool) { InitCluster() InitLdap() InitBrand() + InitJob() InitCommand() InitStatus() InitWebSocket() diff --git a/api4/context.go b/api4/context.go index 37af2c6d4..8d4ed7f79 100644 --- a/api4/context.go +++ b/api4/context.go @@ -540,3 +540,25 @@ func (c *Context) RequireCommandId() *Context { } return c } + +func (c *Context) RequireJobId() *Context { + if c.Err != nil { + return c + } + + if len(c.Params.JobId) != 26 { + c.SetInvalidUrlParam("job_id") + } + return c +} + +func (c *Context) RequireJobType() *Context { + if c.Err != nil { + return c + } + + if len(c.Params.JobType) == 0 || len(c.Params.JobType) > 32 { + c.SetInvalidUrlParam("job_type") + } + return c +} diff --git a/api4/job.go b/api4/job.go new file mode 100644 index 000000000..8610d9e74 --- /dev/null +++ b/api4/job.go @@ -0,0 +1,57 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package api4 + +import ( + "net/http" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/app" + "github.com/mattermost/platform/model" +) + +func InitJob() { + l4g.Info("Initializing job API routes") + + BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobStatusesByType)).Methods("GET") + BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJobStatus)).Methods("GET") +} + +func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) { + c.RequireJobId() + if c.Err != nil { + return + } + + if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) { + c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM) + return + } + + if status, err := app.GetJobStatus(c.Params.JobId); err != nil { + c.Err = err + return + } else { + w.Write([]byte(status.ToJson())) + } +} + +func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) { + c.RequireJobType() + if c.Err != nil { + return + } + + if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) { + c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM) + return + } + + if statuses, err := app.GetJobStatusesByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil { + c.Err = err + return + } else { + w.Write([]byte(model.JobStatusesToJson(statuses))) + } +} diff --git a/api4/job_test.go b/api4/job_test.go new file mode 100644 index 000000000..0f39fc306 --- /dev/null +++ b/api4/job_test.go @@ -0,0 +1,103 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package api4 + +import ( + "strings" + "testing" + + "github.com/mattermost/platform/app" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" +) + +func TestGetJobStatus(t *testing.T) { + th := Setup().InitBasic().InitSystemAdmin() + defer TearDown() + + status := &model.JobStatus{ + Id: model.NewId(), + Status: model.NewId(), + } + if result := <-app.Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil { + t.Fatal(result.Err) + } + + defer app.Srv.Store.JobStatus().Delete(status.Id) + + received, resp := th.SystemAdminClient.GetJobStatus(status.Id) + CheckNoError(t, resp) + + if received.Id != status.Id || received.Status != status.Status { + t.Fatal("incorrect job status received") + } + + _, resp = th.SystemAdminClient.GetJobStatus("1234") + CheckBadRequestStatus(t, resp) + + _, resp = th.Client.GetJobStatus(status.Id) + CheckForbiddenStatus(t, resp) + + _, resp = th.SystemAdminClient.GetJobStatus(model.NewId()) + CheckNotFoundStatus(t, resp) +} + +func TestGetJobStatusesByType(t *testing.T) { + th := Setup().InitBasic().InitSystemAdmin() + defer TearDown() + + jobType := model.NewId() + + statuses := []*model.JobStatus{ + { + Id: model.NewId(), + Type: jobType, + StartAt: 1000, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 999, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 1001, + }, + } + + for _, status := range statuses { + store.Must(app.Srv.Store.JobStatus().SaveOrUpdate(status)) + defer app.Srv.Store.JobStatus().Delete(status.Id) + } + + received, resp := th.SystemAdminClient.GetJobStatusesByType(jobType, 0, 2) + CheckNoError(t, resp) + + if len(received) != 2 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[1].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != statuses[0].Id { + t.Fatal("should've received second newest job second") + } + + received, resp = th.SystemAdminClient.GetJobStatusesByType(jobType, 1, 2) + CheckNoError(t, resp) + + if len(received) != 1 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[2].Id { + t.Fatal("should've received oldest job last") + } + + _, resp = th.SystemAdminClient.GetJobStatusesByType("", 0, 60) + CheckNotFoundStatus(t, resp) + + _, resp = th.SystemAdminClient.GetJobStatusesByType(strings.Repeat("a", 33), 0, 60) + CheckBadRequestStatus(t, resp) + + _, resp = th.Client.GetJobStatusesByType(jobType, 0, 60) + CheckForbiddenStatus(t, resp) +} diff --git a/api4/params.go b/api4/params.go index 785b2267b..aa865fd2a 100644 --- a/api4/params.go +++ b/api4/params.go @@ -35,6 +35,8 @@ type ApiParams struct { EmojiName string Category string Service string + JobId string + JobType string Page int PerPage int } @@ -116,6 +118,14 @@ func ApiParamsFromRequest(r *http.Request) *ApiParams { params.EmojiName = val } + if val, ok := props["job_id"]; ok { + params.JobId = val + } + + if val, ok := props["job_type"]; ok { + params.JobType = val + } + if val, err := strconv.Atoi(r.URL.Query().Get("page")); err != nil || val < 0 { params.Page = PAGE_DEFAULT } else { diff --git a/app/job.go b/app/job.go new file mode 100644 index 000000000..00439e4d2 --- /dev/null +++ b/app/job.go @@ -0,0 +1,28 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "github.com/mattermost/platform/model" +) + +func GetJobStatus(id string) (*model.JobStatus, *model.AppError) { + if result := <-Srv.Store.JobStatus().Get(id); result.Err != nil { + return nil, result.Err + } else { + return result.Data.(*model.JobStatus), nil + } +} + +func GetJobStatusesByTypePage(jobType string, page int, perPage int) ([]*model.JobStatus, *model.AppError) { + return GetJobStatusesByType(jobType, page*perPage, perPage) +} + +func GetJobStatusesByType(jobType string, offset int, limit int) ([]*model.JobStatus, *model.AppError) { + if result := <-Srv.Store.JobStatus().GetAllByTypePage(jobType, offset, limit); result.Err != nil { + return nil, result.Err + } else { + return result.Data.([]*model.JobStatus), nil + } +} diff --git a/app/job_test.go b/app/job_test.go new file mode 100644 index 000000000..20e9dee8a --- /dev/null +++ b/app/job_test.go @@ -0,0 +1,78 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package app + +import ( + "testing" + + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" +) + +func TestGetJobStatus(t *testing.T) { + Setup() + + status := &model.JobStatus{ + Id: model.NewId(), + Status: model.NewId(), + } + if result := <-Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil { + t.Fatal(result.Err) + } + + defer Srv.Store.JobStatus().Delete(status.Id) + + if received, err := GetJobStatus(status.Id); err != nil { + t.Fatal(err) + } else if received.Id != status.Id || received.Status != status.Status { + t.Fatal("inccorrect job status received") + } +} + +func TestGetJobStatusesByType(t *testing.T) { + Setup() + + jobType := model.NewId() + + statuses := []*model.JobStatus{ + { + Id: model.NewId(), + Type: jobType, + StartAt: 1000, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 999, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 1001, + }, + } + + for _, status := range statuses { + store.Must(Srv.Store.JobStatus().SaveOrUpdate(status)) + defer Srv.Store.JobStatus().Delete(status.Id) + } + + if received, err := GetJobStatusesByType(jobType, 0, 2); err != nil { + t.Fatal(err) + } else if len(received) != 2 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[1].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != statuses[0].Id { + t.Fatal("should've received second newest job second") + } + + if received, err := GetJobStatusesByType(jobType, 2, 2); err != nil { + t.Fatal(err) + } else if len(received) != 1 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[2].Id { + t.Fatal("should've received oldest job last") + } +} diff --git a/app/license.go b/app/license.go index 7a00d7fb4..44b700d5b 100644 --- a/app/license.go +++ b/app/license.go @@ -4,7 +4,6 @@ package app import ( - "os" "strings" l4g "github.com/alecthomas/log4go" @@ -23,28 +22,14 @@ func LoadLicense() { if len(licenseId) != 26 { // Lets attempt to load the file from disk since it was missing from the DB - fileName := utils.GetLicenseFileLocation(*utils.Cfg.ServiceSettings.LicenseFileLocation) - - if _, err := os.Stat(fileName); err == nil { - l4g.Info("License key has not been uploaded. Loading license key from disk at %v", fileName) - licenseBytes := utils.GetLicenseFileFromDisk(fileName) - - if success, licenseStr := utils.ValidateLicense(licenseBytes); success { - licenseFileFromDisk := model.LicenseFromJson(strings.NewReader(licenseStr)) - licenseId = licenseFileFromDisk.Id - if _, err := SaveLicense(licenseBytes); err != nil { - l4g.Info("Failed to save license key loaded from disk err=%v", err.Error()) - return - } + license, licenseBytes := utils.GetAndValidateLicenseFileFromDisk() + + if license != nil { + if _, err := SaveLicense(licenseBytes); err != nil { + l4g.Info("Failed to save license key loaded from disk err=%v", err.Error()) } else { - l4g.Error("Found license key at %v but it appears to be invalid.", fileName) - return + licenseId = license.Id } - - } else { - l4g.Info(utils.T("mattermost.load_license.find.warn")) - l4g.Debug("We could not find the license key in the database or on disk at %v", fileName) - return } } diff --git a/cmd/platform/init.go b/cmd/platform/init.go index 7d01eb890..b650cf2fd 100644 --- a/cmd/platform/init.go +++ b/cmd/platform/init.go @@ -1,29 +1,12 @@ package main import ( - "fmt" - "github.com/mattermost/platform/app" "github.com/mattermost/platform/model" "github.com/mattermost/platform/utils" "github.com/spf13/cobra" ) -func doLoadConfig(filename string) (err string) { - defer func() { - if r := recover(); r != nil { - err = fmt.Sprintf("%v", r) - } - }() - utils.TranslationsPreInit() - utils.EnableConfigFromEnviromentVars() - utils.LoadConfig(filename) - utils.InitializeConfigWatch() - utils.EnableConfigWatch() - - return "" -} - func initDBCommandContextCobra(cmd *cobra.Command) error { config, err := cmd.Flags().GetString("config") if err != nil { @@ -35,7 +18,7 @@ func initDBCommandContextCobra(cmd *cobra.Command) error { } func initDBCommandContext(configFileLocation string) { - if errstr := doLoadConfig(configFileLocation); errstr != "" { + if errstr := utils.InitAndLoadConfig(configFileLocation); errstr != "" { return } diff --git a/cmd/platform/mattermost.go b/cmd/platform/mattermost.go index a95825b55..1646faf85 100644 --- a/cmd/platform/mattermost.go +++ b/cmd/platform/mattermost.go @@ -14,6 +14,9 @@ import ( // Plugins _ "github.com/mattermost/platform/model/gitlab" + // Enterprise Imports + _ "github.com/mattermost/platform/imports" + // Enterprise Deps _ "github.com/dgryski/dgoogauth" _ "github.com/go-ldap/ldap" @@ -26,8 +29,6 @@ import ( _ "gopkg.in/olivere/elastic.v5" ) -//ENTERPRISE_IMPORTS - func main() { if err := rootCmd.Execute(); err != nil { os.Exit(1) diff --git a/cmd/platform/server.go b/cmd/platform/server.go index cb1530951..9846f8de9 100644 --- a/cmd/platform/server.go +++ b/cmd/platform/server.go @@ -14,6 +14,7 @@ import ( "github.com/mattermost/platform/api4" "github.com/mattermost/platform/app" "github.com/mattermost/platform/einterfaces" + "github.com/mattermost/platform/jobs" "github.com/mattermost/platform/manualtesting" "github.com/mattermost/platform/model" "github.com/mattermost/platform/utils" @@ -43,7 +44,7 @@ func runServerCmd(cmd *cobra.Command, args []string) error { } func runServer(configFileLocation string) { - if errstr := doLoadConfig(configFileLocation); errstr != "" { + if errstr := utils.InitAndLoadConfig(configFileLocation); errstr != "" { l4g.Exit("Unable to load mattermost configuration file: ", errstr) return } @@ -120,6 +121,8 @@ func runServer(configFileLocation string) { } } + jobs := jobs.InitJobs(app.Srv.Store).Start() + // wait for kill signal before attempting to gracefully shutdown // the running service c := make(chan os.Signal) @@ -134,6 +137,8 @@ func runServer(configFileLocation string) { einterfaces.GetMetricsInterface().StopServer() } + jobs.Stop() + app.StopServer() } diff --git a/config/config.json b/config/config.json index 8112331b8..b9c0516fa 100644 --- a/config/config.json +++ b/config/config.json @@ -273,5 +273,8 @@ "TurnURI": "", "TurnUsername": "", "TurnSharedKey": "" + }, + "DataRetentionSettings": { + "Enable": false } } diff --git a/einterfaces/jobs/data_retention.go b/einterfaces/jobs/data_retention.go new file mode 100644 index 000000000..340ed1b88 --- /dev/null +++ b/einterfaces/jobs/data_retention.go @@ -0,0 +1,23 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" +) + +type DataRetentionInterface interface { + MakeJob(store store.Store) model.Job +} + +var theDataRetentionInterface DataRetentionInterface + +func RegisterDataRetentionInterface(newInterface DataRetentionInterface) { + theDataRetentionInterface = newInterface +} + +func GetDataRetentionInterface() DataRetentionInterface { + return theDataRetentionInterface +} diff --git a/i18n/en.json b/i18n/en.json index 214e91837..f6f6b48a3 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -5096,6 +5096,30 @@ "translation": "We couldn't save the file info" }, { + "id": "store.sql_file_info.save_or_update.app_error", + "translation": "We couldn't save or update the file info" + }, + { + "id": "store.sql_job_status.delete_by_type.app_error", + "translation": "We couldn't delete the job status" + }, + { + "id": "store.sql_job_status.get.app_error", + "translation": "We couldn't get the job status" + }, + { + "id": "store.sql_job_status.get_all.app_error", + "translation": "We couldn't get all job statuses" + }, + { + "id": "store.sql_job_status.save.app_error", + "translation": "We couldn't save the job status" + }, + { + "id": "store.sql_job_status.update.app_error", + "translation": "We couldn't update the job status" + }, + { "id": "store.sql_license.get.app_error", "translation": "We encountered an error getting the license" }, diff --git a/imports/placeholder.go b/imports/placeholder.go new file mode 100644 index 000000000..98e5decd5 --- /dev/null +++ b/imports/placeholder.go @@ -0,0 +1,6 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package imports + +// This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty diff --git a/jobs/jobs.go b/jobs/jobs.go new file mode 100644 index 000000000..8c84f4eea --- /dev/null +++ b/jobs/jobs.go @@ -0,0 +1,74 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "sync" + + l4g "github.com/alecthomas/log4go" + ejobs "github.com/mattermost/platform/einterfaces/jobs" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" + "github.com/mattermost/platform/utils" +) + +type Jobs struct { + startOnce sync.Once + + DataRetention model.Job + // SearchIndexing model.Job + + listenerId string +} + +func InitJobs(s store.Store) *Jobs { + jobs := &Jobs{ + // SearchIndexing: MakeTestJob(s, "SearchIndexing"), + } + + if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil { + jobs.DataRetention = dataRetentionInterface.MakeJob(s) + } + + return jobs +} + +func (jobs *Jobs) Start() *Jobs { + l4g.Info("Starting jobs") + + jobs.startOnce.Do(func() { + if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + go jobs.DataRetention.Run() + } + + // go jobs.SearchIndexing.Run() + }) + + jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange) + + return jobs +} + +func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) { + if jobs.DataRetention != nil { + if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable { + go jobs.DataRetention.Run() + } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable { + jobs.DataRetention.Stop() + } + } +} + +func (jobs *Jobs) Stop() *Jobs { + utils.RemoveConfigListener(jobs.listenerId) + + if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable { + jobs.DataRetention.Stop() + } + // jobs.SearchIndexing.Stop() + + l4g.Info("Stopped jobs") + + return jobs +} diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go new file mode 100644 index 000000000..813676b78 --- /dev/null +++ b/jobs/jobserver/jobserver.go @@ -0,0 +1,46 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package main + +import ( + "os" + "os/signal" + "syscall" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/jobs" + "github.com/mattermost/platform/store" + "github.com/mattermost/platform/utils" + + _ "github.com/mattermost/platform/imports" +) + +var Srv jobs.JobServer + +func main() { + // Initialize + utils.InitAndLoadConfig("config.json") + defer l4g.Close() + + Srv.Store = store.NewSqlStore() + defer Srv.Store.Close() + + Srv.LoadLicense() + + // Run jobs + l4g.Info("Starting Mattermost job server") + Srv.Jobs = jobs.InitJobs(Srv.Store) + Srv.Jobs.Start() + + var signalChan chan os.Signal = make(chan os.Signal) + signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-signalChan + + // Cleanup anything that isn't handled by a defer statement + l4g.Info("Stopping Mattermost job server") + + Srv.Jobs.Stop() + + l4g.Info("Stopped Mattermost job server") +} diff --git a/jobs/server.go b/jobs/server.go new file mode 100644 index 000000000..dd3448842 --- /dev/null +++ b/jobs/server.go @@ -0,0 +1,46 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" + "github.com/mattermost/platform/utils" +) + +type JobServer struct { + Store store.Store + Jobs *Jobs +} + +func (server *JobServer) LoadLicense() { + licenseId := "" + if result := <-server.Store.System().Get(); result.Err == nil { + props := result.Data.(model.StringMap) + licenseId = props[model.SYSTEM_ACTIVE_LICENSE_ID] + } + + var licenseBytes []byte + + if len(licenseId) != 26 { + // Lets attempt to load the file from disk since it was missing from the DB + _, licenseBytes = utils.GetAndValidateLicenseFileFromDisk() + } else { + if result := <-server.Store.License().Get(licenseId); result.Err == nil { + record := result.Data.(*model.LicenseRecord) + licenseBytes = []byte(record.Bytes) + l4g.Info("License key valid unlocking enterprise features.") + } else { + l4g.Info(utils.T("mattermost.load_license.find.warn")) + } + } + + if licenseBytes != nil { + utils.LoadLicense(licenseBytes) + l4g.Info("License key valid unlocking enterprise features.") + } else { + l4g.Info(utils.T("mattermost.load_license.find.warn")) + } +} diff --git a/jobs/testjob.go b/jobs/testjob.go new file mode 100644 index 000000000..59d5274e5 --- /dev/null +++ b/jobs/testjob.go @@ -0,0 +1,54 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package jobs + +import ( + "time" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/store" +) + +type TestJob struct { + store store.Store + + name string + stop chan bool + stopped chan bool +} + +func MakeTestJob(s store.Store, name string) *TestJob { + return &TestJob{ + store: s, + name: name, + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + } +} + +func (job *TestJob) Run() { + l4g.Debug("Job %v: Started", job.name) + + running := true + for running { + l4g.Debug("Job %v: Tick", job.name) + + select { + case <-job.stop: + l4g.Debug("Job %v: Received stop signal", job.name) + running = false + case <-time.After(10 * time.Second): + continue + } + } + + l4g.Debug("Job %v: Finished", job.name) + job.stopped <- true +} + +func (job *TestJob) Stop() { + l4g.Debug("Job %v: Stopping", job.name) + job.stop <- true + <-job.stopped +} diff --git a/model/client4.go b/model/client4.go index f4a247e12..28434c2e5 100644 --- a/model/client4.go +++ b/model/client4.go @@ -254,6 +254,10 @@ func (c *Client4) GetOpenGraphRoute() string { return fmt.Sprintf("/opengraph") } +func (c *Client4) GetJobsRoute() string { + return fmt.Sprintf("/jobs") +} + func (c *Client4) DoApiGet(url string, etag string) (*http.Response, *AppError) { return c.DoApiRequest(http.MethodGet, c.ApiUrl+url, "", etag) } @@ -2638,3 +2642,25 @@ func (c *Client4) OpenGraph(url string) (map[string]string, *Response) { return MapFromJson(r.Body), BuildResponse(r) } } + +// Jobs Section + +// GetJobStatus gets the status of a single job. +func (c *Client4) GetJobStatus(id string) (*JobStatus, *Response) { + if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v/status", id), ""); err != nil { + return nil, &Response{StatusCode: r.StatusCode, Error: err} + } else { + defer closeBody(r) + return JobStatusFromJson(r.Body), BuildResponse(r) + } +} + +// GetJobStatusesByType gets the status of all jobs of a given type, sorted with the job that most recently started first. +func (c *Client4) GetJobStatusesByType(jobType string, page int, perPage int) ([]*JobStatus, *Response) { + if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v/statuses?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil { + return nil, &Response{StatusCode: r.StatusCode, Error: err} + } else { + defer closeBody(r) + return JobStatusesFromJson(r.Body), BuildResponse(r) + } +} diff --git a/model/config.go b/model/config.go index 14f092373..8c1814792 100644 --- a/model/config.go +++ b/model/config.go @@ -410,6 +410,10 @@ type ElasticSearchSettings struct { Sniff *bool } +type DataRetentionSettings struct { + Enable *bool +} + type Config struct { ServiceSettings ServiceSettings TeamSettings TeamSettings @@ -434,6 +438,7 @@ type Config struct { AnalyticsSettings AnalyticsSettings WebrtcSettings WebrtcSettings ElasticSearchSettings ElasticSearchSettings + DataRetentionSettings DataRetentionSettings } func (o *Config) ToJson() string { @@ -1257,6 +1262,11 @@ func (o *Config) SetDefaults() { *o.ElasticSearchSettings.Sniff = true } + if o.DataRetentionSettings.Enable == nil { + o.DataRetentionSettings.Enable = new(bool) + *o.DataRetentionSettings.Enable = false + } + o.defaultWebrtcSettings() } diff --git a/model/job.go b/model/job.go index 453828bd2..d539b5bf9 100644 --- a/model/job.go +++ b/model/job.go @@ -1,110 +1,9 @@ -// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. // See License.txt for license information. package model -import ( - "fmt" - "sync" - "time" -) - -type TaskFunc func() - -type ScheduledTask struct { - Name string `json:"name"` - Interval time.Duration `json:"interval"` - Recurring bool `json:"recurring"` - function TaskFunc - timer *time.Timer -} - -var taskMutex = sync.Mutex{} -var tasks = make(map[string]*ScheduledTask) - -func addTask(task *ScheduledTask) { - taskMutex.Lock() - defer taskMutex.Unlock() - tasks[task.Name] = task -} - -func removeTaskByName(name string) { - taskMutex.Lock() - defer taskMutex.Unlock() - delete(tasks, name) -} - -func GetTaskByName(name string) *ScheduledTask { - taskMutex.Lock() - defer taskMutex.Unlock() - if task, ok := tasks[name]; ok { - return task - } - return nil -} - -func GetAllTasks() *map[string]*ScheduledTask { - taskMutex.Lock() - defer taskMutex.Unlock() - return &tasks -} - -func CreateTask(name string, function TaskFunc, timeToExecution time.Duration) *ScheduledTask { - task := &ScheduledTask{ - Name: name, - Interval: timeToExecution, - Recurring: false, - function: function, - } - - taskRunner := func() { - go task.function() - removeTaskByName(task.Name) - } - - task.timer = time.AfterFunc(timeToExecution, taskRunner) - - addTask(task) - - return task -} - -func CreateRecurringTask(name string, function TaskFunc, interval time.Duration) *ScheduledTask { - task := &ScheduledTask{ - Name: name, - Interval: interval, - Recurring: true, - function: function, - } - - taskRecurer := func() { - go task.function() - task.timer.Reset(task.Interval) - } - - task.timer = time.AfterFunc(interval, taskRecurer) - - addTask(task) - - return task -} - -func (task *ScheduledTask) Cancel() { - task.timer.Stop() - removeTaskByName(task.Name) -} - -// Executes the task immediatly. A recurring task will be run regularally after interval. -func (task *ScheduledTask) Execute() { - task.function() - task.timer.Reset(task.Interval) -} - -func (task *ScheduledTask) String() string { - return fmt.Sprintf( - "%s\nInterval: %s\nRecurring: %t\n", - task.Name, - task.Interval.String(), - task.Recurring, - ) +type Job interface { + Run() + Stop() } diff --git a/model/job_status.go b/model/job_status.go new file mode 100644 index 000000000..cf490648f --- /dev/null +++ b/model/job_status.go @@ -0,0 +1,59 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +import ( + "encoding/json" + "io" +) + +const ( + JOB_TYPE_DATA_RETENTION = "data_retention" + JOB_TYPE_SEARCH_INDEXING = "search_indexing" +) + +type JobStatus struct { + Id string `json:"id"` + Type string `json:"type"` + StartAt int64 `json:"start_at"` + LastActivityAt int64 `json:"last_activity_at"` + LastRunStartedAt int64 `json:"last_run_started_at"` + LastRunCompletedAt int64 `json:"last_run_completed_at"` + Status string `json:"status"` + Data map[string]interface{} `json:"data"` +} + +func (js *JobStatus) ToJson() string { + if b, err := json.Marshal(js); err != nil { + return "" + } else { + return string(b) + } +} + +func JobStatusFromJson(data io.Reader) *JobStatus { + var status JobStatus + if err := json.NewDecoder(data).Decode(&status); err == nil { + return &status + } else { + return nil + } +} + +func JobStatusesToJson(statuses []*JobStatus) string { + if b, err := json.Marshal(statuses); err != nil { + return "" + } else { + return string(b) + } +} + +func JobStatusesFromJson(data io.Reader) []*JobStatus { + var statuses []*JobStatus + if err := json.NewDecoder(data).Decode(&statuses); err == nil { + return statuses + } else { + return nil + } +} diff --git a/model/scheduled_task.go b/model/scheduled_task.go new file mode 100644 index 000000000..453828bd2 --- /dev/null +++ b/model/scheduled_task.go @@ -0,0 +1,110 @@ +// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package model + +import ( + "fmt" + "sync" + "time" +) + +type TaskFunc func() + +type ScheduledTask struct { + Name string `json:"name"` + Interval time.Duration `json:"interval"` + Recurring bool `json:"recurring"` + function TaskFunc + timer *time.Timer +} + +var taskMutex = sync.Mutex{} +var tasks = make(map[string]*ScheduledTask) + +func addTask(task *ScheduledTask) { + taskMutex.Lock() + defer taskMutex.Unlock() + tasks[task.Name] = task +} + +func removeTaskByName(name string) { + taskMutex.Lock() + defer taskMutex.Unlock() + delete(tasks, name) +} + +func GetTaskByName(name string) *ScheduledTask { + taskMutex.Lock() + defer taskMutex.Unlock() + if task, ok := tasks[name]; ok { + return task + } + return nil +} + +func GetAllTasks() *map[string]*ScheduledTask { + taskMutex.Lock() + defer taskMutex.Unlock() + return &tasks +} + +func CreateTask(name string, function TaskFunc, timeToExecution time.Duration) *ScheduledTask { + task := &ScheduledTask{ + Name: name, + Interval: timeToExecution, + Recurring: false, + function: function, + } + + taskRunner := func() { + go task.function() + removeTaskByName(task.Name) + } + + task.timer = time.AfterFunc(timeToExecution, taskRunner) + + addTask(task) + + return task +} + +func CreateRecurringTask(name string, function TaskFunc, interval time.Duration) *ScheduledTask { + task := &ScheduledTask{ + Name: name, + Interval: interval, + Recurring: true, + function: function, + } + + taskRecurer := func() { + go task.function() + task.timer.Reset(task.Interval) + } + + task.timer = time.AfterFunc(interval, taskRecurer) + + addTask(task) + + return task +} + +func (task *ScheduledTask) Cancel() { + task.timer.Stop() + removeTaskByName(task.Name) +} + +// Executes the task immediatly. A recurring task will be run regularally after interval. +func (task *ScheduledTask) Execute() { + task.function() + task.timer.Reset(task.Interval) +} + +func (task *ScheduledTask) String() string { + return fmt.Sprintf( + "%s\nInterval: %s\nRecurring: %t\n", + task.Name, + task.Interval.String(), + task.Recurring, + ) +} diff --git a/model/job_test.go b/model/scheduled_task_test.go index 6ba8edaf9..6ba8edaf9 100644 --- a/model/job_test.go +++ b/model/scheduled_task_test.go diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go new file mode 100644 index 000000000..ef039d99a --- /dev/null +++ b/store/sql_job_status_store.go @@ -0,0 +1,190 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "database/sql" + "net/http" + + "github.com/mattermost/platform/model" +) + +type SqlJobStatusStore struct { + *SqlStore +} + +func NewSqlJobStatusStore(sqlStore *SqlStore) JobStatusStore { + s := &SqlJobStatusStore{sqlStore} + + for _, db := range sqlStore.GetAllConns() { + table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id") + table.ColMap("Id").SetMaxSize(26) + table.ColMap("Type").SetMaxSize(32) + table.ColMap("Status").SetMaxSize(32) + table.ColMap("Data").SetMaxSize(1024) + } + + return s +} + +func (jss SqlJobStatusStore) CreateIndexesIfNotExists() { + jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type") +} + +func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if err := jss.GetReplica().SelectOne(&model.JobStatus{}, + `SELECT + * + FROM + JobStatuses + WHERE + Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil { + if _, err := jss.GetMaster().Update(status); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", + "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error()) + } + } else if err == sql.ErrNoRows { + if err := jss.GetMaster().Insert(status); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", + "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error()) + } + } else { + result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate", + "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error()) + } + + if result.Err == nil { + result.Data = status + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) Get(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var status *model.JobStatus + + if err := jss.GetReplica().SelectOne(&status, + `SELECT + * + FROM + JobStatuses + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + if err == sql.ErrNoRows { + result.Err = model.NewAppError("SqlJobStatusStore.Get", + "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) + } else { + result.Err = model.NewAppError("SqlJobStatusStore.Get", + "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) + } + } else { + result.Data = status + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.JobStatus + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + JobStatuses + WHERE + Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType", + "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + var statuses []*model.JobStatus + + if _, err := jss.GetReplica().Select(&statuses, + `SELECT + * + FROM + JobStatuses + WHERE + Type = :Type + ORDER BY + StartAt ASC + LIMIT + :Limit + OFFSET + :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage", + "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error()) + } else { + result.Data = statuses + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} + +func (jss SqlJobStatusStore) Delete(id string) StoreChannel { + storeChannel := make(StoreChannel, 1) + + go func() { + result := StoreResult{} + + if _, err := jss.GetReplica().Exec( + `DELETE FROM + JobStatuses + WHERE + Id = :Id`, map[string]interface{}{"Id": id}); err != nil { + result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType", + "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error()) + } else { + result.Data = id + } + + storeChannel <- result + close(storeChannel) + }() + + return storeChannel +} diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go new file mode 100644 index 000000000..18c29e522 --- /dev/null +++ b/store/sql_job_status_store_test.go @@ -0,0 +1,151 @@ +// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package store + +import ( + "testing" + + "github.com/mattermost/platform/model" +) + +func TestJobStatusSaveGetUpdate(t *testing.T) { + Setup() + + status := &model.JobStatus{ + Id: model.NewId(), + Type: model.NewId(), + Status: model.NewId(), + Data: map[string]interface{}{ + "Processed": 0, + "Total": 12345, + "LastProcessed": "abcd", + }, + } + + if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { + t.Fatal(result.Err) + } + + defer func() { + <-store.JobStatus().Delete(status.Id) + }() + + if result := <-store.JobStatus().Get(status.Id); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.(*model.JobStatus); received.Id != status.Id { + t.Fatal("received incorrect status after save") + } + + status.Status = model.NewId() + status.Data = map[string]interface{}{ + "Processed": 12345, + "Total": 12345, + "LastProcessed": "abcd", + } + + if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil { + t.Fatal(result.Err) + } + + if result := <-store.JobStatus().Get(status.Id); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status { + t.Fatal("received incorrect status after update") + } +} + +func TestJobStatusGetAllByType(t *testing.T) { + Setup() + + jobType := model.NewId() + + statuses := []*model.JobStatus{ + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: jobType, + }, + { + Id: model.NewId(), + Type: model.NewId(), + }, + } + + for _, status := range statuses { + Must(store.JobStatus().SaveOrUpdate(status)) + defer store.JobStatus().Delete(status.Id) + } + + if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id { + t.Fatal("should've received first status") + } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id { + t.Fatal("should've received second status") + } +} + +func TestJobStatusGetAllByTypePage(t *testing.T) { + Setup() + + jobType := model.NewId() + + statuses := []*model.JobStatus{ + { + Id: model.NewId(), + Type: jobType, + StartAt: 1000, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 999, + }, + { + Id: model.NewId(), + Type: jobType, + StartAt: 1001, + }, + } + + for _, status := range statuses { + Must(store.JobStatus().SaveOrUpdate(status)) + defer store.JobStatus().Delete(status.Id) + } + + if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.JobStatus); len(received) != 2 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[1].Id { + t.Fatal("should've received newest job first") + } else if received[1].Id != statuses[0].Id { + t.Fatal("should've received second newest job second") + } + + if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil { + t.Fatal(result.Err) + } else if received := result.Data.([]*model.JobStatus); len(received) != 1 { + t.Fatal("received wrong number of statuses") + } else if received[0].Id != statuses[2].Id { + t.Fatal("should've received oldest job last") + } +} + +func TestJobStatusDelete(t *testing.T) { + Setup() + + status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{ + Id: model.NewId(), + })).(*model.JobStatus) + + if result := <-store.JobStatus().Delete(status.Id); result.Err != nil { + t.Fatal(result.Err) + } +} diff --git a/store/sql_store.go b/store/sql_store.go index f13fe2ec0..4261c849a 100644 --- a/store/sql_store.go +++ b/store/sql_store.go @@ -87,6 +87,7 @@ type SqlStore struct { status StatusStore fileInfo FileInfoStore reaction ReactionStore + jobStatus JobStatusStore SchemaVersion string rrCounter int64 srCounter int64 @@ -151,6 +152,7 @@ func NewSqlStore() Store { sqlStore.status = NewSqlStatusStore(sqlStore) sqlStore.fileInfo = NewSqlFileInfoStore(sqlStore) sqlStore.reaction = NewSqlReactionStore(sqlStore) + sqlStore.jobStatus = NewSqlJobStatusStore(sqlStore) err := sqlStore.master.CreateTablesIfNotExists() if err != nil { @@ -179,6 +181,7 @@ func NewSqlStore() Store { sqlStore.status.(*SqlStatusStore).CreateIndexesIfNotExists() sqlStore.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists() sqlStore.reaction.(*SqlReactionStore).CreateIndexesIfNotExists() + sqlStore.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists() sqlStore.preference.(*SqlPreferenceStore).DeleteUnusedFeatures() @@ -735,6 +738,10 @@ func (ss *SqlStore) Reaction() ReactionStore { return ss.reaction } +func (ss *SqlStore) JobStatus() JobStatusStore { + return ss.jobStatus +} + func (ss *SqlStore) DropAllTables() { ss.master.TruncateTables() } @@ -752,6 +759,8 @@ func (me mattermConverter) ToDb(val interface{}) (interface{}, error) { return encrypt([]byte(utils.Cfg.SqlSettings.AtRestEncryptKey), model.MapToJson(t)) case model.StringInterface: return model.StringInterfaceToJson(t), nil + case map[string]interface{}: + return model.StringInterfaceToJson(model.StringInterface(t)), nil } return val, nil @@ -805,6 +814,16 @@ func (me mattermConverter) FromDb(target interface{}) (gorp.CustomScanner, bool) return json.Unmarshal(b, target) } return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true + case *map[string]interface{}: + binder := func(holder, target interface{}) error { + s, ok := holder.(*string) + if !ok { + return errors.New(utils.T("store.sql.convert_string_interface")) + } + b := []byte(*s) + return json.Unmarshal(b, target) + } + return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true } return gorp.CustomScanner{}, false diff --git a/store/store.go b/store/store.go index acbeafdd6..cd7792ce1 100644 --- a/store/store.go +++ b/store/store.go @@ -47,6 +47,7 @@ type Store interface { Status() StatusStore FileInfo() FileInfoStore Reaction() ReactionStore + JobStatus() JobStatusStore MarkSystemRanUnitTests() Close() DropAllTables() @@ -371,3 +372,11 @@ type ReactionStore interface { GetForPost(postId string, allowFromCache bool) StoreChannel DeleteAllWithEmojiName(emojiName string) StoreChannel } + +type JobStatusStore interface { + SaveOrUpdate(status *model.JobStatus) StoreChannel + Get(id string) StoreChannel + GetAllByType(jobType string) StoreChannel + GetAllByTypePage(jobType string, offset int, limit int) StoreChannel + Delete(id string) StoreChannel +} diff --git a/utils/config.go b/utils/config.go index 234acae11..95cfc43aa 100644 --- a/utils/config.go +++ b/utils/config.go @@ -50,6 +50,22 @@ func SetSiteURL(url string) { siteURL = strings.TrimRight(url, "/") } +var cfgListeners = map[string]func(*model.Config, *model.Config){} + +// Registers a function with a given to be called when the config is reloaded and may have changed. The function +// will be called with two arguments: the old config and the new config. AddConfigListener returns a unique ID +// for the listener that can later be used to remove it. +func AddConfigListener(listener func(*model.Config, *model.Config)) string { + id := model.NewId() + cfgListeners[id] = listener + return id +} + +// Removes a listener function by the unique ID returned when AddConfigListener was called +func RemoveConfigListener(id string) { + delete(cfgListeners, id) +} + func FindConfigFile(fileName string) string { if _, err := os.Stat("./config/" + fileName); err == nil { fileName, _ = filepath.Abs("./config/" + fileName) @@ -242,6 +258,21 @@ func DisableConfigWatch() { } } +func InitAndLoadConfig(filename string) (err string) { + defer func() { + if r := recover(); r != nil { + err = fmt.Sprintf("%v", r) + } + }() + TranslationsPreInit() + EnableConfigFromEnviromentVars() + LoadConfig(filename) + InitializeConfigWatch() + EnableConfigWatch() + + return "" +} + // LoadConfig will try to search around for the corresponding config file. // It will search /tmp/fileName then attempt ./config/fileName, // then ../config/fileName and last it will look at fileName @@ -249,6 +280,9 @@ func LoadConfig(fileName string) { cfgMutex.Lock() defer cfgMutex.Unlock() + // Cfg should never be null + oldConfig := *Cfg + fileNameWithExtension := filepath.Base(fileName) fileExtension := filepath.Ext(fileNameWithExtension) fileDir := filepath.Dir(fileName) @@ -339,6 +373,10 @@ func LoadConfig(fileName string) { SetDefaultRolesBasedOnConfig() SetSiteURL(*Cfg.ServiceSettings.SiteURL) + + for _, listener := range cfgListeners { + listener(&oldConfig, &config) + } } func RegenerateClientConfig() { diff --git a/utils/config_test.go b/utils/config_test.go index 755cd9acd..bce85d2ae 100644 --- a/utils/config_test.go +++ b/utils/config_test.go @@ -6,6 +6,8 @@ package utils import ( "os" "testing" + + "github.com/mattermost/platform/model" ) func TestConfig(t *testing.T) { @@ -59,3 +61,80 @@ func TestConfigFromEnviroVars(t *testing.T) { } } + +func TestAddRemoveConfigListener(t *testing.T) { + if len(cfgListeners) != 0 { + t.Fatal("should've started with 0 listeners") + } + + id1 := AddConfigListener(func(*model.Config, *model.Config) { + }) + if len(cfgListeners) != 1 { + t.Fatal("should now have 1 listener") + } + + id2 := AddConfigListener(func(*model.Config, *model.Config) { + }) + if len(cfgListeners) != 2 { + t.Fatal("should now have 2 listeners") + } + + RemoveConfigListener(id1) + if len(cfgListeners) != 1 { + t.Fatal("should've removed first listener") + } + + RemoveConfigListener(id2) + if len(cfgListeners) != 0 { + t.Fatal("should've removed both listeners") + } +} + +func TestConfigListener(t *testing.T) { + TranslationsPreInit() + EnableConfigFromEnviromentVars() + LoadConfig("config.json") + + SiteName := Cfg.TeamSettings.SiteName + defer func() { + Cfg.TeamSettings.SiteName = SiteName + SaveConfig(CfgFileName, Cfg) + }() + Cfg.TeamSettings.SiteName = "test123" + + listenerCalled := false + listener := func(oldConfig *model.Config, newConfig *model.Config) { + if listenerCalled { + t.Fatal("listener called twice") + } + + if oldConfig.TeamSettings.SiteName != "test123" { + t.Fatal("old config contains incorrect site name") + } else if newConfig.TeamSettings.SiteName != "Mattermost" { + t.Fatal("new config contains incorrect site name") + } + + listenerCalled = true + } + listenerId := AddConfigListener(listener) + defer RemoveConfigListener(listenerId) + + listener2Called := false + listener2 := func(oldConfig *model.Config, newConfig *model.Config) { + if listener2Called { + t.Fatal("listener2 called twice") + } + + listener2Called = true + } + listener2Id := AddConfigListener(listener2) + defer RemoveConfigListener(listener2Id) + + LoadConfig("config.json") + + if !listenerCalled { + t.Fatal("listener should've been called") + } else if !listener2Called { + t.Fatal("listener 2 should've been called") + } +} diff --git a/utils/license.go b/utils/license.go index d3e2c1362..c0a17bf79 100644 --- a/utils/license.go +++ b/utils/license.go @@ -114,6 +114,25 @@ func ValidateLicense(signed []byte) (bool, string) { return true, string(plaintext) } +func GetAndValidateLicenseFileFromDisk() (*model.License, []byte) { + fileName := GetLicenseFileLocation(*Cfg.ServiceSettings.LicenseFileLocation) + + if _, err := os.Stat(fileName); err != nil { + l4g.Debug("We could not find the license key in the database or on disk at %v", fileName) + return nil, nil + } + + l4g.Info("License key has not been uploaded. Loading license key from disk at %v", fileName) + licenseBytes := GetLicenseFileFromDisk(fileName) + + if success, licenseStr := ValidateLicense(licenseBytes); !success { + l4g.Error("Found license key at %v but it appears to be invalid.", fileName) + return nil, nil + } else { + return model.LicenseFromJson(strings.NewReader(licenseStr)), licenseBytes + } +} + func GetLicenseFileFromDisk(fileName string) []byte { file, err := os.Open(fileName) if err != nil { |