diff options
Diffstat (limited to 'vendor/google.golang.org/appengine/taskqueue/taskqueue.go')
-rw-r--r-- | vendor/google.golang.org/appengine/taskqueue/taskqueue.go | 541 |
1 files changed, 0 insertions, 541 deletions
diff --git a/vendor/google.golang.org/appengine/taskqueue/taskqueue.go b/vendor/google.golang.org/appengine/taskqueue/taskqueue.go deleted file mode 100644 index 965c5ab4c..000000000 --- a/vendor/google.golang.org/appengine/taskqueue/taskqueue.go +++ /dev/null @@ -1,541 +0,0 @@ -// Copyright 2011 Google Inc. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -/* -Package taskqueue provides a client for App Engine's taskqueue service. -Using this service, applications may perform work outside a user's request. - -A Task may be constructed manually; alternatively, since the most common -taskqueue operation is to add a single POST task, NewPOSTTask makes it easy. - - t := taskqueue.NewPOSTTask("/worker", url.Values{ - "key": {key}, - }) - taskqueue.Add(c, t, "") // add t to the default queue -*/ -package taskqueue // import "google.golang.org/appengine/taskqueue" - -import ( - "errors" - "fmt" - "net/http" - "net/url" - "strconv" - "time" - - "github.com/golang/protobuf/proto" - "golang.org/x/net/context" - - "google.golang.org/appengine" - "google.golang.org/appengine/internal" - dspb "google.golang.org/appengine/internal/datastore" - pb "google.golang.org/appengine/internal/taskqueue" -) - -var ( - // ErrTaskAlreadyAdded is the error returned by Add and AddMulti when a task has already been added with a particular name. - ErrTaskAlreadyAdded = errors.New("taskqueue: task has already been added") -) - -// RetryOptions let you control whether to retry a task and the backoff intervals between tries. -type RetryOptions struct { - // Number of tries/leases after which the task fails permanently and is deleted. - // If AgeLimit is also set, both limits must be exceeded for the task to fail permanently. - RetryLimit int32 - - // Maximum time allowed since the task's first try before the task fails permanently and is deleted (only for push tasks). - // If RetryLimit is also set, both limits must be exceeded for the task to fail permanently. - AgeLimit time.Duration - - // Minimum time between successive tries (only for push tasks). - MinBackoff time.Duration - - // Maximum time between successive tries (only for push tasks). - MaxBackoff time.Duration - - // Maximum number of times to double the interval between successive tries before the intervals increase linearly (only for push tasks). - MaxDoublings int32 - - // If MaxDoublings is zero, set ApplyZeroMaxDoublings to true to override the default non-zero value. - // Otherwise a zero MaxDoublings is ignored and the default is used. - ApplyZeroMaxDoublings bool -} - -// toRetryParameter converts RetryOptions to pb.TaskQueueRetryParameters. -func (opt *RetryOptions) toRetryParameters() *pb.TaskQueueRetryParameters { - params := &pb.TaskQueueRetryParameters{} - if opt.RetryLimit > 0 { - params.RetryLimit = proto.Int32(opt.RetryLimit) - } - if opt.AgeLimit > 0 { - params.AgeLimitSec = proto.Int64(int64(opt.AgeLimit.Seconds())) - } - if opt.MinBackoff > 0 { - params.MinBackoffSec = proto.Float64(opt.MinBackoff.Seconds()) - } - if opt.MaxBackoff > 0 { - params.MaxBackoffSec = proto.Float64(opt.MaxBackoff.Seconds()) - } - if opt.MaxDoublings > 0 || (opt.MaxDoublings == 0 && opt.ApplyZeroMaxDoublings) { - params.MaxDoublings = proto.Int32(opt.MaxDoublings) - } - return params -} - -// A Task represents a task to be executed. -type Task struct { - // Path is the worker URL for the task. - // If unset, it will default to /_ah/queue/<queue_name>. - Path string - - // Payload is the data for the task. - // This will be delivered as the HTTP request body. - // It is only used when Method is POST, PUT or PULL. - // url.Values' Encode method may be used to generate this for POST requests. - Payload []byte - - // Additional HTTP headers to pass at the task's execution time. - // To schedule the task to be run with an alternate app version - // or backend, set the "Host" header. - Header http.Header - - // Method is the HTTP method for the task ("GET", "POST", etc.), - // or "PULL" if this is task is destined for a pull-based queue. - // If empty, this defaults to "POST". - Method string - - // A name for the task. - // If empty, a name will be chosen. - Name string - - // Delay specifies the duration the task queue service must wait - // before executing the task. - // Either Delay or ETA may be set, but not both. - Delay time.Duration - - // ETA specifies the earliest time a task may be executed (push queues) - // or leased (pull queues). - // Either Delay or ETA may be set, but not both. - ETA time.Time - - // The number of times the task has been dispatched or leased. - RetryCount int32 - - // Tag for the task. Only used when Method is PULL. - Tag string - - // Retry options for this task. May be nil. - RetryOptions *RetryOptions -} - -func (t *Task) method() string { - if t.Method == "" { - return "POST" - } - return t.Method -} - -// NewPOSTTask creates a Task that will POST to a path with the given form data. -func NewPOSTTask(path string, params url.Values) *Task { - h := make(http.Header) - h.Set("Content-Type", "application/x-www-form-urlencoded") - return &Task{ - Path: path, - Payload: []byte(params.Encode()), - Header: h, - Method: "POST", - } -} - -// RequestHeaders are the special HTTP request headers available to push task -// HTTP request handlers. These headers are set internally by App Engine. -// See https://cloud.google.com/appengine/docs/standard/go/taskqueue/push/creating-handlers#reading_request_headers -// for a description of the fields. -type RequestHeaders struct { - QueueName string - TaskName string - TaskRetryCount int64 - TaskExecutionCount int64 - TaskETA time.Time - - TaskPreviousResponse int - TaskRetryReason string - FailFast bool -} - -// ParseRequestHeaders parses the special HTTP request headers available to push -// task request handlers. This function silently ignores values of the wrong -// format. -func ParseRequestHeaders(h http.Header) *RequestHeaders { - ret := &RequestHeaders{ - QueueName: h.Get("X-AppEngine-QueueName"), - TaskName: h.Get("X-AppEngine-TaskName"), - } - - ret.TaskRetryCount, _ = strconv.ParseInt(h.Get("X-AppEngine-TaskRetryCount"), 10, 64) - ret.TaskExecutionCount, _ = strconv.ParseInt(h.Get("X-AppEngine-TaskExecutionCount"), 10, 64) - - etaSecs, _ := strconv.ParseInt(h.Get("X-AppEngine-TaskETA"), 10, 64) - if etaSecs != 0 { - ret.TaskETA = time.Unix(etaSecs, 0) - } - - ret.TaskPreviousResponse, _ = strconv.Atoi(h.Get("X-AppEngine-TaskPreviousResponse")) - ret.TaskRetryReason = h.Get("X-AppEngine-TaskRetryReason") - if h.Get("X-AppEngine-FailFast") != "" { - ret.FailFast = true - } - - return ret -} - -var ( - currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") - defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace") -) - -func getDefaultNamespace(ctx context.Context) string { - return internal.IncomingHeaders(ctx).Get(defaultNamespace) -} - -func newAddReq(c context.Context, task *Task, queueName string) (*pb.TaskQueueAddRequest, error) { - if queueName == "" { - queueName = "default" - } - path := task.Path - if path == "" { - path = "/_ah/queue/" + queueName - } - eta := task.ETA - if eta.IsZero() { - eta = time.Now().Add(task.Delay) - } else if task.Delay != 0 { - panic("taskqueue: both Delay and ETA are set") - } - req := &pb.TaskQueueAddRequest{ - QueueName: []byte(queueName), - TaskName: []byte(task.Name), - EtaUsec: proto.Int64(eta.UnixNano() / 1e3), - } - method := task.method() - if method == "PULL" { - // Pull-based task - req.Body = task.Payload - req.Mode = pb.TaskQueueMode_PULL.Enum() - if task.Tag != "" { - req.Tag = []byte(task.Tag) - } - } else { - // HTTP-based task - if v, ok := pb.TaskQueueAddRequest_RequestMethod_value[method]; ok { - req.Method = pb.TaskQueueAddRequest_RequestMethod(v).Enum() - } else { - return nil, fmt.Errorf("taskqueue: bad method %q", method) - } - req.Url = []byte(path) - for k, vs := range task.Header { - for _, v := range vs { - req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ - Key: []byte(k), - Value: []byte(v), - }) - } - } - if method == "POST" || method == "PUT" { - req.Body = task.Payload - } - - // Namespace headers. - if _, ok := task.Header[currentNamespace]; !ok { - // Fetch the current namespace of this request. - ns := internal.NamespaceFromContext(c) - req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ - Key: []byte(currentNamespace), - Value: []byte(ns), - }) - } - if _, ok := task.Header[defaultNamespace]; !ok { - // Fetch the X-AppEngine-Default-Namespace header of this request. - if ns := getDefaultNamespace(c); ns != "" { - req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ - Key: []byte(defaultNamespace), - Value: []byte(ns), - }) - } - } - } - - if task.RetryOptions != nil { - req.RetryParameters = task.RetryOptions.toRetryParameters() - } - - return req, nil -} - -var alreadyAddedErrors = map[pb.TaskQueueServiceError_ErrorCode]bool{ - pb.TaskQueueServiceError_TASK_ALREADY_EXISTS: true, - pb.TaskQueueServiceError_TOMBSTONED_TASK: true, -} - -// Add adds the task to a named queue. -// An empty queue name means that the default queue will be used. -// Add returns an equivalent Task with defaults filled in, including setting -// the task's Name field to the chosen name if the original was empty. -func Add(c context.Context, task *Task, queueName string) (*Task, error) { - req, err := newAddReq(c, task, queueName) - if err != nil { - return nil, err - } - res := &pb.TaskQueueAddResponse{} - if err := internal.Call(c, "taskqueue", "Add", req, res); err != nil { - apiErr, ok := err.(*internal.APIError) - if ok && alreadyAddedErrors[pb.TaskQueueServiceError_ErrorCode(apiErr.Code)] { - return nil, ErrTaskAlreadyAdded - } - return nil, err - } - resultTask := *task - resultTask.Method = task.method() - if task.Name == "" { - resultTask.Name = string(res.ChosenTaskName) - } - return &resultTask, nil -} - -// AddMulti adds multiple tasks to a named queue. -// An empty queue name means that the default queue will be used. -// AddMulti returns a slice of equivalent tasks with defaults filled in, including setting -// each task's Name field to the chosen name if the original was empty. -// If a given task is badly formed or could not be added, an appengine.MultiError is returned. -func AddMulti(c context.Context, tasks []*Task, queueName string) ([]*Task, error) { - req := &pb.TaskQueueBulkAddRequest{ - AddRequest: make([]*pb.TaskQueueAddRequest, len(tasks)), - } - me, any := make(appengine.MultiError, len(tasks)), false - for i, t := range tasks { - req.AddRequest[i], me[i] = newAddReq(c, t, queueName) - any = any || me[i] != nil - } - if any { - return nil, me - } - res := &pb.TaskQueueBulkAddResponse{} - if err := internal.Call(c, "taskqueue", "BulkAdd", req, res); err != nil { - return nil, err - } - if len(res.Taskresult) != len(tasks) { - return nil, errors.New("taskqueue: server error") - } - tasksOut := make([]*Task, len(tasks)) - for i, tr := range res.Taskresult { - tasksOut[i] = new(Task) - *tasksOut[i] = *tasks[i] - tasksOut[i].Method = tasksOut[i].method() - if tasksOut[i].Name == "" { - tasksOut[i].Name = string(tr.ChosenTaskName) - } - if *tr.Result != pb.TaskQueueServiceError_OK { - if alreadyAddedErrors[*tr.Result] { - me[i] = ErrTaskAlreadyAdded - } else { - me[i] = &internal.APIError{ - Service: "taskqueue", - Code: int32(*tr.Result), - } - } - any = true - } - } - if any { - return tasksOut, me - } - return tasksOut, nil -} - -// Delete deletes a task from a named queue. -func Delete(c context.Context, task *Task, queueName string) error { - err := DeleteMulti(c, []*Task{task}, queueName) - if me, ok := err.(appengine.MultiError); ok { - return me[0] - } - return err -} - -// DeleteMulti deletes multiple tasks from a named queue. -// If a given task could not be deleted, an appengine.MultiError is returned. -// Each task is deleted independently; one may fail to delete while the others -// are sucessfully deleted. -func DeleteMulti(c context.Context, tasks []*Task, queueName string) error { - taskNames := make([][]byte, len(tasks)) - for i, t := range tasks { - taskNames[i] = []byte(t.Name) - } - if queueName == "" { - queueName = "default" - } - req := &pb.TaskQueueDeleteRequest{ - QueueName: []byte(queueName), - TaskName: taskNames, - } - res := &pb.TaskQueueDeleteResponse{} - if err := internal.Call(c, "taskqueue", "Delete", req, res); err != nil { - return err - } - if a, b := len(req.TaskName), len(res.Result); a != b { - return fmt.Errorf("taskqueue: internal error: requested deletion of %d tasks, got %d results", a, b) - } - me, any := make(appengine.MultiError, len(res.Result)), false - for i, ec := range res.Result { - if ec != pb.TaskQueueServiceError_OK { - me[i] = &internal.APIError{ - Service: "taskqueue", - Code: int32(ec), - } - any = true - } - } - if any { - return me - } - return nil -} - -func lease(c context.Context, maxTasks int, queueName string, leaseTime int, groupByTag bool, tag []byte) ([]*Task, error) { - if queueName == "" { - queueName = "default" - } - req := &pb.TaskQueueQueryAndOwnTasksRequest{ - QueueName: []byte(queueName), - LeaseSeconds: proto.Float64(float64(leaseTime)), - MaxTasks: proto.Int64(int64(maxTasks)), - GroupByTag: proto.Bool(groupByTag), - Tag: tag, - } - res := &pb.TaskQueueQueryAndOwnTasksResponse{} - if err := internal.Call(c, "taskqueue", "QueryAndOwnTasks", req, res); err != nil { - return nil, err - } - tasks := make([]*Task, len(res.Task)) - for i, t := range res.Task { - tasks[i] = &Task{ - Payload: t.Body, - Name: string(t.TaskName), - Method: "PULL", - ETA: time.Unix(0, *t.EtaUsec*1e3), - RetryCount: *t.RetryCount, - Tag: string(t.Tag), - } - } - return tasks, nil -} - -// Lease leases tasks from a queue. -// leaseTime is in seconds. -// The number of tasks fetched will be at most maxTasks. -func Lease(c context.Context, maxTasks int, queueName string, leaseTime int) ([]*Task, error) { - return lease(c, maxTasks, queueName, leaseTime, false, nil) -} - -// LeaseByTag leases tasks from a queue, grouped by tag. -// If tag is empty, then the returned tasks are grouped by the tag of the task with earliest ETA. -// leaseTime is in seconds. -// The number of tasks fetched will be at most maxTasks. -func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime int, tag string) ([]*Task, error) { - return lease(c, maxTasks, queueName, leaseTime, true, []byte(tag)) -} - -// Purge removes all tasks from a queue. -func Purge(c context.Context, queueName string) error { - if queueName == "" { - queueName = "default" - } - req := &pb.TaskQueuePurgeQueueRequest{ - QueueName: []byte(queueName), - } - res := &pb.TaskQueuePurgeQueueResponse{} - return internal.Call(c, "taskqueue", "PurgeQueue", req, res) -} - -// ModifyLease modifies the lease of a task. -// Used to request more processing time, or to abandon processing. -// leaseTime is in seconds and must not be negative. -func ModifyLease(c context.Context, task *Task, queueName string, leaseTime int) error { - if queueName == "" { - queueName = "default" - } - req := &pb.TaskQueueModifyTaskLeaseRequest{ - QueueName: []byte(queueName), - TaskName: []byte(task.Name), - EtaUsec: proto.Int64(task.ETA.UnixNano() / 1e3), // Used to verify ownership. - LeaseSeconds: proto.Float64(float64(leaseTime)), - } - res := &pb.TaskQueueModifyTaskLeaseResponse{} - if err := internal.Call(c, "taskqueue", "ModifyTaskLease", req, res); err != nil { - return err - } - task.ETA = time.Unix(0, *res.UpdatedEtaUsec*1e3) - return nil -} - -// QueueStatistics represents statistics about a single task queue. -type QueueStatistics struct { - Tasks int // may be an approximation - OldestETA time.Time // zero if there are no pending tasks - - Executed1Minute int // tasks executed in the last minute - InFlight int // tasks executing now - EnforcedRate float64 // requests per second -} - -// QueueStats retrieves statistics about queues. -func QueueStats(c context.Context, queueNames []string) ([]QueueStatistics, error) { - req := &pb.TaskQueueFetchQueueStatsRequest{ - QueueName: make([][]byte, len(queueNames)), - } - for i, q := range queueNames { - if q == "" { - q = "default" - } - req.QueueName[i] = []byte(q) - } - res := &pb.TaskQueueFetchQueueStatsResponse{} - if err := internal.Call(c, "taskqueue", "FetchQueueStats", req, res); err != nil { - return nil, err - } - qs := make([]QueueStatistics, len(res.Queuestats)) - for i, qsg := range res.Queuestats { - qs[i] = QueueStatistics{ - Tasks: int(*qsg.NumTasks), - } - if eta := *qsg.OldestEtaUsec; eta > -1 { - qs[i].OldestETA = time.Unix(0, eta*1e3) - } - if si := qsg.ScannerInfo; si != nil { - qs[i].Executed1Minute = int(*si.ExecutedLastMinute) - qs[i].InFlight = int(si.GetRequestsInFlight()) - qs[i].EnforcedRate = si.GetEnforcedRate() - } - } - return qs, nil -} - -func setTransaction(x *pb.TaskQueueAddRequest, t *dspb.Transaction) { - x.Transaction = t -} - -func init() { - internal.RegisterErrorCodeMap("taskqueue", pb.TaskQueueServiceError_ErrorCode_name) - - // Datastore error codes are shifted by DATASTORE_ERROR when presented through taskqueue. - dsCode := int32(pb.TaskQueueServiceError_DATASTORE_ERROR) + int32(dspb.Error_TIMEOUT) - internal.RegisterTimeoutErrorCode("taskqueue", dsCode) - - // Transaction registration. - internal.RegisterTransactionSetter(setTransaction) - internal.RegisterTransactionSetter(func(x *pb.TaskQueueBulkAddRequest, t *dspb.Transaction) { - for _, req := range x.AddRequest { - setTransaction(req, t) - } - }) -} |