summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore4
-rw-r--r--Makefile23
-rw-r--r--api4/api.go4
-rw-r--r--api4/context.go22
-rw-r--r--api4/job.go57
-rw-r--r--api4/job_test.go103
-rw-r--r--api4/params.go10
-rw-r--r--app/job.go28
-rw-r--r--app/job_test.go78
-rw-r--r--app/license.go27
-rw-r--r--cmd/platform/init.go19
-rw-r--r--cmd/platform/mattermost.go5
-rw-r--r--cmd/platform/server.go7
-rw-r--r--config/config.json3
-rw-r--r--einterfaces/jobs/data_retention.go23
-rw-r--r--i18n/en.json24
-rw-r--r--imports/placeholder.go6
-rw-r--r--jobs/jobs.go74
-rw-r--r--jobs/jobserver/jobserver.go46
-rw-r--r--jobs/server.go46
-rw-r--r--jobs/testjob.go54
-rw-r--r--model/client4.go26
-rw-r--r--model/config.go10
-rw-r--r--model/job.go109
-rw-r--r--model/job_status.go59
-rw-r--r--model/scheduled_task.go110
-rw-r--r--model/scheduled_task_test.go (renamed from model/job_test.go)0
-rw-r--r--store/sql_job_status_store.go190
-rw-r--r--store/sql_job_status_store_test.go151
-rw-r--r--store/sql_store.go19
-rw-r--r--store/store.go9
-rw-r--r--utils/config.go38
-rw-r--r--utils/config_test.go79
-rw-r--r--utils/license.go19
34 files changed, 1331 insertions, 151 deletions
diff --git a/.gitignore b/.gitignore
index c70239a2b..66e2b5d58 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ logs
node_modules
/dist
/webapp/dist
+jobserver
npm-debug.log
mattermost.mattermost-license
config/mattermost.mattermost-license
@@ -16,8 +17,7 @@ config/active.dat
config/config.json
# Enteprise imports file
-imports.go
-cmd/mattermost/imports.go
+imports/imports.go
# Build Targets
.prebuild
diff --git a/Makefile b/Makefile
index b9c909ff1..6c442c318 100644
--- a/Makefile
+++ b/Makefile
@@ -310,7 +310,8 @@ cover:
prepare-enterprise:
ifeq ($(BUILD_ENTERPRISE_READY),true)
@echo Enterprise build selected, preparing
- cp $(BUILD_ENTERPRISE_DIR)/imports.go cmd/platform/
+ mkdir -p imports/
+ cp $(BUILD_ENTERPRISE_DIR)/imports/imports.go imports/
rm -f enterprise
ln -s $(BUILD_ENTERPRISE_DIR) enterprise
endif
@@ -334,6 +335,19 @@ build-client:
cd $(BUILD_WEBAPP_DIR) && $(MAKE) build
+build-job-server: build-job-server-linux build-job-server-mac build-job-server-windows
+
+build-job-server-linux: .prebuild prepare-enterprise
+ @echo Build mattermost job server for Linux amd64
+ env GOOS=linux GOARCH=amd64 $(GO) build $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver
+
+build-job-server-osx: .prebuild prepare-enterprise
+ @echo Build mattermost job server for OSX amd64
+ env GOOS=darwin GOARCH=amd64 $(GO) build $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver
+
+build-job-server-windows: .prebuild prepare-enterprise
+ @echo Build mattermost job server for Windows amd64
+ env GOOS=windows GOARCH=amd64 $(GO) build $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver
package: build build-client
@ echo Packaging mattermost
@@ -469,6 +483,10 @@ restart-server: | stop-server run-server
restart-client: | stop-client run-client
+run-job-server:
+ @echo Running job server for development
+ $(GO) run $(GOFLAGS) $(GO_LINKER_FLAGS) ./jobs/jobserver/jobserver.go
+
clean: stop-docker
@echo Cleaning
@@ -515,12 +533,13 @@ govet:
$(GO) vet $(GOFLAGS) ./web || exit 1
ifeq ($(BUILD_ENTERPRISE_READY),true)
- $(GO) vet $(GOFLAGS) ./enterprise || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/account_migration || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/brand || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/cluster || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/compliance || exit 1
+ $(GO) vet $(GOFLAGS) ./enterprise/data_retention || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/emoji || exit 1
+ $(GO) vet $(GOFLAGS) ./enterprise/imports || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/ldap || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/metrics || exit 1
$(GO) vet $(GOFLAGS) ./enterprise/mfa || exit 1
diff --git a/api4/api.go b/api4/api.go
index 4ed636593..a636581d7 100644
--- a/api4/api.go
+++ b/api4/api.go
@@ -81,6 +81,8 @@ type Routes struct {
System *mux.Router // 'api/v4/system'
+ Jobs *mux.Router // 'api/v4/jobs'
+
Preferences *mux.Router // 'api/v4/users/{user_id:[A-Za-z0-9]+}/preferences'
License *mux.Router // 'api/v4/license'
@@ -168,6 +170,7 @@ func InitApi(full bool) {
BaseRoutes.License = BaseRoutes.ApiRoot.PathPrefix("/license").Subrouter()
BaseRoutes.Public = BaseRoutes.ApiRoot.PathPrefix("/public").Subrouter()
BaseRoutes.Reactions = BaseRoutes.ApiRoot.PathPrefix("/reactions").Subrouter()
+ BaseRoutes.Jobs = BaseRoutes.ApiRoot.PathPrefix("/jobs").Subrouter()
BaseRoutes.Emojis = BaseRoutes.ApiRoot.PathPrefix("/emoji").Subrouter()
BaseRoutes.Emoji = BaseRoutes.Emojis.PathPrefix("/{emoji_id:[A-Za-z0-9]+}").Subrouter()
@@ -191,6 +194,7 @@ func InitApi(full bool) {
InitCluster()
InitLdap()
InitBrand()
+ InitJob()
InitCommand()
InitStatus()
InitWebSocket()
diff --git a/api4/context.go b/api4/context.go
index 37af2c6d4..8d4ed7f79 100644
--- a/api4/context.go
+++ b/api4/context.go
@@ -540,3 +540,25 @@ func (c *Context) RequireCommandId() *Context {
}
return c
}
+
+func (c *Context) RequireJobId() *Context {
+ if c.Err != nil {
+ return c
+ }
+
+ if len(c.Params.JobId) != 26 {
+ c.SetInvalidUrlParam("job_id")
+ }
+ return c
+}
+
+func (c *Context) RequireJobType() *Context {
+ if c.Err != nil {
+ return c
+ }
+
+ if len(c.Params.JobType) == 0 || len(c.Params.JobType) > 32 {
+ c.SetInvalidUrlParam("job_type")
+ }
+ return c
+}
diff --git a/api4/job.go b/api4/job.go
new file mode 100644
index 000000000..8610d9e74
--- /dev/null
+++ b/api4/job.go
@@ -0,0 +1,57 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package api4
+
+import (
+ "net/http"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/app"
+ "github.com/mattermost/platform/model"
+)
+
+func InitJob() {
+ l4g.Info("Initializing job API routes")
+
+ BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobStatusesByType)).Methods("GET")
+ BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJobStatus)).Methods("GET")
+}
+
+func getJobStatus(c *Context, w http.ResponseWriter, r *http.Request) {
+ c.RequireJobId()
+ if c.Err != nil {
+ return
+ }
+
+ if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
+ return
+ }
+
+ if status, err := app.GetJobStatus(c.Params.JobId); err != nil {
+ c.Err = err
+ return
+ } else {
+ w.Write([]byte(status.ToJson()))
+ }
+}
+
+func getJobStatusesByType(c *Context, w http.ResponseWriter, r *http.Request) {
+ c.RequireJobType()
+ if c.Err != nil {
+ return
+ }
+
+ if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
+ return
+ }
+
+ if statuses, err := app.GetJobStatusesByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil {
+ c.Err = err
+ return
+ } else {
+ w.Write([]byte(model.JobStatusesToJson(statuses)))
+ }
+}
diff --git a/api4/job_test.go b/api4/job_test.go
new file mode 100644
index 000000000..0f39fc306
--- /dev/null
+++ b/api4/job_test.go
@@ -0,0 +1,103 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package api4
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/mattermost/platform/app"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/store"
+)
+
+func TestGetJobStatus(t *testing.T) {
+ th := Setup().InitBasic().InitSystemAdmin()
+ defer TearDown()
+
+ status := &model.JobStatus{
+ Id: model.NewId(),
+ Status: model.NewId(),
+ }
+ if result := <-app.Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer app.Srv.Store.JobStatus().Delete(status.Id)
+
+ received, resp := th.SystemAdminClient.GetJobStatus(status.Id)
+ CheckNoError(t, resp)
+
+ if received.Id != status.Id || received.Status != status.Status {
+ t.Fatal("incorrect job status received")
+ }
+
+ _, resp = th.SystemAdminClient.GetJobStatus("1234")
+ CheckBadRequestStatus(t, resp)
+
+ _, resp = th.Client.GetJobStatus(status.Id)
+ CheckForbiddenStatus(t, resp)
+
+ _, resp = th.SystemAdminClient.GetJobStatus(model.NewId())
+ CheckNotFoundStatus(t, resp)
+}
+
+func TestGetJobStatusesByType(t *testing.T) {
+ th := Setup().InitBasic().InitSystemAdmin()
+ defer TearDown()
+
+ jobType := model.NewId()
+
+ statuses := []*model.JobStatus{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, status := range statuses {
+ store.Must(app.Srv.Store.JobStatus().SaveOrUpdate(status))
+ defer app.Srv.Store.JobStatus().Delete(status.Id)
+ }
+
+ received, resp := th.SystemAdminClient.GetJobStatusesByType(jobType, 0, 2)
+ CheckNoError(t, resp)
+
+ if len(received) != 2 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != statuses[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ received, resp = th.SystemAdminClient.GetJobStatusesByType(jobType, 1, 2)
+ CheckNoError(t, resp)
+
+ if len(received) != 1 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+
+ _, resp = th.SystemAdminClient.GetJobStatusesByType("", 0, 60)
+ CheckNotFoundStatus(t, resp)
+
+ _, resp = th.SystemAdminClient.GetJobStatusesByType(strings.Repeat("a", 33), 0, 60)
+ CheckBadRequestStatus(t, resp)
+
+ _, resp = th.Client.GetJobStatusesByType(jobType, 0, 60)
+ CheckForbiddenStatus(t, resp)
+}
diff --git a/api4/params.go b/api4/params.go
index 785b2267b..aa865fd2a 100644
--- a/api4/params.go
+++ b/api4/params.go
@@ -35,6 +35,8 @@ type ApiParams struct {
EmojiName string
Category string
Service string
+ JobId string
+ JobType string
Page int
PerPage int
}
@@ -116,6 +118,14 @@ func ApiParamsFromRequest(r *http.Request) *ApiParams {
params.EmojiName = val
}
+ if val, ok := props["job_id"]; ok {
+ params.JobId = val
+ }
+
+ if val, ok := props["job_type"]; ok {
+ params.JobType = val
+ }
+
if val, err := strconv.Atoi(r.URL.Query().Get("page")); err != nil || val < 0 {
params.Page = PAGE_DEFAULT
} else {
diff --git a/app/job.go b/app/job.go
new file mode 100644
index 000000000..00439e4d2
--- /dev/null
+++ b/app/job.go
@@ -0,0 +1,28 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "github.com/mattermost/platform/model"
+)
+
+func GetJobStatus(id string) (*model.JobStatus, *model.AppError) {
+ if result := <-Srv.Store.JobStatus().Get(id); result.Err != nil {
+ return nil, result.Err
+ } else {
+ return result.Data.(*model.JobStatus), nil
+ }
+}
+
+func GetJobStatusesByTypePage(jobType string, page int, perPage int) ([]*model.JobStatus, *model.AppError) {
+ return GetJobStatusesByType(jobType, page*perPage, perPage)
+}
+
+func GetJobStatusesByType(jobType string, offset int, limit int) ([]*model.JobStatus, *model.AppError) {
+ if result := <-Srv.Store.JobStatus().GetAllByTypePage(jobType, offset, limit); result.Err != nil {
+ return nil, result.Err
+ } else {
+ return result.Data.([]*model.JobStatus), nil
+ }
+}
diff --git a/app/job_test.go b/app/job_test.go
new file mode 100644
index 000000000..20e9dee8a
--- /dev/null
+++ b/app/job_test.go
@@ -0,0 +1,78 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "testing"
+
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/store"
+)
+
+func TestGetJobStatus(t *testing.T) {
+ Setup()
+
+ status := &model.JobStatus{
+ Id: model.NewId(),
+ Status: model.NewId(),
+ }
+ if result := <-Srv.Store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer Srv.Store.JobStatus().Delete(status.Id)
+
+ if received, err := GetJobStatus(status.Id); err != nil {
+ t.Fatal(err)
+ } else if received.Id != status.Id || received.Status != status.Status {
+ t.Fatal("inccorrect job status received")
+ }
+}
+
+func TestGetJobStatusesByType(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ statuses := []*model.JobStatus{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, status := range statuses {
+ store.Must(Srv.Store.JobStatus().SaveOrUpdate(status))
+ defer Srv.Store.JobStatus().Delete(status.Id)
+ }
+
+ if received, err := GetJobStatusesByType(jobType, 0, 2); err != nil {
+ t.Fatal(err)
+ } else if len(received) != 2 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != statuses[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if received, err := GetJobStatusesByType(jobType, 2, 2); err != nil {
+ t.Fatal(err)
+ } else if len(received) != 1 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
diff --git a/app/license.go b/app/license.go
index 7a00d7fb4..44b700d5b 100644
--- a/app/license.go
+++ b/app/license.go
@@ -4,7 +4,6 @@
package app
import (
- "os"
"strings"
l4g "github.com/alecthomas/log4go"
@@ -23,28 +22,14 @@ func LoadLicense() {
if len(licenseId) != 26 {
// Lets attempt to load the file from disk since it was missing from the DB
- fileName := utils.GetLicenseFileLocation(*utils.Cfg.ServiceSettings.LicenseFileLocation)
-
- if _, err := os.Stat(fileName); err == nil {
- l4g.Info("License key has not been uploaded. Loading license key from disk at %v", fileName)
- licenseBytes := utils.GetLicenseFileFromDisk(fileName)
-
- if success, licenseStr := utils.ValidateLicense(licenseBytes); success {
- licenseFileFromDisk := model.LicenseFromJson(strings.NewReader(licenseStr))
- licenseId = licenseFileFromDisk.Id
- if _, err := SaveLicense(licenseBytes); err != nil {
- l4g.Info("Failed to save license key loaded from disk err=%v", err.Error())
- return
- }
+ license, licenseBytes := utils.GetAndValidateLicenseFileFromDisk()
+
+ if license != nil {
+ if _, err := SaveLicense(licenseBytes); err != nil {
+ l4g.Info("Failed to save license key loaded from disk err=%v", err.Error())
} else {
- l4g.Error("Found license key at %v but it appears to be invalid.", fileName)
- return
+ licenseId = license.Id
}
-
- } else {
- l4g.Info(utils.T("mattermost.load_license.find.warn"))
- l4g.Debug("We could not find the license key in the database or on disk at %v", fileName)
- return
}
}
diff --git a/cmd/platform/init.go b/cmd/platform/init.go
index 7d01eb890..b650cf2fd 100644
--- a/cmd/platform/init.go
+++ b/cmd/platform/init.go
@@ -1,29 +1,12 @@
package main
import (
- "fmt"
-
"github.com/mattermost/platform/app"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
"github.com/spf13/cobra"
)
-func doLoadConfig(filename string) (err string) {
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Sprintf("%v", r)
- }
- }()
- utils.TranslationsPreInit()
- utils.EnableConfigFromEnviromentVars()
- utils.LoadConfig(filename)
- utils.InitializeConfigWatch()
- utils.EnableConfigWatch()
-
- return ""
-}
-
func initDBCommandContextCobra(cmd *cobra.Command) error {
config, err := cmd.Flags().GetString("config")
if err != nil {
@@ -35,7 +18,7 @@ func initDBCommandContextCobra(cmd *cobra.Command) error {
}
func initDBCommandContext(configFileLocation string) {
- if errstr := doLoadConfig(configFileLocation); errstr != "" {
+ if errstr := utils.InitAndLoadConfig(configFileLocation); errstr != "" {
return
}
diff --git a/cmd/platform/mattermost.go b/cmd/platform/mattermost.go
index a95825b55..1646faf85 100644
--- a/cmd/platform/mattermost.go
+++ b/cmd/platform/mattermost.go
@@ -14,6 +14,9 @@ import (
// Plugins
_ "github.com/mattermost/platform/model/gitlab"
+ // Enterprise Imports
+ _ "github.com/mattermost/platform/imports"
+
// Enterprise Deps
_ "github.com/dgryski/dgoogauth"
_ "github.com/go-ldap/ldap"
@@ -26,8 +29,6 @@ import (
_ "gopkg.in/olivere/elastic.v5"
)
-//ENTERPRISE_IMPORTS
-
func main() {
if err := rootCmd.Execute(); err != nil {
os.Exit(1)
diff --git a/cmd/platform/server.go b/cmd/platform/server.go
index cb1530951..9846f8de9 100644
--- a/cmd/platform/server.go
+++ b/cmd/platform/server.go
@@ -14,6 +14,7 @@ import (
"github.com/mattermost/platform/api4"
"github.com/mattermost/platform/app"
"github.com/mattermost/platform/einterfaces"
+ "github.com/mattermost/platform/jobs"
"github.com/mattermost/platform/manualtesting"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
@@ -43,7 +44,7 @@ func runServerCmd(cmd *cobra.Command, args []string) error {
}
func runServer(configFileLocation string) {
- if errstr := doLoadConfig(configFileLocation); errstr != "" {
+ if errstr := utils.InitAndLoadConfig(configFileLocation); errstr != "" {
l4g.Exit("Unable to load mattermost configuration file: ", errstr)
return
}
@@ -120,6 +121,8 @@ func runServer(configFileLocation string) {
}
}
+ jobs := jobs.InitJobs(app.Srv.Store).Start()
+
// wait for kill signal before attempting to gracefully shutdown
// the running service
c := make(chan os.Signal)
@@ -134,6 +137,8 @@ func runServer(configFileLocation string) {
einterfaces.GetMetricsInterface().StopServer()
}
+ jobs.Stop()
+
app.StopServer()
}
diff --git a/config/config.json b/config/config.json
index 8112331b8..b9c0516fa 100644
--- a/config/config.json
+++ b/config/config.json
@@ -273,5 +273,8 @@
"TurnURI": "",
"TurnUsername": "",
"TurnSharedKey": ""
+ },
+ "DataRetentionSettings": {
+ "Enable": false
}
}
diff --git a/einterfaces/jobs/data_retention.go b/einterfaces/jobs/data_retention.go
new file mode 100644
index 000000000..340ed1b88
--- /dev/null
+++ b/einterfaces/jobs/data_retention.go
@@ -0,0 +1,23 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/store"
+)
+
+type DataRetentionInterface interface {
+ MakeJob(store store.Store) model.Job
+}
+
+var theDataRetentionInterface DataRetentionInterface
+
+func RegisterDataRetentionInterface(newInterface DataRetentionInterface) {
+ theDataRetentionInterface = newInterface
+}
+
+func GetDataRetentionInterface() DataRetentionInterface {
+ return theDataRetentionInterface
+}
diff --git a/i18n/en.json b/i18n/en.json
index 214e91837..f6f6b48a3 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -5096,6 +5096,30 @@
"translation": "We couldn't save the file info"
},
{
+ "id": "store.sql_file_info.save_or_update.app_error",
+ "translation": "We couldn't save or update the file info"
+ },
+ {
+ "id": "store.sql_job_status.delete_by_type.app_error",
+ "translation": "We couldn't delete the job status"
+ },
+ {
+ "id": "store.sql_job_status.get.app_error",
+ "translation": "We couldn't get the job status"
+ },
+ {
+ "id": "store.sql_job_status.get_all.app_error",
+ "translation": "We couldn't get all job statuses"
+ },
+ {
+ "id": "store.sql_job_status.save.app_error",
+ "translation": "We couldn't save the job status"
+ },
+ {
+ "id": "store.sql_job_status.update.app_error",
+ "translation": "We couldn't update the job status"
+ },
+ {
"id": "store.sql_license.get.app_error",
"translation": "We encountered an error getting the license"
},
diff --git a/imports/placeholder.go b/imports/placeholder.go
new file mode 100644
index 000000000..98e5decd5
--- /dev/null
+++ b/imports/placeholder.go
@@ -0,0 +1,6 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package imports
+
+// This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty
diff --git a/jobs/jobs.go b/jobs/jobs.go
new file mode 100644
index 000000000..8c84f4eea
--- /dev/null
+++ b/jobs/jobs.go
@@ -0,0 +1,74 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "sync"
+
+ l4g "github.com/alecthomas/log4go"
+ ejobs "github.com/mattermost/platform/einterfaces/jobs"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/store"
+ "github.com/mattermost/platform/utils"
+)
+
+type Jobs struct {
+ startOnce sync.Once
+
+ DataRetention model.Job
+ // SearchIndexing model.Job
+
+ listenerId string
+}
+
+func InitJobs(s store.Store) *Jobs {
+ jobs := &Jobs{
+ // SearchIndexing: MakeTestJob(s, "SearchIndexing"),
+ }
+
+ if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
+ jobs.DataRetention = dataRetentionInterface.MakeJob(s)
+ }
+
+ return jobs
+}
+
+func (jobs *Jobs) Start() *Jobs {
+ l4g.Info("Starting jobs")
+
+ jobs.startOnce.Do(func() {
+ if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ go jobs.DataRetention.Run()
+ }
+
+ // go jobs.SearchIndexing.Run()
+ })
+
+ jobs.listenerId = utils.AddConfigListener(jobs.handleConfigChange)
+
+ return jobs
+}
+
+func (jobs *Jobs) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
+ if jobs.DataRetention != nil {
+ if !*oldConfig.DataRetentionSettings.Enable && *newConfig.DataRetentionSettings.Enable {
+ go jobs.DataRetention.Run()
+ } else if *oldConfig.DataRetentionSettings.Enable && !*newConfig.DataRetentionSettings.Enable {
+ jobs.DataRetention.Stop()
+ }
+ }
+}
+
+func (jobs *Jobs) Stop() *Jobs {
+ utils.RemoveConfigListener(jobs.listenerId)
+
+ if jobs.DataRetention != nil && *utils.Cfg.DataRetentionSettings.Enable {
+ jobs.DataRetention.Stop()
+ }
+ // jobs.SearchIndexing.Stop()
+
+ l4g.Info("Stopped jobs")
+
+ return jobs
+}
diff --git a/jobs/jobserver/jobserver.go b/jobs/jobserver/jobserver.go
new file mode 100644
index 000000000..813676b78
--- /dev/null
+++ b/jobs/jobserver/jobserver.go
@@ -0,0 +1,46 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package main
+
+import (
+ "os"
+ "os/signal"
+ "syscall"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/jobs"
+ "github.com/mattermost/platform/store"
+ "github.com/mattermost/platform/utils"
+
+ _ "github.com/mattermost/platform/imports"
+)
+
+var Srv jobs.JobServer
+
+func main() {
+ // Initialize
+ utils.InitAndLoadConfig("config.json")
+ defer l4g.Close()
+
+ Srv.Store = store.NewSqlStore()
+ defer Srv.Store.Close()
+
+ Srv.LoadLicense()
+
+ // Run jobs
+ l4g.Info("Starting Mattermost job server")
+ Srv.Jobs = jobs.InitJobs(Srv.Store)
+ Srv.Jobs.Start()
+
+ var signalChan chan os.Signal = make(chan os.Signal)
+ signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+ <-signalChan
+
+ // Cleanup anything that isn't handled by a defer statement
+ l4g.Info("Stopping Mattermost job server")
+
+ Srv.Jobs.Stop()
+
+ l4g.Info("Stopped Mattermost job server")
+}
diff --git a/jobs/server.go b/jobs/server.go
new file mode 100644
index 000000000..dd3448842
--- /dev/null
+++ b/jobs/server.go
@@ -0,0 +1,46 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/store"
+ "github.com/mattermost/platform/utils"
+)
+
+type JobServer struct {
+ Store store.Store
+ Jobs *Jobs
+}
+
+func (server *JobServer) LoadLicense() {
+ licenseId := ""
+ if result := <-server.Store.System().Get(); result.Err == nil {
+ props := result.Data.(model.StringMap)
+ licenseId = props[model.SYSTEM_ACTIVE_LICENSE_ID]
+ }
+
+ var licenseBytes []byte
+
+ if len(licenseId) != 26 {
+ // Lets attempt to load the file from disk since it was missing from the DB
+ _, licenseBytes = utils.GetAndValidateLicenseFileFromDisk()
+ } else {
+ if result := <-server.Store.License().Get(licenseId); result.Err == nil {
+ record := result.Data.(*model.LicenseRecord)
+ licenseBytes = []byte(record.Bytes)
+ l4g.Info("License key valid unlocking enterprise features.")
+ } else {
+ l4g.Info(utils.T("mattermost.load_license.find.warn"))
+ }
+ }
+
+ if licenseBytes != nil {
+ utils.LoadLicense(licenseBytes)
+ l4g.Info("License key valid unlocking enterprise features.")
+ } else {
+ l4g.Info(utils.T("mattermost.load_license.find.warn"))
+ }
+}
diff --git a/jobs/testjob.go b/jobs/testjob.go
new file mode 100644
index 000000000..59d5274e5
--- /dev/null
+++ b/jobs/testjob.go
@@ -0,0 +1,54 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package jobs
+
+import (
+ "time"
+
+ l4g "github.com/alecthomas/log4go"
+ "github.com/mattermost/platform/store"
+)
+
+type TestJob struct {
+ store store.Store
+
+ name string
+ stop chan bool
+ stopped chan bool
+}
+
+func MakeTestJob(s store.Store, name string) *TestJob {
+ return &TestJob{
+ store: s,
+ name: name,
+ stop: make(chan bool, 1),
+ stopped: make(chan bool, 1),
+ }
+}
+
+func (job *TestJob) Run() {
+ l4g.Debug("Job %v: Started", job.name)
+
+ running := true
+ for running {
+ l4g.Debug("Job %v: Tick", job.name)
+
+ select {
+ case <-job.stop:
+ l4g.Debug("Job %v: Received stop signal", job.name)
+ running = false
+ case <-time.After(10 * time.Second):
+ continue
+ }
+ }
+
+ l4g.Debug("Job %v: Finished", job.name)
+ job.stopped <- true
+}
+
+func (job *TestJob) Stop() {
+ l4g.Debug("Job %v: Stopping", job.name)
+ job.stop <- true
+ <-job.stopped
+}
diff --git a/model/client4.go b/model/client4.go
index f4a247e12..28434c2e5 100644
--- a/model/client4.go
+++ b/model/client4.go
@@ -254,6 +254,10 @@ func (c *Client4) GetOpenGraphRoute() string {
return fmt.Sprintf("/opengraph")
}
+func (c *Client4) GetJobsRoute() string {
+ return fmt.Sprintf("/jobs")
+}
+
func (c *Client4) DoApiGet(url string, etag string) (*http.Response, *AppError) {
return c.DoApiRequest(http.MethodGet, c.ApiUrl+url, "", etag)
}
@@ -2638,3 +2642,25 @@ func (c *Client4) OpenGraph(url string) (map[string]string, *Response) {
return MapFromJson(r.Body), BuildResponse(r)
}
}
+
+// Jobs Section
+
+// GetJobStatus gets the status of a single job.
+func (c *Client4) GetJobStatus(id string) (*JobStatus, *Response) {
+ if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v/status", id), ""); err != nil {
+ return nil, &Response{StatusCode: r.StatusCode, Error: err}
+ } else {
+ defer closeBody(r)
+ return JobStatusFromJson(r.Body), BuildResponse(r)
+ }
+}
+
+// GetJobStatusesByType gets the status of all jobs of a given type, sorted with the job that most recently started first.
+func (c *Client4) GetJobStatusesByType(jobType string, page int, perPage int) ([]*JobStatus, *Response) {
+ if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v/statuses?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil {
+ return nil, &Response{StatusCode: r.StatusCode, Error: err}
+ } else {
+ defer closeBody(r)
+ return JobStatusesFromJson(r.Body), BuildResponse(r)
+ }
+}
diff --git a/model/config.go b/model/config.go
index 14f092373..8c1814792 100644
--- a/model/config.go
+++ b/model/config.go
@@ -410,6 +410,10 @@ type ElasticSearchSettings struct {
Sniff *bool
}
+type DataRetentionSettings struct {
+ Enable *bool
+}
+
type Config struct {
ServiceSettings ServiceSettings
TeamSettings TeamSettings
@@ -434,6 +438,7 @@ type Config struct {
AnalyticsSettings AnalyticsSettings
WebrtcSettings WebrtcSettings
ElasticSearchSettings ElasticSearchSettings
+ DataRetentionSettings DataRetentionSettings
}
func (o *Config) ToJson() string {
@@ -1257,6 +1262,11 @@ func (o *Config) SetDefaults() {
*o.ElasticSearchSettings.Sniff = true
}
+ if o.DataRetentionSettings.Enable == nil {
+ o.DataRetentionSettings.Enable = new(bool)
+ *o.DataRetentionSettings.Enable = false
+ }
+
o.defaultWebrtcSettings()
}
diff --git a/model/job.go b/model/job.go
index 453828bd2..d539b5bf9 100644
--- a/model/job.go
+++ b/model/job.go
@@ -1,110 +1,9 @@
-// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package model
-import (
- "fmt"
- "sync"
- "time"
-)
-
-type TaskFunc func()
-
-type ScheduledTask struct {
- Name string `json:"name"`
- Interval time.Duration `json:"interval"`
- Recurring bool `json:"recurring"`
- function TaskFunc
- timer *time.Timer
-}
-
-var taskMutex = sync.Mutex{}
-var tasks = make(map[string]*ScheduledTask)
-
-func addTask(task *ScheduledTask) {
- taskMutex.Lock()
- defer taskMutex.Unlock()
- tasks[task.Name] = task
-}
-
-func removeTaskByName(name string) {
- taskMutex.Lock()
- defer taskMutex.Unlock()
- delete(tasks, name)
-}
-
-func GetTaskByName(name string) *ScheduledTask {
- taskMutex.Lock()
- defer taskMutex.Unlock()
- if task, ok := tasks[name]; ok {
- return task
- }
- return nil
-}
-
-func GetAllTasks() *map[string]*ScheduledTask {
- taskMutex.Lock()
- defer taskMutex.Unlock()
- return &tasks
-}
-
-func CreateTask(name string, function TaskFunc, timeToExecution time.Duration) *ScheduledTask {
- task := &ScheduledTask{
- Name: name,
- Interval: timeToExecution,
- Recurring: false,
- function: function,
- }
-
- taskRunner := func() {
- go task.function()
- removeTaskByName(task.Name)
- }
-
- task.timer = time.AfterFunc(timeToExecution, taskRunner)
-
- addTask(task)
-
- return task
-}
-
-func CreateRecurringTask(name string, function TaskFunc, interval time.Duration) *ScheduledTask {
- task := &ScheduledTask{
- Name: name,
- Interval: interval,
- Recurring: true,
- function: function,
- }
-
- taskRecurer := func() {
- go task.function()
- task.timer.Reset(task.Interval)
- }
-
- task.timer = time.AfterFunc(interval, taskRecurer)
-
- addTask(task)
-
- return task
-}
-
-func (task *ScheduledTask) Cancel() {
- task.timer.Stop()
- removeTaskByName(task.Name)
-}
-
-// Executes the task immediatly. A recurring task will be run regularally after interval.
-func (task *ScheduledTask) Execute() {
- task.function()
- task.timer.Reset(task.Interval)
-}
-
-func (task *ScheduledTask) String() string {
- return fmt.Sprintf(
- "%s\nInterval: %s\nRecurring: %t\n",
- task.Name,
- task.Interval.String(),
- task.Recurring,
- )
+type Job interface {
+ Run()
+ Stop()
}
diff --git a/model/job_status.go b/model/job_status.go
new file mode 100644
index 000000000..cf490648f
--- /dev/null
+++ b/model/job_status.go
@@ -0,0 +1,59 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "encoding/json"
+ "io"
+)
+
+const (
+ JOB_TYPE_DATA_RETENTION = "data_retention"
+ JOB_TYPE_SEARCH_INDEXING = "search_indexing"
+)
+
+type JobStatus struct {
+ Id string `json:"id"`
+ Type string `json:"type"`
+ StartAt int64 `json:"start_at"`
+ LastActivityAt int64 `json:"last_activity_at"`
+ LastRunStartedAt int64 `json:"last_run_started_at"`
+ LastRunCompletedAt int64 `json:"last_run_completed_at"`
+ Status string `json:"status"`
+ Data map[string]interface{} `json:"data"`
+}
+
+func (js *JobStatus) ToJson() string {
+ if b, err := json.Marshal(js); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func JobStatusFromJson(data io.Reader) *JobStatus {
+ var status JobStatus
+ if err := json.NewDecoder(data).Decode(&status); err == nil {
+ return &status
+ } else {
+ return nil
+ }
+}
+
+func JobStatusesToJson(statuses []*JobStatus) string {
+ if b, err := json.Marshal(statuses); err != nil {
+ return ""
+ } else {
+ return string(b)
+ }
+}
+
+func JobStatusesFromJson(data io.Reader) []*JobStatus {
+ var statuses []*JobStatus
+ if err := json.NewDecoder(data).Decode(&statuses); err == nil {
+ return statuses
+ } else {
+ return nil
+ }
+}
diff --git a/model/scheduled_task.go b/model/scheduled_task.go
new file mode 100644
index 000000000..453828bd2
--- /dev/null
+++ b/model/scheduled_task.go
@@ -0,0 +1,110 @@
+// Copyright (c) 2016-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package model
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+type TaskFunc func()
+
+type ScheduledTask struct {
+ Name string `json:"name"`
+ Interval time.Duration `json:"interval"`
+ Recurring bool `json:"recurring"`
+ function TaskFunc
+ timer *time.Timer
+}
+
+var taskMutex = sync.Mutex{}
+var tasks = make(map[string]*ScheduledTask)
+
+func addTask(task *ScheduledTask) {
+ taskMutex.Lock()
+ defer taskMutex.Unlock()
+ tasks[task.Name] = task
+}
+
+func removeTaskByName(name string) {
+ taskMutex.Lock()
+ defer taskMutex.Unlock()
+ delete(tasks, name)
+}
+
+func GetTaskByName(name string) *ScheduledTask {
+ taskMutex.Lock()
+ defer taskMutex.Unlock()
+ if task, ok := tasks[name]; ok {
+ return task
+ }
+ return nil
+}
+
+func GetAllTasks() *map[string]*ScheduledTask {
+ taskMutex.Lock()
+ defer taskMutex.Unlock()
+ return &tasks
+}
+
+func CreateTask(name string, function TaskFunc, timeToExecution time.Duration) *ScheduledTask {
+ task := &ScheduledTask{
+ Name: name,
+ Interval: timeToExecution,
+ Recurring: false,
+ function: function,
+ }
+
+ taskRunner := func() {
+ go task.function()
+ removeTaskByName(task.Name)
+ }
+
+ task.timer = time.AfterFunc(timeToExecution, taskRunner)
+
+ addTask(task)
+
+ return task
+}
+
+func CreateRecurringTask(name string, function TaskFunc, interval time.Duration) *ScheduledTask {
+ task := &ScheduledTask{
+ Name: name,
+ Interval: interval,
+ Recurring: true,
+ function: function,
+ }
+
+ taskRecurer := func() {
+ go task.function()
+ task.timer.Reset(task.Interval)
+ }
+
+ task.timer = time.AfterFunc(interval, taskRecurer)
+
+ addTask(task)
+
+ return task
+}
+
+func (task *ScheduledTask) Cancel() {
+ task.timer.Stop()
+ removeTaskByName(task.Name)
+}
+
+// Executes the task immediatly. A recurring task will be run regularally after interval.
+func (task *ScheduledTask) Execute() {
+ task.function()
+ task.timer.Reset(task.Interval)
+}
+
+func (task *ScheduledTask) String() string {
+ return fmt.Sprintf(
+ "%s\nInterval: %s\nRecurring: %t\n",
+ task.Name,
+ task.Interval.String(),
+ task.Recurring,
+ )
+}
diff --git a/model/job_test.go b/model/scheduled_task_test.go
index 6ba8edaf9..6ba8edaf9 100644
--- a/model/job_test.go
+++ b/model/scheduled_task_test.go
diff --git a/store/sql_job_status_store.go b/store/sql_job_status_store.go
new file mode 100644
index 000000000..ef039d99a
--- /dev/null
+++ b/store/sql_job_status_store.go
@@ -0,0 +1,190 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "database/sql"
+ "net/http"
+
+ "github.com/mattermost/platform/model"
+)
+
+type SqlJobStatusStore struct {
+ *SqlStore
+}
+
+func NewSqlJobStatusStore(sqlStore *SqlStore) JobStatusStore {
+ s := &SqlJobStatusStore{sqlStore}
+
+ for _, db := range sqlStore.GetAllConns() {
+ table := db.AddTableWithName(model.JobStatus{}, "JobStatuses").SetKeys(false, "Id")
+ table.ColMap("Id").SetMaxSize(26)
+ table.ColMap("Type").SetMaxSize(32)
+ table.ColMap("Status").SetMaxSize(32)
+ table.ColMap("Data").SetMaxSize(1024)
+ }
+
+ return s
+}
+
+func (jss SqlJobStatusStore) CreateIndexesIfNotExists() {
+ jss.CreateIndexIfNotExists("idx_jobstatuses_type", "JobStatuses", "Type")
+}
+
+func (jss SqlJobStatusStore) SaveOrUpdate(status *model.JobStatus) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if err := jss.GetReplica().SelectOne(&model.JobStatus{},
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": status.Id}); err == nil {
+ if _, err := jss.GetMaster().Update(status); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
+ "store.sql_job_status.update.app_error", nil, "id="+status.Id+", "+err.Error())
+ }
+ } else if err == sql.ErrNoRows {
+ if err := jss.GetMaster().Insert(status); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
+ "store.sql_job_status.save.app_error", nil, "id="+status.Id+", "+err.Error())
+ }
+ } else {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.SaveOrUpdate",
+ "store.sql_job_status.save_or_update.app_error", nil, "id="+status.Id+", "+err.Error())
+ }
+
+ if result.Err == nil {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) Get(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var status *model.JobStatus
+
+ if err := jss.GetReplica().SelectOne(&status,
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ if err == sql.ErrNoRows {
+ result.Err = model.NewAppError("SqlJobStatusStore.Get",
+ "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
+ } else {
+ result.Err = model.NewAppError("SqlJobStatusStore.Get",
+ "store.sql_job_status.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
+ }
+ } else {
+ result.Data = status
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) GetAllByType(jobType string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.JobStatus
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByType",
+ "store.sql_job_status.get_all_by_type.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) GetAllByTypePage(jobType string, offset int, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.JobStatus
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ JobStatuses
+ WHERE
+ Type = :Type
+ ORDER BY
+ StartAt ASC
+ LIMIT
+ :Limit
+ OFFSET
+ :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.GetAllByTypePage",
+ "store.sql_job_status.get_all_by_type_page.app_error", nil, "Type="+jobType+", "+err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
+func (jss SqlJobStatusStore) Delete(id string) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ if _, err := jss.GetReplica().Exec(
+ `DELETE FROM
+ JobStatuses
+ WHERE
+ Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStatusStore.DeleteByType",
+ "store.sql_job_status.delete.app_error", nil, "id="+id+", "+err.Error())
+ } else {
+ result.Data = id
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
diff --git a/store/sql_job_status_store_test.go b/store/sql_job_status_store_test.go
new file mode 100644
index 000000000..18c29e522
--- /dev/null
+++ b/store/sql_job_status_store_test.go
@@ -0,0 +1,151 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package store
+
+import (
+ "testing"
+
+ "github.com/mattermost/platform/model"
+)
+
+func TestJobStatusSaveGetUpdate(t *testing.T) {
+ Setup()
+
+ status := &model.JobStatus{
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.NewId(),
+ Data: map[string]interface{}{
+ "Processed": 0,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ },
+ }
+
+ if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ defer func() {
+ <-store.JobStatus().Delete(status.Id)
+ }()
+
+ if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.JobStatus); received.Id != status.Id {
+ t.Fatal("received incorrect status after save")
+ }
+
+ status.Status = model.NewId()
+ status.Data = map[string]interface{}{
+ "Processed": 12345,
+ "Total": 12345,
+ "LastProcessed": "abcd",
+ }
+
+ if result := <-store.JobStatus().SaveOrUpdate(status); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+
+ if result := <-store.JobStatus().Get(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.(*model.JobStatus); received.Id != status.Id || received.Status != status.Status {
+ t.Fatal("received incorrect status after update")
+ }
+}
+
+func TestJobStatusGetAllByType(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ statuses := []*model.JobStatus{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ },
+ }
+
+ for _, status := range statuses {
+ Must(store.JobStatus().SaveOrUpdate(status))
+ defer store.JobStatus().Delete(status.Id)
+ }
+
+ if result := <-store.JobStatus().GetAllByType(jobType); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[0].Id && received[1].Id != statuses[0].Id {
+ t.Fatal("should've received first status")
+ } else if received[0].Id != statuses[1].Id && received[1].Id != statuses[1].Id {
+ t.Fatal("should've received second status")
+ }
+}
+
+func TestJobStatusGetAllByTypePage(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ statuses := []*model.JobStatus{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 999,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ StartAt: 1001,
+ },
+ }
+
+ for _, status := range statuses {
+ Must(store.JobStatus().SaveOrUpdate(status))
+ defer store.JobStatus().Delete(status.Id)
+ }
+
+ if result := <-store.JobStatus().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.JobStatus); len(received) != 2 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[1].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != statuses[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if result := <-store.JobStatus().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.JobStatus); len(received) != 1 {
+ t.Fatal("received wrong number of statuses")
+ } else if received[0].Id != statuses[2].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
+
+func TestJobStatusDelete(t *testing.T) {
+ Setup()
+
+ status := Must(store.JobStatus().SaveOrUpdate(&model.JobStatus{
+ Id: model.NewId(),
+ })).(*model.JobStatus)
+
+ if result := <-store.JobStatus().Delete(status.Id); result.Err != nil {
+ t.Fatal(result.Err)
+ }
+}
diff --git a/store/sql_store.go b/store/sql_store.go
index f13fe2ec0..4261c849a 100644
--- a/store/sql_store.go
+++ b/store/sql_store.go
@@ -87,6 +87,7 @@ type SqlStore struct {
status StatusStore
fileInfo FileInfoStore
reaction ReactionStore
+ jobStatus JobStatusStore
SchemaVersion string
rrCounter int64
srCounter int64
@@ -151,6 +152,7 @@ func NewSqlStore() Store {
sqlStore.status = NewSqlStatusStore(sqlStore)
sqlStore.fileInfo = NewSqlFileInfoStore(sqlStore)
sqlStore.reaction = NewSqlReactionStore(sqlStore)
+ sqlStore.jobStatus = NewSqlJobStatusStore(sqlStore)
err := sqlStore.master.CreateTablesIfNotExists()
if err != nil {
@@ -179,6 +181,7 @@ func NewSqlStore() Store {
sqlStore.status.(*SqlStatusStore).CreateIndexesIfNotExists()
sqlStore.fileInfo.(*SqlFileInfoStore).CreateIndexesIfNotExists()
sqlStore.reaction.(*SqlReactionStore).CreateIndexesIfNotExists()
+ sqlStore.jobStatus.(*SqlJobStatusStore).CreateIndexesIfNotExists()
sqlStore.preference.(*SqlPreferenceStore).DeleteUnusedFeatures()
@@ -735,6 +738,10 @@ func (ss *SqlStore) Reaction() ReactionStore {
return ss.reaction
}
+func (ss *SqlStore) JobStatus() JobStatusStore {
+ return ss.jobStatus
+}
+
func (ss *SqlStore) DropAllTables() {
ss.master.TruncateTables()
}
@@ -752,6 +759,8 @@ func (me mattermConverter) ToDb(val interface{}) (interface{}, error) {
return encrypt([]byte(utils.Cfg.SqlSettings.AtRestEncryptKey), model.MapToJson(t))
case model.StringInterface:
return model.StringInterfaceToJson(t), nil
+ case map[string]interface{}:
+ return model.StringInterfaceToJson(model.StringInterface(t)), nil
}
return val, nil
@@ -805,6 +814,16 @@ func (me mattermConverter) FromDb(target interface{}) (gorp.CustomScanner, bool)
return json.Unmarshal(b, target)
}
return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true
+ case *map[string]interface{}:
+ binder := func(holder, target interface{}) error {
+ s, ok := holder.(*string)
+ if !ok {
+ return errors.New(utils.T("store.sql.convert_string_interface"))
+ }
+ b := []byte(*s)
+ return json.Unmarshal(b, target)
+ }
+ return gorp.CustomScanner{Holder: new(string), Target: target, Binder: binder}, true
}
return gorp.CustomScanner{}, false
diff --git a/store/store.go b/store/store.go
index acbeafdd6..cd7792ce1 100644
--- a/store/store.go
+++ b/store/store.go
@@ -47,6 +47,7 @@ type Store interface {
Status() StatusStore
FileInfo() FileInfoStore
Reaction() ReactionStore
+ JobStatus() JobStatusStore
MarkSystemRanUnitTests()
Close()
DropAllTables()
@@ -371,3 +372,11 @@ type ReactionStore interface {
GetForPost(postId string, allowFromCache bool) StoreChannel
DeleteAllWithEmojiName(emojiName string) StoreChannel
}
+
+type JobStatusStore interface {
+ SaveOrUpdate(status *model.JobStatus) StoreChannel
+ Get(id string) StoreChannel
+ GetAllByType(jobType string) StoreChannel
+ GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
+ Delete(id string) StoreChannel
+}
diff --git a/utils/config.go b/utils/config.go
index 234acae11..95cfc43aa 100644
--- a/utils/config.go
+++ b/utils/config.go
@@ -50,6 +50,22 @@ func SetSiteURL(url string) {
siteURL = strings.TrimRight(url, "/")
}
+var cfgListeners = map[string]func(*model.Config, *model.Config){}
+
+// Registers a function with a given to be called when the config is reloaded and may have changed. The function
+// will be called with two arguments: the old config and the new config. AddConfigListener returns a unique ID
+// for the listener that can later be used to remove it.
+func AddConfigListener(listener func(*model.Config, *model.Config)) string {
+ id := model.NewId()
+ cfgListeners[id] = listener
+ return id
+}
+
+// Removes a listener function by the unique ID returned when AddConfigListener was called
+func RemoveConfigListener(id string) {
+ delete(cfgListeners, id)
+}
+
func FindConfigFile(fileName string) string {
if _, err := os.Stat("./config/" + fileName); err == nil {
fileName, _ = filepath.Abs("./config/" + fileName)
@@ -242,6 +258,21 @@ func DisableConfigWatch() {
}
}
+func InitAndLoadConfig(filename string) (err string) {
+ defer func() {
+ if r := recover(); r != nil {
+ err = fmt.Sprintf("%v", r)
+ }
+ }()
+ TranslationsPreInit()
+ EnableConfigFromEnviromentVars()
+ LoadConfig(filename)
+ InitializeConfigWatch()
+ EnableConfigWatch()
+
+ return ""
+}
+
// LoadConfig will try to search around for the corresponding config file.
// It will search /tmp/fileName then attempt ./config/fileName,
// then ../config/fileName and last it will look at fileName
@@ -249,6 +280,9 @@ func LoadConfig(fileName string) {
cfgMutex.Lock()
defer cfgMutex.Unlock()
+ // Cfg should never be null
+ oldConfig := *Cfg
+
fileNameWithExtension := filepath.Base(fileName)
fileExtension := filepath.Ext(fileNameWithExtension)
fileDir := filepath.Dir(fileName)
@@ -339,6 +373,10 @@ func LoadConfig(fileName string) {
SetDefaultRolesBasedOnConfig()
SetSiteURL(*Cfg.ServiceSettings.SiteURL)
+
+ for _, listener := range cfgListeners {
+ listener(&oldConfig, &config)
+ }
}
func RegenerateClientConfig() {
diff --git a/utils/config_test.go b/utils/config_test.go
index 755cd9acd..bce85d2ae 100644
--- a/utils/config_test.go
+++ b/utils/config_test.go
@@ -6,6 +6,8 @@ package utils
import (
"os"
"testing"
+
+ "github.com/mattermost/platform/model"
)
func TestConfig(t *testing.T) {
@@ -59,3 +61,80 @@ func TestConfigFromEnviroVars(t *testing.T) {
}
}
+
+func TestAddRemoveConfigListener(t *testing.T) {
+ if len(cfgListeners) != 0 {
+ t.Fatal("should've started with 0 listeners")
+ }
+
+ id1 := AddConfigListener(func(*model.Config, *model.Config) {
+ })
+ if len(cfgListeners) != 1 {
+ t.Fatal("should now have 1 listener")
+ }
+
+ id2 := AddConfigListener(func(*model.Config, *model.Config) {
+ })
+ if len(cfgListeners) != 2 {
+ t.Fatal("should now have 2 listeners")
+ }
+
+ RemoveConfigListener(id1)
+ if len(cfgListeners) != 1 {
+ t.Fatal("should've removed first listener")
+ }
+
+ RemoveConfigListener(id2)
+ if len(cfgListeners) != 0 {
+ t.Fatal("should've removed both listeners")
+ }
+}
+
+func TestConfigListener(t *testing.T) {
+ TranslationsPreInit()
+ EnableConfigFromEnviromentVars()
+ LoadConfig("config.json")
+
+ SiteName := Cfg.TeamSettings.SiteName
+ defer func() {
+ Cfg.TeamSettings.SiteName = SiteName
+ SaveConfig(CfgFileName, Cfg)
+ }()
+ Cfg.TeamSettings.SiteName = "test123"
+
+ listenerCalled := false
+ listener := func(oldConfig *model.Config, newConfig *model.Config) {
+ if listenerCalled {
+ t.Fatal("listener called twice")
+ }
+
+ if oldConfig.TeamSettings.SiteName != "test123" {
+ t.Fatal("old config contains incorrect site name")
+ } else if newConfig.TeamSettings.SiteName != "Mattermost" {
+ t.Fatal("new config contains incorrect site name")
+ }
+
+ listenerCalled = true
+ }
+ listenerId := AddConfigListener(listener)
+ defer RemoveConfigListener(listenerId)
+
+ listener2Called := false
+ listener2 := func(oldConfig *model.Config, newConfig *model.Config) {
+ if listener2Called {
+ t.Fatal("listener2 called twice")
+ }
+
+ listener2Called = true
+ }
+ listener2Id := AddConfigListener(listener2)
+ defer RemoveConfigListener(listener2Id)
+
+ LoadConfig("config.json")
+
+ if !listenerCalled {
+ t.Fatal("listener should've been called")
+ } else if !listener2Called {
+ t.Fatal("listener 2 should've been called")
+ }
+}
diff --git a/utils/license.go b/utils/license.go
index d3e2c1362..c0a17bf79 100644
--- a/utils/license.go
+++ b/utils/license.go
@@ -114,6 +114,25 @@ func ValidateLicense(signed []byte) (bool, string) {
return true, string(plaintext)
}
+func GetAndValidateLicenseFileFromDisk() (*model.License, []byte) {
+ fileName := GetLicenseFileLocation(*Cfg.ServiceSettings.LicenseFileLocation)
+
+ if _, err := os.Stat(fileName); err != nil {
+ l4g.Debug("We could not find the license key in the database or on disk at %v", fileName)
+ return nil, nil
+ }
+
+ l4g.Info("License key has not been uploaded. Loading license key from disk at %v", fileName)
+ licenseBytes := GetLicenseFileFromDisk(fileName)
+
+ if success, licenseStr := ValidateLicense(licenseBytes); !success {
+ l4g.Error("Found license key at %v but it appears to be invalid.", fileName)
+ return nil, nil
+ } else {
+ return model.LicenseFromJson(strings.NewReader(licenseStr)), licenseBytes
+ }
+}
+
func GetLicenseFileFromDisk(fileName string) []byte {
file, err := os.Open(fileName)
if err != nil {