summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/goamz/goamz/sqs/sqs.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/goamz/goamz/sqs/sqs.go')
-rw-r--r--vendor/github.com/goamz/goamz/sqs/sqs.go583
1 files changed, 0 insertions, 583 deletions
diff --git a/vendor/github.com/goamz/goamz/sqs/sqs.go b/vendor/github.com/goamz/goamz/sqs/sqs.go
deleted file mode 100644
index 23f1951ab..000000000
--- a/vendor/github.com/goamz/goamz/sqs/sqs.go
+++ /dev/null
@@ -1,583 +0,0 @@
-//
-// gosqs - Go packages to interact with the Amazon SQS Web Services.
-//
-// depends on https://wiki.ubuntu.com/goamz
-//
-//
-// Written by Prudhvi Krishna Surapaneni <me@prudhvi.net>
-// Extended by Fabrizio Milo <mistobaan@gmail.com>
-//
-package sqs
-
-import (
- "encoding/xml"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net/http"
- "net/http/httputil"
- "net/url"
- "strconv"
- "strings"
- "time"
-
- "github.com/goamz/goamz/aws"
-)
-
-const API_VERSION = "2012-11-05"
-
-const debug = false
-
-// The SQS type encapsulates operation with an SQS region.
-type SQS struct {
- aws.Auth
- aws.Region
- private byte // Reserve the right of using private data.
- transport *http.Transport
-}
-
-// NewFrom Create A new SQS Client given an access and secret Key
-// region must be one of "us.east, us.west, eu.west"
-func NewFrom(accessKey, secretKey, region string) (*SQS, error) {
-
- auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey}
- aws_region := aws.USEast
-
- switch region {
- case "us.east", "us.east.1":
- aws_region = aws.USEast
- case "us.west", "us.west.1":
- aws_region = aws.USWest
- case "us.west.2":
- aws_region = aws.USWest2
- case "eu.west":
- aws_region = aws.EUWest
- case "ap.southeast", "ap.southeast.1":
- aws_region = aws.APSoutheast
- case "ap.southeast.2":
- aws_region = aws.APSoutheast2
- case "ap.northeast", "ap.northeast.1":
- aws_region = aws.APNortheast
- case "ap.northeast.2":
- aws_region = aws.APNortheast2
- case "sa.east", "sa.east.1":
- aws_region = aws.SAEast
- case "cn.north", "cn.north.1":
- aws_region = aws.CNNorth
- default:
- return nil, errors.New(fmt.Sprintf("Unknown/Unsupported region %s", region))
- }
-
- aws_sqs := New(auth, aws_region)
- return aws_sqs, nil
-}
-
-// NewFrom Create A new SQS Client from an exisisting aws.Auth
-func New(auth aws.Auth, region aws.Region) *SQS {
- return &SQS{auth, region, 0, nil}
-}
-
-// NewFromTransport Create A new SQS Client that uses a given &http.Transport
-func NewFromTransport(auth aws.Auth, region aws.Region, transport *http.Transport) *SQS {
- return &SQS{auth, region, 0, transport}
-}
-
-// Queue Reference to a Queue
-type Queue struct {
- *SQS
- Url string
-}
-
-type CreateQueueResponse struct {
- QueueUrl string `xml:"CreateQueueResult>QueueUrl"`
- ResponseMetadata ResponseMetadata
-}
-
-type GetQueueUrlResponse struct {
- QueueUrl string `xml:"GetQueueUrlResult>QueueUrl"`
- ResponseMetadata ResponseMetadata
-}
-
-type ListQueuesResponse struct {
- QueueUrl []string `xml:"ListQueuesResult>QueueUrl"`
- ResponseMetadata ResponseMetadata
-}
-
-type DeleteMessageResponse struct {
- ResponseMetadata ResponseMetadata
-}
-
-type DeleteQueueResponse struct {
- ResponseMetadata ResponseMetadata
-}
-
-type PurgeQueueResponse struct {
- ResponseMetadata ResponseMetadata
-}
-
-type SendMessageResponse struct {
- MD5 string `xml:"SendMessageResult>MD5OfMessageBody"`
- MD5OfMessageAttributes string `xml:"SendMessageResult>MD5OfMessageAttributes"`
- Id string `xml:"SendMessageResult>MessageId"`
- ResponseMetadata ResponseMetadata
-}
-
-type ReceiveMessageResponse struct {
- Messages []Message `xml:"ReceiveMessageResult>Message"`
- ResponseMetadata ResponseMetadata
-}
-
-type Message struct {
- MessageId string `xml:"MessageId"`
- Body string `xml:"Body"`
- MD5OfBody string `xml:"MD5OfBody"`
- ReceiptHandle string `xml:"ReceiptHandle"`
- Attribute []Attribute `xml:"Attribute"`
- MessageAttribute []MessageAttribute `xml:"MessageAttribute"`
- MD5OfMessageAttributes string `xml:"MD5OfMessageAttributes"`
-}
-
-type Attribute struct {
- Name string `xml:"Name"`
- Value string `xml:"Value"`
-}
-
-type MessageAttribute struct {
- Name string `xml:"Name"`
- Value MessageAttributeValue `xml:"Value"`
-}
-
-type MessageAttributeValue struct {
- DataType string `xml:"DataType"`
- BinaryValue []byte `xml:"BinaryValue"`
- StringValue string `xml:"StringValue"`
-
- // Not yet implemented (Reserved for future use)
- BinaryListValues [][]byte `xml:"BinaryListValues"`
- StringListValues []string `xml:"StringListValues"`
-}
-
-type ChangeMessageVisibilityResponse struct {
- ResponseMetadata ResponseMetadata
-}
-
-type GetQueueAttributesResponse struct {
- Attributes []Attribute `xml:"GetQueueAttributesResult>Attribute"`
- ResponseMetadata ResponseMetadata
-}
-
-type ResponseMetadata struct {
- RequestId string
- BoxUsage float64
-}
-
-type Error struct {
- StatusCode int
- Code string
- Message string
- RequestId string
-}
-
-func (err *Error) Error() string {
- if err.Code == "" {
- return err.Message
- }
- return fmt.Sprintf("%s (%s)", err.Message, err.Code)
-}
-
-func (err *Error) String() string {
- return err.Message
-}
-
-type xmlErrors struct {
- RequestId string
- Errors []Error `xml:"Errors>Error"`
- Error Error
-}
-
-// CreateQueue create a queue with a specific name
-func (s *SQS) CreateQueue(queueName string) (*Queue, error) {
- return s.CreateQueueWithTimeout(queueName, 30)
-}
-
-// CreateQueue create a queue with a specific name and a timeout
-func (s *SQS) CreateQueueWithTimeout(queueName string, timeout int) (*Queue, error) {
- params := map[string]string{
- "VisibilityTimeout": strconv.Itoa(timeout),
- }
- return s.CreateQueueWithAttributes(queueName, params)
-}
-
-func (s *SQS) CreateQueueWithAttributes(queueName string, attrs map[string]string) (q *Queue, err error) {
- resp, err := s.newQueue(queueName, attrs)
- if err != nil {
- return nil, err
- }
- q = &Queue{s, resp.QueueUrl}
- return
-}
-
-// GetQueue get a reference to the given quename
-func (s *SQS) GetQueue(queueName string) (*Queue, error) {
- var q *Queue
- resp, err := s.getQueueUrl(queueName)
- if err != nil {
- return q, err
- }
- q = &Queue{s, resp.QueueUrl}
- return q, nil
-}
-
-func (s *SQS) QueueFromArn(queueUrl string) (q *Queue) {
- q = &Queue{s, queueUrl}
- return
-}
-
-func (s *SQS) getQueueUrl(queueName string) (resp *GetQueueUrlResponse, err error) {
- resp = &GetQueueUrlResponse{}
- params := makeParams("GetQueueUrl")
- params["QueueName"] = queueName
- err = s.query("", params, resp)
- return resp, err
-}
-
-func (s *SQS) newQueue(queueName string, attrs map[string]string) (resp *CreateQueueResponse, err error) {
- resp = &CreateQueueResponse{}
- params := makeParams("CreateQueue")
- params["QueueName"] = queueName
-
- i := 1
- for k, v := range attrs {
- nameParam := fmt.Sprintf("Attribute.%d.Name", i)
- valParam := fmt.Sprintf("Attribute.%d.Value", i)
- params[nameParam] = k
- params[valParam] = v
- i++
- }
-
- err = s.query("", params, resp)
- return
-}
-
-func (s *SQS) ListQueues(QueueNamePrefix string) (resp *ListQueuesResponse, err error) {
- resp = &ListQueuesResponse{}
- params := makeParams("ListQueues")
-
- if QueueNamePrefix != "" {
- params["QueueNamePrefix"] = QueueNamePrefix
- }
-
- err = s.query("", params, resp)
- return
-}
-
-func (q *Queue) Delete() (resp *DeleteQueueResponse, err error) {
- resp = &DeleteQueueResponse{}
- params := makeParams("DeleteQueue")
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-func (q *Queue) Purge() (resp *PurgeQueueResponse, err error) {
- resp = &PurgeQueueResponse{}
- params := makeParams("PurgeQueue")
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-func (q *Queue) SendMessageWithDelay(MessageBody string, DelaySeconds int64) (resp *SendMessageResponse, err error) {
- resp = &SendMessageResponse{}
- params := makeParams("SendMessage")
-
- params["MessageBody"] = MessageBody
- params["DelaySeconds"] = strconv.Itoa(int(DelaySeconds))
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-func (q *Queue) SendMessage(MessageBody string) (resp *SendMessageResponse, err error) {
- resp = &SendMessageResponse{}
- params := makeParams("SendMessage")
-
- params["MessageBody"] = MessageBody
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-func (q *Queue) SendMessageWithAttributes(MessageBody string, attrs map[string]string) (resp *SendMessageResponse, err error) {
- resp = &SendMessageResponse{}
- params := makeParams("SendMessage")
-
- params["MessageBody"] = MessageBody
-
- i := 1
- for k, v := range attrs {
- nameParam := fmt.Sprintf("MessageAttribute.%d.Name", i)
- valParam := fmt.Sprintf("MessageAttribute.%d.Value.StringValue", i)
- typeParam := fmt.Sprintf("MessageAttribute.%d.Value.DataType", i)
- params[nameParam] = k
- params[valParam] = v
- params[typeParam] = "String"
- i++
- }
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-// ReceiveMessageWithVisibilityTimeout
-func (q *Queue) ReceiveMessageWithVisibilityTimeout(MaxNumberOfMessages, VisibilityTimeoutSec int) (*ReceiveMessageResponse, error) {
- params := map[string]string{
- "MaxNumberOfMessages": strconv.Itoa(MaxNumberOfMessages),
- "VisibilityTimeout": strconv.Itoa(VisibilityTimeoutSec),
- }
- return q.ReceiveMessageWithParameters(params)
-}
-
-// ReceiveMessage
-func (q *Queue) ReceiveMessage(MaxNumberOfMessages int) (*ReceiveMessageResponse, error) {
- params := map[string]string{
- "MaxNumberOfMessages": strconv.Itoa(MaxNumberOfMessages),
- }
- return q.ReceiveMessageWithParameters(params)
-}
-
-func (q *Queue) ReceiveMessageWithParameters(p map[string]string) (resp *ReceiveMessageResponse, err error) {
- resp = &ReceiveMessageResponse{}
- params := makeParams("ReceiveMessage")
- params["AttributeName"] = "All"
- params["MessageAttributeNames"] = "All"
-
- for k, v := range p {
- params[k] = v
- }
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-func (q *Queue) ChangeMessageVisibility(M *Message, VisibilityTimeout int) (resp *ChangeMessageVisibilityResponse, err error) {
- resp = &ChangeMessageVisibilityResponse{}
- params := makeParams("ChangeMessageVisibility")
- params["VisibilityTimeout"] = strconv.Itoa(VisibilityTimeout)
- params["ReceiptHandle"] = M.ReceiptHandle
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-func (q *Queue) GetQueueAttributes(A string) (resp *GetQueueAttributesResponse, err error) {
- resp = &GetQueueAttributesResponse{}
- params := makeParams("GetQueueAttributes")
- params["AttributeName"] = A
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-func (q *Queue) DeleteMessage(M *Message) (resp *DeleteMessageResponse, err error) {
- return q.DeleteMessageUsingReceiptHandle(M.ReceiptHandle)
-}
-
-func (q *Queue) DeleteMessageUsingReceiptHandle(receiptHandle string) (resp *DeleteMessageResponse, err error) {
- resp = &DeleteMessageResponse{}
- params := makeParams("DeleteMessage")
- params["ReceiptHandle"] = receiptHandle
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-type SendMessageBatchResultEntry struct {
- Id string `xml:"Id"`
- MessageId string `xml:"MessageId"`
- MD5OfMessageBody string `xml:"MD5OfMessageBody"`
-}
-
-type SendMessageBatchResponse struct {
- SendMessageBatchResult []SendMessageBatchResultEntry `xml:"SendMessageBatchResult>SendMessageBatchResultEntry"`
- ResponseMetadata ResponseMetadata
-}
-
-/* SendMessageBatch
- */
-func (q *Queue) SendMessageBatch(msgList []Message) (resp *SendMessageBatchResponse, err error) {
- resp = &SendMessageBatchResponse{}
- params := makeParams("SendMessageBatch")
-
- for idx, msg := range msgList {
- count := idx + 1
- params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.Id", count)] = fmt.Sprintf("msg-%d", count)
- params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.MessageBody", count)] = msg.Body
- }
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-/* SendMessageBatchString
- */
-func (q *Queue) SendMessageBatchString(msgList []string) (resp *SendMessageBatchResponse, err error) {
- resp = &SendMessageBatchResponse{}
- params := makeParams("SendMessageBatch")
-
- for idx, msg := range msgList {
- count := idx + 1
- params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.Id", count)] = fmt.Sprintf("msg-%d", count)
- params[fmt.Sprintf("SendMessageBatchRequestEntry.%d.MessageBody", count)] = msg
- }
-
- err = q.SQS.query(q.Url, params, resp)
- return
-}
-
-type DeleteMessageBatchResponse struct {
- DeleteMessageBatchResult []struct {
- Id string
- SenderFault bool
- Code string
- Message string
- } `xml:"DeleteMessageBatchResult>DeleteMessageBatchResultEntry"`
- ResponseMetadata ResponseMetadata
-}
-
-/* DeleteMessageBatch */
-func (q *Queue) DeleteMessageBatch(msgList []Message) (resp *DeleteMessageBatchResponse, err error) {
- resp = &DeleteMessageBatchResponse{}
- params := makeParams("DeleteMessageBatch")
-
- lutMsg := make(map[string]Message)
-
- for idx := range msgList {
- params[fmt.Sprintf("DeleteMessageBatchRequestEntry.%d.Id", idx+1)] = msgList[idx].MessageId
- params[fmt.Sprintf("DeleteMessageBatchRequestEntry.%d.ReceiptHandle", idx+1)] = msgList[idx].ReceiptHandle
-
- lutMsg[string(msgList[idx].MessageId)] = msgList[idx]
- }
-
- err = q.SQS.query(q.Url, params, resp)
-
- messageWithErrors := make([]Message, 0, len(msgList))
-
- for idx := range resp.DeleteMessageBatchResult {
- if resp.DeleteMessageBatchResult[idx].SenderFault {
- msg, ok := lutMsg[resp.DeleteMessageBatchResult[idx].Id]
- if ok {
- messageWithErrors = append(messageWithErrors, msg)
- }
- }
- }
-
- if len(messageWithErrors) > 0 {
- log.Printf("%d Message have not been sent", len(messageWithErrors))
- }
-
- return
-}
-
-func (s *SQS) query(queueUrl string, params map[string]string, resp interface{}) (err error) {
- params["Version"] = API_VERSION
- params["Timestamp"] = time.Now().In(time.UTC).Format(time.RFC3339)
- var url_ *url.URL
-
- switch {
- // fully qualified queueUrl
- case strings.HasPrefix(queueUrl, "http"):
- url_, err = url.Parse(queueUrl)
- // relative queueUrl
- case strings.HasPrefix(queueUrl, "/"):
- url_, err = url.Parse(s.Region.SQSEndpoint + queueUrl)
- // zero-value for queueUrl
- default:
- url_, err = url.Parse(s.Region.SQSEndpoint)
- }
-
- if err != nil {
- return err
- }
-
- if s.Auth.Token() != "" {
- params["SecurityToken"] = s.Auth.Token()
- }
-
- var r *http.Response
-
- var sarray []string
- for k, v := range params {
- sarray = append(sarray, aws.Encode(k)+"="+aws.Encode(v))
- }
-
- req, err := http.NewRequest("GET", fmt.Sprintf("%s?%s", url_, strings.Join(sarray, "&")), nil)
- if err != nil {
- return err
- }
- signer := aws.NewV4Signer(s.Auth, "sqs", s.Region)
- signer.Sign(req)
- var client http.Client
- if s.transport == nil {
- client = http.Client{}
- } else {
- client = http.Client{Transport: s.transport}
- }
- r, err = client.Do(req)
-
- if debug {
- log.Printf("GET %s\n", url_.String())
- }
-
- if err != nil {
- return err
- }
-
- defer r.Body.Close()
-
- if debug {
- dump, _ := httputil.DumpResponse(r, true)
- log.Printf("DUMP:%s\n", string(dump))
- }
-
- if r.StatusCode != 200 {
- return buildError(r)
- }
- err = xml.NewDecoder(r.Body).Decode(resp)
- io.Copy(ioutil.Discard, r.Body)
-
- return err
-}
-
-func buildError(r *http.Response) error {
- errors := xmlErrors{}
- xml.NewDecoder(r.Body).Decode(&errors)
- var err Error
- if len(errors.Errors) > 0 {
- err = errors.Errors[0]
- } else {
- err = errors.Error
- }
- err.RequestId = errors.RequestId
- err.StatusCode = r.StatusCode
- if err.Message == "" {
- err.Message = r.Status
- }
- return &err
-}
-
-func makeParams(action string) map[string]string {
- params := make(map[string]string)
- params["Action"] = action
- return params
-}
-
-func multimap(p map[string]string) url.Values {
- q := make(url.Values, len(p))
- for k, v := range p {
- q[k] = []string{v}
- }
- return q
-}