summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--api4/apitestlib.go9
-rw-r--r--api4/job.go81
-rw-r--r--api4/job_test.go182
-rw-r--r--app/job.go21
-rw-r--r--app/job_test.go14
-rw-r--r--i18n/en.json24
-rw-r--r--jobs/jobs.go16
-rw-r--r--model/authorization.go7
-rw-r--r--model/client4.go36
-rw-r--r--model/job.go37
-rw-r--r--store/sql_job_store.go38
-rw-r--r--store/sql_job_store_test.go76
-rw-r--r--store/store.go1
13 files changed, 471 insertions, 71 deletions
diff --git a/api4/apitestlib.go b/api4/apitestlib.go
index 537d8610c..d70b9e5f6 100644
--- a/api4/apitestlib.go
+++ b/api4/apitestlib.go
@@ -24,6 +24,7 @@ import (
"github.com/mattermost/platform/wsapi"
s3 "github.com/minio/minio-go"
+ "github.com/mattermost/platform/jobs"
)
type TestHelper struct {
@@ -68,6 +69,10 @@ func SetupEnterprise() *TestHelper {
*utils.Cfg.TeamSettings.EnableOpenServer = true
}
+ if jobs.Srv.Store == nil {
+ jobs.Srv.Store = app.Srv.Store
+ }
+
th := &TestHelper{}
th.Client = th.CreateClient()
th.SystemAdminClient = th.CreateClient()
@@ -99,6 +104,10 @@ func Setup() *TestHelper {
*utils.Cfg.TeamSettings.EnableOpenServer = true
}
+ if jobs.Srv.Store == nil {
+ jobs.Srv.Store = app.Srv.Store
+ }
+
th := &TestHelper{}
th.Client = th.CreateClient()
th.SystemAdminClient = th.CreateClient()
diff --git a/api4/job.go b/api4/job.go
index e6c17c42d..941e5d543 100644
--- a/api4/job.go
+++ b/api4/job.go
@@ -14,8 +14,11 @@ import (
func InitJob() {
l4g.Info("Initializing job API routes")
- BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}/statuses", ApiSessionRequired(getJobsByType)).Methods("GET")
- BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/status", ApiSessionRequired(getJob)).Methods("GET")
+ BaseRoutes.Jobs.Handle("", ApiSessionRequired(getJobs)).Methods("GET")
+ BaseRoutes.Jobs.Handle("", ApiSessionRequired(createJob)).Methods("POST")
+ BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}", ApiSessionRequired(getJob)).Methods("GET")
+ BaseRoutes.Jobs.Handle("/{job_id:[A-Za-z0-9]+}/cancel", ApiSessionRequired(cancelJob)).Methods("POST")
+ BaseRoutes.Jobs.Handle("/type/{job_type:[A-Za-z0-9_-]+}", ApiSessionRequired(getJobsByType)).Methods("GET")
}
func getJob(c *Context, w http.ResponseWriter, r *http.Request) {
@@ -24,16 +27,55 @@ func getJob(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) {
- c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
+ if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_JOBS)
return
}
- if status, err := app.GetJob(c.Params.JobId); err != nil {
+ if job, err := app.GetJob(c.Params.JobId); err != nil {
c.Err = err
return
} else {
- w.Write([]byte(status.ToJson()))
+ w.Write([]byte(job.ToJson()))
+ }
+}
+
+func createJob(c *Context, w http.ResponseWriter, r *http.Request) {
+ job := model.JobFromJson(r.Body)
+ if job == nil {
+ c.SetInvalidParam("job")
+ return
+ }
+
+ if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_JOBS)
+ return
+ }
+
+ if job, err := app.CreateJob(job); err != nil {
+ c.Err = err
+ return
+ } else {
+ w.WriteHeader(http.StatusCreated)
+ w.Write([]byte(job.ToJson()))
+ }
+}
+
+func getJobs(c *Context, w http.ResponseWriter, r *http.Request) {
+ if c.Err != nil {
+ return
+ }
+
+ if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_JOBS)
+ return
+ }
+
+ if jobs, err := app.GetJobsPage(c.Params.Page, c.Params.PerPage); err != nil {
+ c.Err = err
+ return
+ } else {
+ w.Write([]byte(model.JobsToJson(jobs)))
}
}
@@ -43,15 +85,34 @@ func getJobsByType(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
- if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) {
- c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
+ if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_JOBS)
return
}
- if statuses, err := app.GetJobsByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil {
+ if jobs, err := app.GetJobsByTypePage(c.Params.JobType, c.Params.Page, c.Params.PerPage); err != nil {
c.Err = err
return
} else {
- w.Write([]byte(model.JobsToJson(statuses)))
+ w.Write([]byte(model.JobsToJson(jobs)))
}
}
+
+func cancelJob(c *Context, w http.ResponseWriter, r *http.Request) {
+ c.RequireJobId()
+ if c.Err != nil {
+ return
+ }
+
+ if !app.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_JOBS) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_JOBS)
+ return
+ }
+
+ if err := app.CancelJob(c.Params.JobId); err != nil {
+ c.Err = err
+ return
+ }
+
+ ReturnStatusOK(w)
+}
diff --git a/api4/job_test.go b/api4/job_test.go
index 8bbea83e1..3dcdbe58b 100644
--- a/api4/job_test.go
+++ b/api4/job_test.go
@@ -12,74 +12,157 @@ import (
"github.com/mattermost/platform/store"
)
-func TestGetJobStatus(t *testing.T) {
+func TestCreateJob(t *testing.T) {
th := Setup().InitBasic().InitSystemAdmin()
defer TearDown()
- status := &model.Job{
+ job := &model.Job{
+ Type: model.JOB_TYPE_DATA_RETENTION,
+ Data: map[string]interface{}{
+ "thing": "stuff",
+ },
+ }
+
+ received, resp := th.SystemAdminClient.CreateJob(job)
+ CheckNoError(t, resp)
+
+ defer app.Srv.Store.Job().Delete(received.Id)
+
+ job = &model.Job{
+ Type: model.NewId(),
+ }
+
+ _, resp = th.SystemAdminClient.CreateJob(job)
+ CheckBadRequestStatus(t, resp)
+
+ _, resp = th.Client.CreateJob(job)
+ CheckForbiddenStatus(t, resp)
+}
+
+func TestGetJob(t *testing.T) {
+ th := Setup().InitBasic().InitSystemAdmin()
+ defer TearDown()
+
+ job := &model.Job{
Id: model.NewId(),
- Status: model.NewId(),
+ Status: model.JOB_STATUS_PENDING,
}
- if result := <-app.Srv.Store.Job().Save(status); result.Err != nil {
+ if result := <-app.Srv.Store.Job().Save(job); result.Err != nil {
t.Fatal(result.Err)
}
- defer app.Srv.Store.Job().Delete(status.Id)
+ defer app.Srv.Store.Job().Delete(job.Id)
- received, resp := th.SystemAdminClient.GetJob(status.Id)
+ received, resp := th.SystemAdminClient.GetJob(job.Id)
CheckNoError(t, resp)
- if received.Id != status.Id || received.Status != status.Status {
- t.Fatal("incorrect job status received")
+ if received.Id != job.Id || received.Status != job.Status {
+ t.Fatal("incorrect job received")
}
_, resp = th.SystemAdminClient.GetJob("1234")
CheckBadRequestStatus(t, resp)
- _, resp = th.Client.GetJob(status.Id)
+ _, resp = th.Client.GetJob(job.Id)
CheckForbiddenStatus(t, resp)
_, resp = th.SystemAdminClient.GetJob(model.NewId())
CheckNotFoundStatus(t, resp)
}
-func TestGetJobStatusesByType(t *testing.T) {
+func TestGetJobs(t *testing.T) {
+ th := Setup().InitBasic().InitSystemAdmin()
+ defer TearDown()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis() + 1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis(),
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis() + 2,
+ },
+ }
+
+ for _, job := range jobs {
+ store.Must(app.Srv.Store.Job().Save(job))
+ defer app.Srv.Store.Job().Delete(job.Id)
+ }
+
+ received, resp := th.SystemAdminClient.GetJobs(0, 2)
+ CheckNoError(t, resp)
+
+ if len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[2].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != jobs[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ received, resp = th.SystemAdminClient.GetJobs(1, 2)
+ CheckNoError(t, resp)
+
+ if received[0].Id != jobs[1].Id {
+ t.Fatal("should've received oldest job last")
+ }
+
+ _, resp = th.Client.GetJobs(0, 60)
+ CheckForbiddenStatus(t, resp)
+}
+
+func TestGetJobsByType(t *testing.T) {
th := Setup().InitBasic().InitSystemAdmin()
defer TearDown()
jobType := model.NewId()
- statuses := []*model.Job{
+ jobs := []*model.Job{
{
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1000,
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1000,
},
{
- Id: model.NewId(),
- Type: jobType,
- StartAt: 999,
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 999,
},
{
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1001,
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1001,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ CreateAt: 1002,
},
}
- for _, status := range statuses {
- store.Must(app.Srv.Store.Job().Save(status))
- defer app.Srv.Store.Job().Delete(status.Id)
+ for _, job := range jobs {
+ store.Must(app.Srv.Store.Job().Save(job))
+ defer app.Srv.Store.Job().Delete(job.Id)
}
received, resp := th.SystemAdminClient.GetJobsByType(jobType, 0, 2)
CheckNoError(t, resp)
if len(received) != 2 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[1].Id {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[2].Id {
t.Fatal("should've received newest job first")
- } else if received[1].Id != statuses[0].Id {
+ } else if received[1].Id != jobs[0].Id {
t.Fatal("should've received second newest job second")
}
@@ -87,8 +170,8 @@ func TestGetJobStatusesByType(t *testing.T) {
CheckNoError(t, resp)
if len(received) != 1 {
- t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[2].Id {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[1].Id {
t.Fatal("should've received oldest job last")
}
@@ -101,3 +184,46 @@ func TestGetJobStatusesByType(t *testing.T) {
_, resp = th.Client.GetJobsByType(jobType, 0, 60)
CheckForbiddenStatus(t, resp)
}
+
+func TestCancelJob(t *testing.T) {
+ th := Setup().InitBasic().InitSystemAdmin()
+ defer TearDown()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.JOB_STATUS_PENDING,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.JOB_STATUS_IN_PROGRESS,
+ },
+ {
+ Id: model.NewId(),
+ Type: model.NewId(),
+ Status: model.JOB_STATUS_SUCCESS,
+ },
+ }
+
+ for _, job := range jobs {
+ store.Must(app.Srv.Store.Job().Save(job))
+ defer app.Srv.Store.Job().Delete(job.Id)
+ }
+
+ _, resp := th.Client.CancelJob(jobs[0].Id)
+ CheckForbiddenStatus(t, resp)
+
+ _, resp = th.SystemAdminClient.CancelJob(jobs[0].Id)
+ CheckNoError(t, resp)
+
+ _, resp = th.SystemAdminClient.CancelJob(jobs[1].Id)
+ CheckNoError(t, resp)
+
+ _, resp = th.SystemAdminClient.CancelJob(jobs[2].Id)
+ CheckInternalErrorStatus(t, resp)
+
+ _, resp = th.SystemAdminClient.CancelJob(model.NewId())
+ CheckInternalErrorStatus(t, resp)
+}
diff --git a/app/job.go b/app/job.go
index c625ce15f..36c0b1992 100644
--- a/app/job.go
+++ b/app/job.go
@@ -5,6 +5,7 @@ package app
import (
"github.com/mattermost/platform/model"
+ "github.com/mattermost/platform/jobs"
)
func GetJob(id string) (*model.Job, *model.AppError) {
@@ -15,6 +16,18 @@ func GetJob(id string) (*model.Job, *model.AppError) {
}
}
+func GetJobsPage(page int, perPage int) ([]*model.Job, *model.AppError) {
+ return GetJobs(page*perPage, perPage)
+}
+
+func GetJobs(offset int, limit int) ([]*model.Job, *model.AppError) {
+ if result := <-Srv.Store.Job().GetAllPage(offset, limit); result.Err != nil {
+ return nil, result.Err
+ } else {
+ return result.Data.([]*model.Job), nil
+ }
+}
+
func GetJobsByTypePage(jobType string, page int, perPage int) ([]*model.Job, *model.AppError) {
return GetJobsByType(jobType, page*perPage, perPage)
}
@@ -26,3 +39,11 @@ func GetJobsByType(jobType string, offset int, limit int) ([]*model.Job, *model.
return result.Data.([]*model.Job), nil
}
}
+
+func CreateJob(job *model.Job) (*model.Job, *model.AppError) {
+ return jobs.CreateJob(job.Type, job.Data)
+}
+
+func CancelJob(jobId string) *model.AppError {
+ return jobs.RequestCancellation(jobId)
+}
diff --git a/app/job_test.go b/app/job_test.go
index ced65788f..8f068901a 100644
--- a/app/job_test.go
+++ b/app/job_test.go
@@ -10,7 +10,7 @@ import (
"github.com/mattermost/platform/store"
)
-func TestGetJobStatus(t *testing.T) {
+func TestGetJob(t *testing.T) {
Setup()
status := &model.Job{
@@ -30,7 +30,7 @@ func TestGetJobStatus(t *testing.T) {
}
}
-func TestGetJobStatusesByType(t *testing.T) {
+func TestGetJobByType(t *testing.T) {
Setup()
jobType := model.NewId()
@@ -39,17 +39,17 @@ func TestGetJobStatusesByType(t *testing.T) {
{
Id: model.NewId(),
Type: jobType,
- StartAt: 1000,
+ CreateAt: 1000,
},
{
Id: model.NewId(),
Type: jobType,
- StartAt: 999,
+ CreateAt: 999,
},
{
Id: model.NewId(),
Type: jobType,
- StartAt: 1001,
+ CreateAt: 1001,
},
}
@@ -62,7 +62,7 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal(err)
} else if len(received) != 2 {
t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[1].Id {
+ } else if received[0].Id != statuses[2].Id {
t.Fatal("should've received newest job first")
} else if received[1].Id != statuses[0].Id {
t.Fatal("should've received second newest job second")
@@ -72,7 +72,7 @@ func TestGetJobStatusesByType(t *testing.T) {
t.Fatal(err)
} else if len(received) != 1 {
t.Fatal("received wrong number of statuses")
- } else if received[0].Id != statuses[2].Id {
+ } else if received[0].Id != statuses[1].Id {
t.Fatal("should've received oldest job last")
}
}
diff --git a/i18n/en.json b/i18n/en.json
index 27e65c6ba..90a08f7dc 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3372,6 +3372,14 @@
"translation": "Create Teams"
},
{
+ "id": "authentication.permissions.manage_jobs.description",
+ "translation": "Ability to manage jobs"
+ },
+ {
+ "id": "authentication.permissions.manage_jobs.name",
+ "translation": "Manage Jobs"
+ },
+ {
"id": "authentication.permissions.manage_team_roles.description",
"translation": "Ability to change the roles of a team member"
},
@@ -4472,6 +4480,22 @@
"translation": "Invalid user id"
},
{
+ "id": "model.job.is_valid.id.app_error",
+ "translation": "Invalid job Id"
+ },
+ {
+ "id": "model.job.is_valid.create_at.app_error",
+ "translation": "Create at must be a valid time"
+ },
+ {
+ "id": "model.job.is_valid.type.app_error",
+ "translation": "Invalid job type"
+ },
+ {
+ "id": "model.job.is_valid.status.app_error",
+ "translation": "Invalid job status"
+ },
+ {
"id": "model.oauth.is_valid.app_id.app_error",
"translation": "Invalid app id"
},
diff --git a/jobs/jobs.go b/jobs/jobs.go
index 9247355d0..1986b22b6 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -25,6 +25,10 @@ func CreateJob(jobType string, jobData map[string]interface{}) (*model.Job, *mod
Data: jobData,
}
+ if err := job.IsValid(); err != nil {
+ return nil, err
+ }
+
if result := <-Srv.Store.Job().Save(&job); result.Err != nil {
return nil, result.Err
}
@@ -41,7 +45,7 @@ func ClaimJob(job *model.Job) (bool, *model.AppError) {
}
}
-func SetJobProgress(job *model.Job, progress int64) (*model.AppError) {
+func SetJobProgress(job *model.Job, progress int64) *model.AppError {
job.Status = model.JOB_STATUS_IN_PROGRESS
job.Progress = progress
@@ -78,7 +82,7 @@ func SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
return result.Err
} else {
if !result.Data.(bool) {
- return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id=" + job.Id, http.StatusInternalServerError)
+ return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError)
}
}
}
@@ -92,20 +96,20 @@ func SetJobCanceled(job *model.Job) *model.AppError {
return result.Err
}
-func RequestCancellation(job *model.Job) *model.AppError {
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
+func RequestCancellation(jobId string) *model.AppError {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
return nil
}
- if result := <-Srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
+ if result := <-Srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
return nil
}
- return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id=" + job.Id, http.StatusInternalServerError)
+ return model.NewAppError("Jobs.RequestCancellation", "jobs.request_cancellation.status.error", nil, "id="+jobId, http.StatusInternalServerError)
}
func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan interface{}) {
diff --git a/model/authorization.go b/model/authorization.go
index 458ed1bdb..880d25e27 100644
--- a/model/authorization.go
+++ b/model/authorization.go
@@ -58,6 +58,7 @@ var PERMISSION_MANAGE_TEAM *Permission
var PERMISSION_IMPORT_TEAM *Permission
var PERMISSION_VIEW_TEAM *Permission
var PERMISSION_LIST_USERS_WITHOUT_TEAM *Permission
+var PERMISSION_MANAGE_JOBS *Permission
// General permission that encompases all system admin functions
// in the future this could be broken up to allow access to some
@@ -292,6 +293,11 @@ func InitalizePermissions() {
"authentication.permisssions.list_users_without_team.name",
"authentication.permisssions.list_users_without_team.description",
}
+ PERMISSION_MANAGE_JOBS = &Permission{
+ "manage_jobs",
+ "authentication.permisssions.manage_jobs.name",
+ "authentication.permisssions.manage_jobs.description",
+ }
}
func InitalizeRoles() {
@@ -405,6 +411,7 @@ func InitalizeRoles() {
PERMISSION_CREATE_TEAM.Id,
PERMISSION_ADD_USER_TO_TEAM.Id,
PERMISSION_LIST_USERS_WITHOUT_TEAM.Id,
+ PERMISSION_MANAGE_JOBS.Id,
},
ROLE_TEAM_USER.Permissions...,
),
diff --git a/model/client4.go b/model/client4.go
index feff9f8de..6f5eb03c6 100644
--- a/model/client4.go
+++ b/model/client4.go
@@ -2800,7 +2800,7 @@ func (c *Client4) OpenGraph(url string) (map[string]string, *Response) {
// GetJob gets a single job.
func (c *Client4) GetJob(id string) (*Job, *Response) {
- if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v/status", id), ""); err != nil {
+ if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/%v", id), ""); err != nil {
return nil, BuildErrorResponse(r, err)
} else {
defer closeBody(r)
@@ -2808,12 +2808,42 @@ func (c *Client4) GetJob(id string) (*Job, *Response) {
}
}
-// GetJobsByType gets all jobs of a given type, sorted with the job that most recently started first.
+// Get all jobs, sorted with the job that was created most recently first.
+func (c *Client4) GetJobs(page int, perPage int) ([]*Job, *Response) {
+ if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("?page=%v&per_page=%v", page, perPage), ""); err != nil {
+ return nil, BuildErrorResponse(r, err)
+ } else {
+ defer closeBody(r)
+ return JobsFromJson(r.Body), BuildResponse(r)
+ }
+}
+
+// GetJobsByType gets all jobs of a given type, sorted with the job that was created most recently first.
func (c *Client4) GetJobsByType(jobType string, page int, perPage int) ([]*Job, *Response) {
- if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v/statuses?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil {
+ if r, err := c.DoApiGet(c.GetJobsRoute()+fmt.Sprintf("/type/%v?page=%v&per_page=%v", jobType, page, perPage), ""); err != nil {
return nil, BuildErrorResponse(r, err)
} else {
defer closeBody(r)
return JobsFromJson(r.Body), BuildResponse(r)
}
}
+
+// CreateJob creates a job based on the provided job struct.
+func (c *Client4) CreateJob(job *Job) (*Job, *Response) {
+ if r, err := c.DoApiPost(c.GetJobsRoute(), job.ToJson()); err != nil {
+ return nil, BuildErrorResponse(r, err)
+ } else {
+ defer closeBody(r)
+ return JobFromJson(r.Body), BuildResponse(r)
+ }
+}
+
+// CancelJob requests the cancellation of the job with the provided Id.
+func (c *Client4) CancelJob(jobId string) (bool, *Response) {
+ if r, err := c.DoApiPost(c.GetJobsRoute()+fmt.Sprintf("/%v/cancel", jobId), ""); err != nil {
+ return false, BuildErrorResponse(r, err)
+ } else {
+ defer closeBody(r)
+ return CheckStatusOK(r), BuildResponse(r)
+ }
+}
diff --git a/model/job.go b/model/job.go
index b0567bf1a..ebc849b30 100644
--- a/model/job.go
+++ b/model/job.go
@@ -6,6 +6,7 @@ package model
import (
"encoding/json"
"io"
+ "net/http"
)
const (
@@ -32,6 +33,36 @@ type Job struct {
Data map[string]interface{} `json:"data"`
}
+func (j *Job) IsValid() *AppError {
+ if len(j.Id) != 26 {
+ return NewAppError("Job.IsValid", "model.job.is_valid.id.app_error", nil, "id="+j.Id, http.StatusBadRequest)
+ }
+
+ if j.CreateAt == 0 {
+ return NewAppError("Job.IsValid", "model.job.is_valid.create_at.app_error", nil, "id="+j.Id, http.StatusBadRequest)
+ }
+
+ switch j.Type {
+ case JOB_TYPE_DATA_RETENTION:
+ case JOB_TYPE_SEARCH_INDEXING:
+ default:
+ return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest)
+ }
+
+ switch j.Status {
+ case JOB_STATUS_PENDING:
+ case JOB_STATUS_IN_PROGRESS:
+ case JOB_STATUS_SUCCESS:
+ case JOB_STATUS_ERROR:
+ case JOB_STATUS_CANCEL_REQUESTED:
+ case JOB_STATUS_CANCELED:
+ default:
+ return NewAppError("Job.IsValid", "model.job.is_valid.status.app_error", nil, "id="+j.Id, http.StatusBadRequest)
+ }
+
+ return nil
+}
+
func (js *Job) ToJson() string {
if b, err := json.Marshal(js); err != nil {
return ""
@@ -41,9 +72,9 @@ func (js *Job) ToJson() string {
}
func JobFromJson(data io.Reader) *Job {
- var status Job
- if err := json.NewDecoder(data).Decode(&status); err == nil {
- return &status
+ var job Job
+ if err := json.NewDecoder(data).Decode(&job); err == nil {
+ return &job
} else {
return nil
}
diff --git a/store/sql_job_store.go b/store/sql_job_store.go
index c00e37d86..e287edad6 100644
--- a/store/sql_job_store.go
+++ b/store/sql_job_store.go
@@ -210,6 +210,38 @@ func (jss SqlJobStore) Get(id string) StoreChannel {
return storeChannel
}
+func (jss SqlJobStore) GetAllPage(offset int, limit int) StoreChannel {
+ storeChannel := make(StoreChannel, 1)
+
+ go func() {
+ result := StoreResult{}
+
+ var statuses []*model.Job
+
+ if _, err := jss.GetReplica().Select(&statuses,
+ `SELECT
+ *
+ FROM
+ Jobs
+ ORDER BY
+ CreateAt DESC
+ LIMIT
+ :Limit
+ OFFSET
+ :Offset`, map[string]interface{}{"Limit": limit, "Offset": offset}); err != nil {
+ result.Err = model.NewLocAppError("SqlJobStore.GetAllPage",
+ "store.sql_job.get_all.app_error", nil, err.Error())
+ } else {
+ result.Data = statuses
+ }
+
+ storeChannel <- result
+ close(storeChannel)
+ }()
+
+ return storeChannel
+}
+
func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel {
storeChannel := make(StoreChannel, 1)
@@ -224,7 +256,9 @@ func (jss SqlJobStore) GetAllByType(jobType string) StoreChannel {
FROM
Jobs
WHERE
- Type = :Type`, map[string]interface{}{"Type": jobType}); err != nil {
+ Type = :Type
+ ORDER BY
+ CreateAt DESC`, map[string]interface{}{"Type": jobType}); err != nil {
result.Err = model.NewLocAppError("SqlJobStore.GetAllByType",
"store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error())
} else {
@@ -254,7 +288,7 @@ func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) S
WHERE
Type = :Type
ORDER BY
- StartAt ASC
+ CreateAt DESC
LIMIT
:Limit
OFFSET
diff --git a/store/sql_job_store_test.go b/store/sql_job_store_test.go
index edf09a4c0..97e95ab92 100644
--- a/store/sql_job_store_test.go
+++ b/store/sql_job_store_test.go
@@ -82,19 +82,24 @@ func TestJobGetAllByTypePage(t *testing.T) {
jobs := []*model.Job{
{
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1000,
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1000,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 999,
},
{
- Id: model.NewId(),
- Type: jobType,
- StartAt: 999,
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: 1001,
},
{
- Id: model.NewId(),
- Type: jobType,
- StartAt: 1001,
+ Id: model.NewId(),
+ Type: model.NewId(),
+ CreateAt: 1002,
},
}
@@ -107,7 +112,7 @@ func TestJobGetAllByTypePage(t *testing.T) {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 2 {
t.Fatal("received wrong number of jobs")
- } else if received[0].Id != jobs[1].Id {
+ } else if received[0].Id != jobs[2].Id {
t.Fatal("should've received newest job first")
} else if received[1].Id != jobs[0].Id {
t.Fatal("should've received second newest job second")
@@ -117,7 +122,54 @@ func TestJobGetAllByTypePage(t *testing.T) {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 1 {
t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[1].Id {
+ t.Fatal("should've received oldest job last")
+ }
+}
+
+func TestJobGetAllPage(t *testing.T) {
+ Setup()
+
+ jobType := model.NewId()
+
+ jobs := []*model.Job{
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis() + 1,
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis(),
+ },
+ {
+ Id: model.NewId(),
+ Type: jobType,
+ CreateAt: model.GetMillis() + 2,
+ },
+ }
+
+ for _, job := range jobs {
+ Must(store.Job().Save(job))
+ defer store.Job().Delete(job.Id)
+ }
+
+ if result := <-store.Job().GetAllPage(0, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) != 2 {
+ t.Fatal("received wrong number of jobs")
} else if received[0].Id != jobs[2].Id {
+ t.Fatal("should've received newest job first")
+ } else if received[1].Id != jobs[0].Id {
+ t.Fatal("should've received second newest job second")
+ }
+
+ if result := <-store.Job().GetAllPage(2, 2); result.Err != nil {
+ t.Fatal(result.Err)
+ } else if received := result.Data.([]*model.Job); len(received) < 1 {
+ t.Fatal("received wrong number of jobs")
+ } else if received[0].Id != jobs[1].Id {
t.Fatal("should've received oldest job last")
}
}
@@ -331,11 +383,11 @@ func TestJobUpdateStatusUpdateStatusOptimistically(t *testing.T) {
func TestJobDelete(t *testing.T) {
Setup()
- status := Must(store.Job().Save(&model.Job{
+ job := Must(store.Job().Save(&model.Job{
Id: model.NewId(),
})).(*model.Job)
- if result := <-store.Job().Delete(status.Id); result.Err != nil {
+ if result := <-store.Job().Delete(job.Id); result.Err != nil {
t.Fatal(result.Err)
}
}
diff --git a/store/store.go b/store/store.go
index 062ed0fbd..ab3d97d9b 100644
--- a/store/store.go
+++ b/store/store.go
@@ -391,6 +391,7 @@ type JobStore interface {
UpdateStatus(id string, status string) StoreChannel
UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel
Get(id string) StoreChannel
+ GetAllPage(offset int, limit int) StoreChannel
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
GetAllByStatus(status string) StoreChannel