From 38ee83e45b4de7edf89bf9f0ef629eb4c6ad0fa8 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Thu, 12 May 2016 23:56:07 -0400 Subject: Moving to glide --- vendor/github.com/goamz/goamz/sqs/Makefile | 20 + vendor/github.com/goamz/goamz/sqs/README.md | 38 ++ .../github.com/goamz/goamz/sqs/responses_test.go | 196 +++++++ vendor/github.com/goamz/goamz/sqs/sqs.go | 570 +++++++++++++++++++++ vendor/github.com/goamz/goamz/sqs/sqs_test.go | 414 +++++++++++++++ vendor/github.com/goamz/goamz/sqs/suite_test.go | 145 ++++++ 6 files changed, 1383 insertions(+) create mode 100644 vendor/github.com/goamz/goamz/sqs/Makefile create mode 100644 vendor/github.com/goamz/goamz/sqs/README.md create mode 100644 vendor/github.com/goamz/goamz/sqs/responses_test.go create mode 100644 vendor/github.com/goamz/goamz/sqs/sqs.go create mode 100644 vendor/github.com/goamz/goamz/sqs/sqs_test.go create mode 100644 vendor/github.com/goamz/goamz/sqs/suite_test.go (limited to 'vendor/github.com/goamz/goamz/sqs') diff --git a/vendor/github.com/goamz/goamz/sqs/Makefile b/vendor/github.com/goamz/goamz/sqs/Makefile new file mode 100644 index 000000000..1219acfb9 --- /dev/null +++ b/vendor/github.com/goamz/goamz/sqs/Makefile @@ -0,0 +1,20 @@ +include $(GOROOT)/src/Make.inc + +TARG=launchpad.net/goamz/sqs + +GOFILES=\ + sqs.go\ + +include $(GOROOT)/src/Make.pkg + +GOFMT=gofmt +BADFMT=$(shell $(GOFMT) -l $(GOFILES) 2> /dev/null) + +gofmt: $(BADFMT) + @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done + +ifneq ($(BADFMT),) +ifneq ($(MAKECMDGOALS), gofmt) +#$(warning WARNING: make gofmt: $(BADFMT)) +endif +endif diff --git a/vendor/github.com/goamz/goamz/sqs/README.md b/vendor/github.com/goamz/goamz/sqs/README.md new file mode 100644 index 000000000..a283a4eec --- /dev/null +++ b/vendor/github.com/goamz/goamz/sqs/README.md @@ -0,0 +1,38 @@ +Amazon Simple Queue Service API Client Written in Golang. +========================================================= + +Merged from https://github.com/Mistobaan/sqs + +Installation +------------ + + go get github.com/goamz/goamz/sqs + +Documentation +------------- + +http://godoc.org/github.com/goamz/goamz/sqs + + +Sample Usage +------------ + + var auth = aws.Auth{ + AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + } + + conn := sqs.New(auth, aws.USEast) + + q, err := conn.CreateQueue(queueName) + if err != nil { + log.Fatalf(err.Error()) + } + + q.SendMessage(batch) + + +Testing +------- + + go test . diff --git a/vendor/github.com/goamz/goamz/sqs/responses_test.go b/vendor/github.com/goamz/goamz/sqs/responses_test.go new file mode 100644 index 000000000..857fb9104 --- /dev/null +++ b/vendor/github.com/goamz/goamz/sqs/responses_test.go @@ -0,0 +1,196 @@ +package sqs + +var TestCreateQueueXmlOK = ` + + + http://sqs.us-east-1.amazonaws.com/123456789012/testQueue + + + 7a62c49f-347e-4fc4-9331-6e8e7a96aa73 + + +` + +var TestListQueuesXmlOK = ` + + + http://sqs.us-east-1.amazonaws.com/123456789012/testQueue + + + 725275ae-0b9b-4762-b238-436d7c65a1ac + + +` + +var TestDeleteQueueXmlOK = ` + + + 6fde8d1e-52cd-4581-8cd9-c512f4c64223 + + +` + +var TestPurgeQueueXmlOK = ` + + + 6fde8d1e-52cd-4581-8cd9-c512f4c64223 + + +` + +var TestSendMessageXmlOK = ` + + + fafb00f5732ab283681e124bf8747ed1 + 5fea7756-0ea4-451a-a703-a558b933e274 + ba056227cfd9533dba1f72ad9816d233 + + + 27daac76-34dd-47df-bd01-1f6e873584a0 + + +` + +var TestSendMessageBatchXmlOk = ` + + + + test_msg_001 + 0a5231c7-8bff-4955-be2e-8dc7c50a25fa + 0e024d309850c78cba5eabbeff7cae71 + + + test_msg_002 + 15ee1ed3-87e7-40c1-bdaa-2e49968ea7e9 + 7fb8146a82f95e0af155278f406862c2 + + + + ca1ad5d0-8271-408b-8d0f-1351bf547e74 + + +` + +var TestReceiveMessageXmlOK = ` + + + + 5fea7756-0ea4-451a-a703-a558b933e274 + MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+CwLj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QEauMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= + fafb00f5732ab283681e124bf8747ed1 + This is a test message + + SenderId + 195004372649 + + + SentTimestamp + 1238099229000 + + + ApproximateReceiveCount + 5 + + + ApproximateFirstReceiveTimestamp + 1250700979248 + + + CustomAttribute + + String + Testing, testing, 1, 2, 3 + + + + BinaryCustomAttribute + + Binary + iVBORw0KGgoAAAANSUhEUgAAABIAAAASCAYAAABWzo5XAAABA0lEQVQ4T72UrQ4CMRCEewhyiiBPopBgcfAUSIICB88CDhRB8hTgsCBRyJMEdUFwZJpMs/3LHQlhVdPufJ1ut03UjyKJcR5zVc4umbW87eeqvVFBjTdJwP54D+4xGXVUCGiBxoOsJOCd9IKgRnnV8wAezrnRmwGcpKtCJ8UgJBNWLFNzVAOimyqIhElXGkQ3LmQ6fKrdqaW1cixhdKVBcEOBLEwViBugVv8B1elVuLYcoTea624drcl5LW4KTRsFhQpLtVzzQKGCh2DuHI8FvdVH7vGQKEPerHRjgegKMESsXgAgWBtu5D1a9BQWCXSrzx9BvjPPkRQR6IJcQNTRV/cvkj93DqUTWzVDIQAAAABJRU5ErkJggg== + + + + + + b6633655-283d-45b4-aee4-4e84e0ae6afa + + +` + +var TestChangeMessageVisibilityXmlOK = ` + + + 6a7a282a-d013-4a59-aba9-335b0fa48bed + + +` + +var TestDeleteMessageBatchXmlOK = ` + + + + msg1 + + + msg2 + + + + d6f86b7a-74d1-4439-b43f-196a1e29cd85 + + +` + +var TestDeleteMessageUsingReceiptXmlOK = ` + + + d6d86b7a-74d1-4439-b43f-196a1e29cd85 + + +` + +var TestGetQueueAttributesXmlOK = ` + + + + ReceiveMessageWaitTimeSeconds + 2 + + + VisibilityTimeout + 30 + + + ApproximateNumberOfMessages + 0 + + + ApproximateNumberOfMessagesNotVisible + 0 + + + CreatedTimestamp + 1286771522 + + + LastModifiedTimestamp + 1286771522 + + + QueueArn + arn:aws:sqs:us-east-1:123456789012:qfoo + + + MaximumMessageSize + 8192 + + + MessageRetentionPeriod + 345600 + + + + 1ea71be5-b5a2-4f9d-b85a-945d8d08cd0b + + +` diff --git a/vendor/github.com/goamz/goamz/sqs/sqs.go b/vendor/github.com/goamz/goamz/sqs/sqs.go new file mode 100644 index 000000000..4005b1b37 --- /dev/null +++ b/vendor/github.com/goamz/goamz/sqs/sqs.go @@ -0,0 +1,570 @@ +// +// gosqs - Go packages to interact with the Amazon SQS Web Services. +// +// depends on https://wiki.ubuntu.com/goamz +// +// +// Written by Prudhvi Krishna Surapaneni +// Extended by Fabrizio Milo +// +package sqs + +import ( + "encoding/xml" + "errors" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/http/httputil" + "net/url" + "strconv" + "strings" + "time" + + "github.com/goamz/goamz/aws" +) + +const API_VERSION = "2012-11-05" + +const debug = false + +// The SQS type encapsulates operation with an SQS region. +type SQS struct { + aws.Auth + aws.Region + private byte // Reserve the right of using private data. +} + +// NewFrom Create A new SQS Client given an access and secret Key +// region must be one of "us.east, us.west, eu.west" +func NewFrom(accessKey, secretKey, region string) (*SQS, error) { + + auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey} + aws_region := aws.USEast + + switch region { + case "us.east", "us.east.1": + aws_region = aws.USEast + case "us.west", "us.west.1": + aws_region = aws.USWest + case "us.west.2": + aws_region = aws.USWest2 + case "eu.west": + aws_region = aws.EUWest + case "ap.southeast", "ap.southeast.1": + aws_region = aws.APSoutheast + case "ap.southeast.2": + aws_region = aws.APSoutheast2 + case "ap.northeast", "ap.northeast.1": + aws_region = aws.APNortheast + case "sa.east", "sa.east.1": + aws_region = aws.SAEast + case "cn.north", "cn.north.1": + aws_region = aws.CNNorth + default: + return nil, errors.New(fmt.Sprintf("Unknown/Unsupported region %s", region)) + } + + aws_sqs := New(auth, aws_region) + return aws_sqs, nil +} + +// NewFrom Create A new SQS Client from an exisisting aws.Auth +func New(auth aws.Auth, region aws.Region) *SQS { + return &SQS{auth, region, 0} +} + +// Queue Reference to a Queue +type Queue struct { + *SQS + Url string +} + +type CreateQueueResponse struct { + QueueUrl string `xml:"CreateQueueResult>QueueUrl"` + ResponseMetadata ResponseMetadata +} + +type GetQueueUrlResponse struct { + QueueUrl string `xml:"GetQueueUrlResult>QueueUrl"` + ResponseMetadata ResponseMetadata +} + +type ListQueuesResponse struct { + QueueUrl []string `xml:"ListQueuesResult>QueueUrl"` + ResponseMetadata ResponseMetadata +} + +type DeleteMessageResponse struct { + ResponseMetadata ResponseMetadata +} + +type DeleteQueueResponse struct { + ResponseMetadata ResponseMetadata +} + +type PurgeQueueResponse struct { + ResponseMetadata ResponseMetadata +} + +type SendMessageResponse struct { + MD5 string `xml:"SendMessageResult>MD5OfMessageBody"` + MD5OfMessageAttributes string `xml:"SendMessageResult>MD5OfMessageAttributes"` + Id string `xml:"SendMessageResult>MessageId"` + ResponseMetadata ResponseMetadata +} + +type ReceiveMessageResponse struct { + Messages []Message `xml:"ReceiveMessageResult>Message"` + ResponseMetadata ResponseMetadata +} + +type Message struct { + MessageId string `xml:"MessageId"` + Body string `xml:"Body"` + MD5OfBody string `xml:"MD5OfBody"` + ReceiptHandle string `xml:"ReceiptHandle"` + Attribute []Attribute `xml:"Attribute"` + MessageAttribute []MessageAttribute `xml:"MessageAttribute"` + MD5OfMessageAttributes string `xml:"MD5OfMessageAttributes"` +} + +type Attribute struct { + Name string `xml:"Name"` + Value string `xml:"Value"` +} + +type MessageAttribute struct { + Name string `xml:"Name"` + Value MessageAttributeValue `xml:"Value"` +} + +type MessageAttributeValue struct { + DataType string `xml:"DataType"` + BinaryValue []byte `xml:"BinaryValue"` + StringValue string `xml:"StringValue"` + + // Not yet implemented (Reserved for future use) + BinaryListValues [][]byte `xml:"BinaryListValues"` + StringListValues []string `xml:"StringListValues"` +} + +type ChangeMessageVisibilityResponse struct { + ResponseMetadata ResponseMetadata +} + +type GetQueueAttributesResponse struct { + Attributes []Attribute `xml:"GetQueueAttributesResult>Attribute"` + ResponseMetadata ResponseMetadata +} + +type ResponseMetadata struct { + RequestId string + BoxUsage float64 +} + +type Error struct { + StatusCode int + Code string + Message string + RequestId string +} + +func (err *Error) Error() string { + if err.Code == "" { + return err.Message + } + return fmt.Sprintf("%s (%s)", err.Message, err.Code) +} + +func (err *Error) String() string { + return err.Message +} + +type xmlErrors struct { + RequestId string + Errors []Error `xml:"Errors>Error"` + Error Error +} + +// CreateQueue create a queue with a specific name +func (s *SQS) CreateQueue(queueName string) (*Queue, error) { + return s.CreateQueueWithTimeout(queueName, 30) +} + +// CreateQueue create a queue with a specific name and a timeout +func (s *SQS) CreateQueueWithTimeout(queueName string, timeout int) (*Queue, error) { + params := map[string]string{ + "VisibilityTimeout": strconv.Itoa(timeout), + } + return s.CreateQueueWithAttributes(queueName, params) +} + +func (s *SQS) CreateQueueWithAttributes(queueName string, attrs map[string]string) (q *Queue, err error) { + resp, err := s.newQueue(queueName, attrs) + if err != nil { + return nil, err + } + q = &Queue{s, resp.QueueUrl} + return +} + +// GetQueue get a reference to the given quename +func (s *SQS) GetQueue(queueName string) (*Queue, error) { + var q *Queue + resp, err := s.getQueueUrl(queueName) + if err != nil { + return q, err + } + q = &Queue{s, resp.QueueUrl} + return q, nil +} + +func (s *SQS) QueueFromArn(queueUrl string) (q *Queue) { + q = &Queue{s, queueUrl} + return +} + +func (s *SQS) getQueueUrl(queueName string) (resp *GetQueueUrlResponse, err error) { + resp = &GetQueueUrlResponse{} + params := makeParams("GetQueueUrl") + params["QueueName"] = queueName + err = s.query("", params, resp) + return resp, err +} + +func (s *SQS) newQueue(queueName string, attrs map[string]string) (resp *CreateQueueResponse, err error) { + resp = &CreateQueueResponse{} + params := makeParams("CreateQueue") + params["QueueName"] = queueName + + i := 1 + for k, v := range attrs { + nameParam := fmt.Sprintf("Attribute.%d.Name", i) + valParam := fmt.Sprintf("Attribute.%d.Value", i) + params[nameParam] = k + params[valParam] = v + i++ + } + + err = s.query("", params, resp) + return +} + +func (s *SQS) ListQueues(QueueNamePrefix string) (resp *ListQueuesResponse, err error) { + resp = &ListQueuesResponse{} + params := makeParams("ListQueues") + + if QueueNamePrefix != "" { + params["QueueNamePrefix"] = QueueNamePrefix + } + + err = s.query("", params, resp) + return +} + +func (q *Queue) Delete() (resp *DeleteQueueResponse, err error) { + resp = &DeleteQueueResponse{} + params := makeParams("DeleteQueue") + + err = q.SQS.query(q.Url, params, resp) + return +} + +func (q *Queue) Purge() (resp *PurgeQueueResponse, err error) { + resp = &PurgeQueueResponse{} + params := makeParams("PurgeQueue") + + err = q.SQS.query(q.Url, params, resp) + return +} + +func (q *Queue) SendMessageWithDelay(MessageBody string, DelaySeconds int64) (resp *SendMessageResponse, err error) { + resp = &SendMessageResponse{} + params := makeParams("SendMessage") + + params["MessageBody"] = MessageBody + params["DelaySeconds"] = strconv.Itoa(int(DelaySeconds)) + + err = q.SQS.query(q.Url, params, resp) + return +} + +func (q *Queue) SendMessage(MessageBody string) (resp *SendMessageResponse, err error) { + resp = &SendMessageResponse{} + params := makeParams("SendMessage") + + params["MessageBody"] = MessageBody + + err = q.SQS.query(q.Url, params, resp) + return +} + +func (q *Queue) SendMessageWithAttributes(MessageBody string, attrs map[string]string) (resp *SendMessageResponse, err error) { + resp = &SendMessageResponse{} + params := makeParams("SendMessage") + + params["MessageBody"] = MessageBody + + i := 1 + for k, v := range attrs { + nameParam := fmt.Sprintf("MessageAttribute.%d.Name", i) + valParam := fmt.Sprintf("MessageAttribute.%d.Value.StringValue", i) + typeParam := fmt.Sprintf("MessageAttribute.%d.Value.DataType", i) + params[nameParam] = k + params[valParam] = v + params[typeParam] = "String" + i++ + } + + err = q.SQS.query(q.Url, params, resp) + return +} + +// ReceiveMessageWithVisibilityTimeout +func (q *Queue) ReceiveMessageWithVisibilityTimeout(MaxNumberOfMessages, VisibilityTimeoutSec int) (*ReceiveMessageResponse, error) { + params := map[string]string{ + "MaxNumberOfMessages": strconv.Itoa(MaxNumberOfMessages), + "VisibilityTimeout": strconv.Itoa(VisibilityTimeoutSec), + } + return q.ReceiveMessageWithParameters(params) +} + +// ReceiveMessage +func (q *Queue) ReceiveMessage(MaxNumberOfMessages int) (*ReceiveMessageResponse, error) { + params := map[string]string{ + "MaxNumberOfMessages": strconv.Itoa(MaxNumberOfMessages), + } + return q.ReceiveMessageWithParameters(params) +} + +func (q *Queue) ReceiveMessageWithParameters(p map[string]string) (resp *ReceiveMessageResponse, err error) { + resp = &ReceiveMessageResponse{} + params := makeParams("ReceiveMessage") + params["AttributeName"] = "All" + params["MessageAttributeNames"] = "All" + + for k, v := range p { + params[k] = v + } + + err = q.SQS.query(q.Url, params, resp) + return +} + +func (q *Queue) ChangeMessageVisibility(M *Message, VisibilityTimeout int) (resp *ChangeMessageVisibilityResponse, err error) { + resp = &ChangeMessageVisibilityResponse{} + params := makeParams("ChangeMessageVisibility") + params["VisibilityTimeout"] = strconv.Itoa(VisibilityTimeout) + params["ReceiptHandle"] = M.ReceiptHandle + + err = q.SQS.query(q.Url, params, resp) + return +} + +func (q *Queue) GetQueueAttributes(A string) (resp *GetQueueAttributesResponse, err error) { + resp = &GetQueueAttributesResponse{} + params := makeParams("GetQueueAttributes") + params["AttributeName"] = A + + err = q.SQS.query(q.Url, params, resp) + return +} + +func (q *Queue) DeleteMessage(M *Message) (resp *DeleteMessageResponse, err error) { + return q.DeleteMessageUsingReceiptHandle(M.ReceiptHandle) +} + +func (q *Queue) DeleteMessageUsingReceiptHandle(receiptHandle string) (resp *DeleteMessageResponse, err error) { + resp = &DeleteMessageResponse{} + params := makeParams("DeleteMessage") + params["ReceiptHandle"] = receiptHandle + + err = q.SQS.query(q.Url, params, resp) + return +} + +type SendMessageBatchResultEntry struct { + Id string `xml:"Id"` + MessageId string `xml:"MessageId"` + MD5OfMessageBody string `xml:"MD5OfMessageBody"` +} + +type SendMessageBatchResponse struct { + SendMessageBatchResult []SendMessageBatchResultEntry `xml:"SendMessageBatchResult>SendMessageBatchResultEntry"` + ResponseMetadata ResponseMetadata +} + +/* SendMessageBatch + */ +func (q *Queue) SendMessageBatch(msgList []Message) (resp *SendMessageBatchResponse, err error) { + resp = &SendMessageBatchResponse{} + params := makeParams("SendMessageBatch") + + for idx, msg := range msgList { + count := idx + 1 + params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.Id", count)] = fmt.Sprintf("msg-%d", count) + params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.MessageBody", count)] = msg.Body + } + + err = q.SQS.query(q.Url, params, resp) + return +} + +/* SendMessageBatchString + */ +func (q *Queue) SendMessageBatchString(msgList []string) (resp *SendMessageBatchResponse, err error) { + resp = &SendMessageBatchResponse{} + params := makeParams("SendMessageBatch") + + for idx, msg := range msgList { + count := idx + 1 + params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.Id", count)] = fmt.Sprintf("msg-%d", count) + params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.MessageBody", count)] = msg + } + + err = q.SQS.query(q.Url, params, resp) + return +} + +type DeleteMessageBatchResponse struct { + DeleteMessageBatchResult []struct { + Id string + SenderFault bool + Code string + Message string + } `xml:"DeleteMessageBatchResult>DeleteMessageBatchResultEntry"` + ResponseMetadata ResponseMetadata +} + +/* DeleteMessageBatch */ +func (q *Queue) DeleteMessageBatch(msgList []Message) (resp *DeleteMessageBatchResponse, err error) { + resp = &DeleteMessageBatchResponse{} + params := makeParams("DeleteMessageBatch") + + lutMsg := make(map[string]Message) + + for idx := range msgList { + params[fmt.Sprintf("DeleteMessageBatchRequestEntry.%d.Id", idx+1)] = msgList[idx].MessageId + params[fmt.Sprintf("DeleteMessageBatchRequestEntry.%d.ReceiptHandle", idx+1)] = msgList[idx].ReceiptHandle + + lutMsg[string(msgList[idx].MessageId)] = msgList[idx] + } + + err = q.SQS.query(q.Url, params, resp) + + messageWithErrors := make([]Message, 0, len(msgList)) + + for idx := range resp.DeleteMessageBatchResult { + if resp.DeleteMessageBatchResult[idx].SenderFault { + msg, ok := lutMsg[resp.DeleteMessageBatchResult[idx].Id] + if ok { + messageWithErrors = append(messageWithErrors, msg) + } + } + } + + if len(messageWithErrors) > 0 { + log.Printf("%d Message have not been sent", len(messageWithErrors)) + } + + return +} + +func (s *SQS) query(queueUrl string, params map[string]string, resp interface{}) (err error) { + params["Version"] = API_VERSION + params["Timestamp"] = time.Now().In(time.UTC).Format(time.RFC3339) + var url_ *url.URL + + switch { + // fully qualified queueUrl + case strings.HasPrefix(queueUrl, "http"): + url_, err = url.Parse(queueUrl) + // relative queueUrl + case strings.HasPrefix(queueUrl, "/"): + url_, err = url.Parse(s.Region.SQSEndpoint + queueUrl) + // zero-value for queueUrl + default: + url_, err = url.Parse(s.Region.SQSEndpoint) + } + + if err != nil { + return err + } + + if s.Auth.Token() != "" { + params["SecurityToken"] = s.Auth.Token() + } + + var r *http.Response + + var sarray []string + for k, v := range params { + sarray = append(sarray, aws.Encode(k)+"="+aws.Encode(v)) + } + + req, err := http.NewRequest("GET", fmt.Sprintf("%s?%s", url_, strings.Join(sarray, "&")), nil) + if err != nil { + return err + } + signer := aws.NewV4Signer(s.Auth, "sqs", s.Region) + signer.Sign(req) + client := http.Client{} + r, err = client.Do(req) + + if debug { + log.Printf("GET ", url_.String()) + } + + if err != nil { + return err + } + + defer r.Body.Close() + + if debug { + dump, _ := httputil.DumpResponse(r, true) + log.Printf("DUMP:\n", string(dump)) + } + + if r.StatusCode != 200 { + return buildError(r) + } + err = xml.NewDecoder(r.Body).Decode(resp) + io.Copy(ioutil.Discard, r.Body) + + return err +} + +func buildError(r *http.Response) error { + errors := xmlErrors{} + xml.NewDecoder(r.Body).Decode(&errors) + var err Error + if len(errors.Errors) > 0 { + err = errors.Errors[0] + } else { + err = errors.Error + } + err.RequestId = errors.RequestId + err.StatusCode = r.StatusCode + if err.Message == "" { + err.Message = r.Status + } + return &err +} + +func makeParams(action string) map[string]string { + params := make(map[string]string) + params["Action"] = action + return params +} + +func multimap(p map[string]string) url.Values { + q := make(url.Values, len(p)) + for k, v := range p { + q[k] = []string{v} + } + return q +} diff --git a/vendor/github.com/goamz/goamz/sqs/sqs_test.go b/vendor/github.com/goamz/goamz/sqs/sqs_test.go new file mode 100644 index 000000000..a06433535 --- /dev/null +++ b/vendor/github.com/goamz/goamz/sqs/sqs_test.go @@ -0,0 +1,414 @@ +package sqs + +import ( + "crypto/md5" + "encoding/binary" + "fmt" + "hash" + + "github.com/goamz/goamz/aws" + . "gopkg.in/check.v1" +) + +var _ = Suite(&S{}) + +type S struct { + HTTPSuite + sqs *SQS +} + +func (s *S) SetUpSuite(c *C) { + s.HTTPSuite.SetUpSuite(c) + auth := aws.Auth{AccessKey: "abc", SecretKey: "123"} + s.sqs = New(auth, aws.Region{SQSEndpoint: testServer.URL}) +} + +func (s *S) TestCreateQueue(c *C) { + testServer.PrepareResponse(200, nil, TestCreateQueueXmlOK) + + resp, err := s.sqs.CreateQueue("testQueue") + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/") + c.Assert(req.Header["Date"], Not(Equals), "") + fmt.Printf("%+v\n", req) + c.Assert(req.Form["Action"], DeepEquals, []string{"CreateQueue"}) + c.Assert(req.Form["Attribute.1.Name"], DeepEquals, []string{"VisibilityTimeout"}) + c.Assert(req.Form["Attribute.1.Value"], DeepEquals, []string{"30"}) + + c.Assert(resp.Url, Equals, "http://sqs.us-east-1.amazonaws.com/123456789012/testQueue") + c.Assert(err, IsNil) +} + +func (s *S) TestCreateQueueWithTimeout(c *C) { + testServer.PrepareResponse(200, nil, TestCreateQueueXmlOK) + + s.sqs.CreateQueueWithTimeout("testQueue", 180) + req := testServer.WaitRequest() + + // TestCreateQueue() tests the core functionality, just check the timeout in this test + c.Assert(req.Form["Attribute.1.Name"], DeepEquals, []string{"VisibilityTimeout"}) + c.Assert(req.Form["Attribute.1.Value"], DeepEquals, []string{"180"}) +} + +func (s *S) TestCreateQueueWithAttributes(c *C) { + testServer.PrepareResponse(200, nil, TestCreateQueueXmlOK) + + s.sqs.CreateQueueWithAttributes("testQueue", map[string]string{ + "ReceiveMessageWaitTimeSeconds": "20", + "VisibilityTimeout": "240", + }) + req := testServer.WaitRequest() + + // TestCreateQueue() tests the core functionality, just check the timeout in this test + var receiveMessageWaitSet bool + var visibilityTimeoutSet bool + + for i := 1; i <= 2; i++ { + prefix := fmt.Sprintf("Attribute.%d.", i) + attr := req.FormValue(prefix + "Name") + value := req.FormValue(prefix + "Value") + switch attr { + case "ReceiveMessageWaitTimeSeconds": + c.Assert(value, DeepEquals, "20") + receiveMessageWaitSet = true + case "VisibilityTimeout": + c.Assert(value, DeepEquals, "240") + visibilityTimeoutSet = true + } + } + c.Assert(receiveMessageWaitSet, Equals, true) + c.Assert(visibilityTimeoutSet, Equals, true) +} + +func (s *S) TestListQueues(c *C) { + testServer.PrepareResponse(200, nil, TestListQueuesXmlOK) + + resp, err := s.sqs.ListQueues("") + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/") + c.Assert(req.Header["Date"], Not(Equals), "") + + c.Assert(len(resp.QueueUrl), Not(Equals), 0) + c.Assert(resp.QueueUrl[0], Equals, "http://sqs.us-east-1.amazonaws.com/123456789012/testQueue") + c.Assert(resp.ResponseMetadata.RequestId, Equals, "725275ae-0b9b-4762-b238-436d7c65a1ac") + c.Assert(err, IsNil) +} + +func (s *S) TestDeleteQueue(c *C) { + testServer.PrepareResponse(200, nil, TestDeleteQueueXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + resp, err := q.Delete() + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + c.Assert(resp.ResponseMetadata.RequestId, Equals, "6fde8d1e-52cd-4581-8cd9-c512f4c64223") + c.Assert(err, IsNil) +} + +func (s *S) TestPurgeQueue(c *C) { + testServer.PrepareResponse(200, nil, TestPurgeQueueXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + resp, err := q.Purge() + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + c.Assert(resp.ResponseMetadata.RequestId, Equals, "6fde8d1e-52cd-4581-8cd9-c512f4c64223") + c.Assert(err, IsNil) +} + +func (s *S) TestSendMessage(c *C) { + testServer.PrepareResponse(200, nil, TestSendMessageXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + resp, err := q.SendMessage("This is a test message") + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + msg := "This is a test message" + var h hash.Hash = md5.New() + h.Write([]byte(msg)) + c.Assert(resp.MD5, Equals, fmt.Sprintf("%x", h.Sum(nil))) + c.Assert(resp.Id, Equals, "5fea7756-0ea4-451a-a703-a558b933e274") + c.Assert(err, IsNil) +} + +func (s *S) TestSendMessageRelativePath(c *C) { + testServer.PrepareResponse(200, nil, TestSendMessageXmlOK) + + q := &Queue{s.sqs, "/123456789012/testQueue/"} + resp, err := q.SendMessage("This is a test message") + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + msg := "This is a test message" + var h hash.Hash = md5.New() + h.Write([]byte(msg)) + c.Assert(resp.MD5, Equals, fmt.Sprintf("%x", h.Sum(nil))) + c.Assert(resp.Id, Equals, "5fea7756-0ea4-451a-a703-a558b933e274") + c.Assert(err, IsNil) +} + +func encodeMessageAttribute(str string) []byte { + bstr := []byte(str) + bs := make([]byte, 4+len(bstr)) + binary.BigEndian.PutUint32(bs, uint32(len(bstr))) + copy(bs[4:len(bs)], bstr) + return bs +} + +func (s *S) TestSendMessageWithAttributes(c *C) { + testServer.PrepareResponse(200, nil, TestSendMessageXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + attrs := map[string]string{ + "test_attribute_name_1": "test_attribute_value_1", + } + resp, err := q.SendMessageWithAttributes("This is a test message", attrs) + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + var attrsHash = md5.New() + attrsHash.Write(encodeMessageAttribute("test_attribute_name_1")) + attrsHash.Write(encodeMessageAttribute("String")) + attrsHash.Write([]byte{1}) + attrsHash.Write(encodeMessageAttribute("test_attribute_value_1")) + c.Assert(resp.MD5OfMessageAttributes, Equals, fmt.Sprintf("%x", attrsHash.Sum(nil))) + + msg := "This is a test message" + var h hash.Hash = md5.New() + h.Write([]byte(msg)) + c.Assert(resp.MD5, Equals, fmt.Sprintf("%x", h.Sum(nil))) + c.Assert(resp.Id, Equals, "5fea7756-0ea4-451a-a703-a558b933e274") + c.Assert(err, IsNil) +} + +func (s *S) TestSendMessageBatch(c *C) { + testServer.PrepareResponse(200, nil, TestSendMessageBatchXmlOk) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + + msgList := []string{"test message body 1", "test message body 2"} + resp, err := q.SendMessageBatchString(msgList) + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + for idx, msg := range msgList { + var h hash.Hash = md5.New() + h.Write([]byte(msg)) + c.Assert(resp.SendMessageBatchResult[idx].MD5OfMessageBody, Equals, fmt.Sprintf("%x", h.Sum(nil))) + c.Assert(err, IsNil) + } +} + +func (s *S) TestDeleteMessageBatch(c *C) { + testServer.PrepareResponse(200, nil, TestDeleteMessageBatchXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + + msgList := []Message{*(&Message{ReceiptHandle: "gfk0T0R0waama4fVFffkjPQrrvzMrOg0fTFk2LxT33EuB8wR0ZCFgKWyXGWFoqqpCIiprQUEhir%2F5LeGPpYTLzjqLQxyQYaQALeSNHb0us3uE84uujxpBhsDkZUQkjFFkNqBXn48xlMcVhTcI3YLH%2Bd%2BIqetIOHgBCZAPx6r%2B09dWaBXei6nbK5Ygih21DCDdAwFV68Jo8DXhb3ErEfoDqx7vyvC5nCpdwqv%2BJhU%2FTNGjNN8t51v5c%2FAXvQsAzyZVNapxUrHIt4NxRhKJ72uICcxruyE8eRXlxIVNgeNP8ZEDcw7zZU1Zw%3D%3D"}), + *(&Message{ReceiptHandle: "gfk0T0R0waama4fVFffkjKzmhMCymjQvfTFk2LxT33G4ms5subrE0deLKWSscPU1oD3J9zgeS4PQQ3U30qOumIE6AdAv3w%2F%2Fa1IXW6AqaWhGsEPaLm3Vf6IiWqdM8u5imB%2BNTwj3tQRzOWdTOePjOjPcTpRxBtXix%2BEvwJOZUma9wabv%2BSw6ZHjwmNcVDx8dZXJhVp16Bksiox%2FGrUvrVTCJRTWTLc59oHLLF8sEkKzRmGNzTDGTiV%2BYjHfQj60FD3rVaXmzTsoNxRhKJ72uIHVMGVQiAGgB%2BqAbSqfKHDQtVOmJJgkHug%3D%3D"}), + } + + resp, err := q.DeleteMessageBatch(msgList) + c.Assert(err, IsNil) + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + for idx, _ := range msgList { + c.Assert(resp.DeleteMessageBatchResult[idx].Id, Equals, fmt.Sprintf("msg%d", idx+1)) + } +} + +func (s *S) TestDeleteMessageUsingReceiptHandle(c *C) { + testServer.PrepareResponse(200, nil, TestDeleteMessageUsingReceiptXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + + msg := &Message{ReceiptHandle: "gfk0T0R0waama4fVFffkjRQrrvzMrOg0fTFk2LxT33EuB8wR0ZCFgKWyXGWFoqqpCIiprQUEhir%2F5LeGPpYTLzjqLQxyQYaQALeSNHb0us3uE84uujxpBhsDkZUQkjFFkNqBXn48xlMcVhTcI3YLH%2Bd%2BIqetIOHgBCZAPx6r%2B09dWaBXei6nbK5Ygih21DCDdAwFV68Jo8DXhb3ErEfoDqx7vyvC5nCpdwqv%2BJhU%2FTNGjNN8t51v5c%2FAXvQsAzyZVNapxUrHIt4NxRhKJ72uICcxruyE8eRXlxIVNgeNP8ZEDcw7zZU1Zw%3D%3D"} + + resp, err := q.DeleteMessageUsingReceiptHandle(msg.ReceiptHandle) + c.Assert(err, IsNil) + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + c.Assert(resp.ResponseMetadata.RequestId, Equals, "d6d86b7a-74d1-4439-b43f-196a1e29cd85") +} + +func (s *S) TestReceiveMessage(c *C) { + testServer.PrepareResponse(200, nil, TestReceiveMessageXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + resp, err := q.ReceiveMessage(5) + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + c.Assert(len(resp.Messages), Not(Equals), 0) + c.Assert(resp.Messages[0].MessageId, Equals, "5fea7756-0ea4-451a-a703-a558b933e274") + c.Assert(resp.Messages[0].MD5OfBody, Equals, "fafb00f5732ab283681e124bf8747ed1") + c.Assert(resp.Messages[0].ReceiptHandle, Equals, "MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+CwLj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QEauMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=") + c.Assert(resp.Messages[0].Body, Equals, "This is a test message") + + c.Assert(len(resp.Messages[0].Attribute), Not(Equals), 0) + + expectedAttributeResults := []struct { + Name string + Value string + }{ + {Name: "SenderId", Value: "195004372649"}, + {Name: "SentTimestamp", Value: "1238099229000"}, + {Name: "ApproximateReceiveCount", Value: "5"}, + {Name: "ApproximateFirstReceiveTimestamp", Value: "1250700979248"}, + } + + for i, expected := range expectedAttributeResults { + c.Assert(resp.Messages[0].Attribute[i].Name, Equals, expected.Name) + c.Assert(resp.Messages[0].Attribute[i].Value, Equals, expected.Value) + } + + c.Assert(len(resp.Messages[0].MessageAttribute), Not(Equals), 0) + + expectedMessageAttributeResults := []struct { + Name string + Value struct { + DataType string + BinaryValue []byte + StringValue string + + // Not yet implemented (Reserved for future use) + BinaryListValues [][]byte + StringListValues []string + } + }{ + { + Name: "CustomAttribute", + Value: struct { + DataType string + BinaryValue []byte + StringValue string + + // Not yet implemented (Reserved for future use) + BinaryListValues [][]byte + StringListValues []string + }{ + DataType: "String", + StringValue: "Testing, testing, 1, 2, 3", + }, + }, + { + Name: "BinaryCustomAttribute", + Value: struct { + DataType string + BinaryValue []byte + StringValue string + + // Not yet implemented (Reserved for future use) + BinaryListValues [][]byte + StringListValues []string + }{ + DataType: "Binary", + BinaryValue: []byte("iVBORw0KGgoAAAANSUhEUgAAABIAAAASCAYAAABWzo5XAAABA0lEQVQ4T72UrQ4CMRCEewhyiiBPopBgcfAUSIICB88CDhRB8hTgsCBRyJMEdUFwZJpMs/3LHQlhVdPufJ1ut03UjyKJcR5zVc4umbW87eeqvVFBjTdJwP54D+4xGXVUCGiBxoOsJOCd9IKgRnnV8wAezrnRmwGcpKtCJ8UgJBNWLFNzVAOimyqIhElXGkQ3LmQ6fKrdqaW1cixhdKVBcEOBLEwViBugVv8B1elVuLYcoTea624drcl5LW4KTRsFhQpLtVzzQKGCh2DuHI8FvdVH7vGQKEPerHRjgegKMESsXgAgWBtu5D1a9BQWCXSrzx9BvjPPkRQR6IJcQNTRV/cvkj93DqUTWzVDIQAAAABJRU5ErkJggg=="), + }, + }, + } + + for i, expected := range expectedMessageAttributeResults { + c.Assert(resp.Messages[0].MessageAttribute[i].Name, Equals, expected.Name) + c.Assert(resp.Messages[0].MessageAttribute[i].Value.DataType, Equals, expected.Value.DataType) + c.Assert(string(resp.Messages[0].MessageAttribute[i].Value.BinaryValue), Equals, string(expected.Value.BinaryValue)) + c.Assert(resp.Messages[0].MessageAttribute[i].Value.StringValue, Equals, expected.Value.StringValue) + } + + c.Assert(err, IsNil) +} + +func (s *S) TestChangeMessageVisibility(c *C) { + testServer.PrepareResponse(200, nil, TestReceiveMessageXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + + resp1, err := q.ReceiveMessage(1) + req := testServer.WaitRequest() + + testServer.PrepareResponse(200, nil, TestChangeMessageVisibilityXmlOK) + + resp, err := q.ChangeMessageVisibility(&resp1.Messages[0], 50) + req = testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + c.Assert(req.Header["Date"], Not(Equals), "") + + c.Assert(resp.ResponseMetadata.RequestId, Equals, "6a7a282a-d013-4a59-aba9-335b0fa48bed") + c.Assert(err, IsNil) +} + +func (s *S) TestGetQueueAttributes(c *C) { + testServer.PrepareResponse(200, nil, TestGetQueueAttributesXmlOK) + + q := &Queue{s.sqs, testServer.URL + "/123456789012/testQueue/"} + + resp, err := q.GetQueueAttributes("All") + req := testServer.WaitRequest() + + c.Assert(req.Method, Equals, "GET") + c.Assert(req.URL.Path, Equals, "/123456789012/testQueue/") + + c.Assert(resp.ResponseMetadata.RequestId, Equals, "1ea71be5-b5a2-4f9d-b85a-945d8d08cd0b") + + c.Assert(len(resp.Attributes), Equals, 9) + + expectedResults := []struct { + Name string + Value string + }{ + {Name: "ReceiveMessageWaitTimeSeconds", Value: "2"}, + {Name: "VisibilityTimeout", Value: "30"}, + {Name: "ApproximateNumberOfMessages", Value: "0"}, + {Name: "ApproximateNumberOfMessagesNotVisible", Value: "0"}, + {Name: "CreatedTimestamp", Value: "1286771522"}, + {Name: "LastModifiedTimestamp", Value: "1286771522"}, + {Name: "QueueArn", Value: "arn:aws:sqs:us-east-1:123456789012:qfoo"}, + {Name: "MaximumMessageSize", Value: "8192"}, + {Name: "MessageRetentionPeriod", Value: "345600"}, + } + + for i, expected := range expectedResults { + c.Assert(resp.Attributes[i].Name, Equals, expected.Name) + c.Assert(resp.Attributes[i].Value, Equals, expected.Value) + } + + c.Assert(err, IsNil) +} diff --git a/vendor/github.com/goamz/goamz/sqs/suite_test.go b/vendor/github.com/goamz/goamz/sqs/suite_test.go new file mode 100644 index 000000000..8de1bc04f --- /dev/null +++ b/vendor/github.com/goamz/goamz/sqs/suite_test.go @@ -0,0 +1,145 @@ +package sqs + +import ( + "flag" + "fmt" + "net/http" + "net/url" + "os" + "testing" + "time" + + "github.com/goamz/goamz/aws" + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var integration = flag.Bool("i", false, "Enable integration tests") + +type SuiteI struct { + auth aws.Auth +} + +func (s *SuiteI) SetUpSuite(c *C) { + if !*integration { + c.Skip("Integration tests not enabled (-i flag)") + } + auth, err := aws.EnvAuth() + if err != nil { + c.Fatal(err.Error()) + } + s.auth = auth +} + +type HTTPSuite struct{} + +var testServer = NewTestHTTPServer("http://localhost:4455", 5e9) + +func (s *HTTPSuite) SetUpSuite(c *C) { + testServer.Start() +} + +func (s *HTTPSuite) TearDownTest(c *C) { + testServer.FlushRequests() +} + +type TestHTTPServer struct { + URL string + Timeout time.Duration + started bool + request chan *http.Request + response chan *testResponse + pending chan bool +} + +type testResponse struct { + Status int + Headers map[string]string + Body string +} + +func NewTestHTTPServer(url string, timeout time.Duration) *TestHTTPServer { + return &TestHTTPServer{URL: url, Timeout: timeout} +} + +func (s *TestHTTPServer) Start() { + if s.started { + return + } + s.started = true + + s.request = make(chan *http.Request, 64) + s.response = make(chan *testResponse, 64) + s.pending = make(chan bool, 64) + + url, _ := url.Parse(s.URL) + go func() { + err := http.ListenAndServe(url.Host, s) + if err != nil { + panic(err) + } + }() + + s.PrepareResponse(202, nil, "Nothing.") + for { + // Wait for it to be up. + resp, err := http.Get(s.URL) + if err == nil && resp.StatusCode == 202 { + break + } + fmt.Fprintf(os.Stderr, "\nWaiting for fake server to be up... ") + time.Sleep(1e8) + } + fmt.Fprintf(os.Stderr, "done\n\n") + s.WaitRequest() // Consume dummy request. +} + +// FlushRequests discards requests which were not yet consumed by WaitRequest. +func (s *TestHTTPServer) FlushRequests() { + for { + select { + case <-s.request: + default: + return + } + } +} + +func (s *TestHTTPServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.request <- req + var resp *testResponse + select { + case resp = <-s.response: + case <-time.After(s.Timeout): + fmt.Fprintf(os.Stderr, "ERROR: Timeout waiting for test to provide response\n") + resp = &testResponse{500, nil, ""} + } + if resp.Headers != nil { + h := w.Header() + for k, v := range resp.Headers { + h.Set(k, v) + } + } + if resp.Status != 0 { + w.WriteHeader(resp.Status) + } + w.Write([]byte(resp.Body)) +} + +func (s *TestHTTPServer) WaitRequest() *http.Request { + select { + case req := <-s.request: + req.ParseForm() + return req + case <-time.After(s.Timeout): + panic("Timeout waiting for goamz request") + } + panic("unreached") +} + +func (s *TestHTTPServer) PrepareResponse(status int, headers map[string]string, body string) { + s.response <- &testResponse{status, headers, body} +} -- cgit v1.2.3-1-g7c22