summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/goamz/goamz/dynamodb/query_builder.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/goamz/goamz/dynamodb/query_builder.go')
-rw-r--r--vendor/github.com/goamz/goamz/dynamodb/query_builder.go362
1 files changed, 362 insertions, 0 deletions
diff --git a/vendor/github.com/goamz/goamz/dynamodb/query_builder.go b/vendor/github.com/goamz/goamz/dynamodb/query_builder.go
new file mode 100644
index 000000000..47a90bb1c
--- /dev/null
+++ b/vendor/github.com/goamz/goamz/dynamodb/query_builder.go
@@ -0,0 +1,362 @@
+package dynamodb
+
+import (
+ "encoding/json"
+ "sort"
+)
+
+type msi map[string]interface{}
+type Query struct {
+ buffer msi
+}
+
+func NewEmptyQuery() *Query {
+ return &Query{msi{}}
+}
+
+func NewQuery(t *Table) *Query {
+ q := &Query{msi{}}
+ q.addTable(t)
+ return q
+}
+
+// This way of specifing the key is used when doing a Get.
+// If rangeKey is "", it is assumed to not want to be used
+func (q *Query) AddKey(t *Table, key *Key) {
+ k := t.Key
+ keymap := msi{
+ k.KeyAttribute.Name: msi{
+ k.KeyAttribute.Type: key.HashKey},
+ }
+ if k.HasRange() {
+ keymap[k.RangeAttribute.Name] = msi{k.RangeAttribute.Type: key.RangeKey}
+ }
+
+ q.buffer["Key"] = keymap
+}
+
+func keyAttributes(t *Table, key *Key) msi {
+ k := t.Key
+
+ out := msi{}
+ out[k.KeyAttribute.Name] = msi{k.KeyAttribute.Type: key.HashKey}
+ if k.HasRange() {
+ out[k.RangeAttribute.Name] = msi{k.RangeAttribute.Type: key.RangeKey}
+ }
+ return out
+}
+
+func (q *Query) AddAttributesToGet(attributes []string) {
+ if len(attributes) == 0 {
+ return
+ }
+
+ q.buffer["AttributesToGet"] = attributes
+}
+
+func (q *Query) ConsistentRead(c bool) {
+ if c == true {
+ q.buffer["ConsistentRead"] = "true" //String "true", not bool true
+ }
+}
+
+func (q *Query) AddGetRequestItems(tableKeys map[*Table][]Key) {
+ requestitems := msi{}
+ for table, keys := range tableKeys {
+ keyslist := []msi{}
+ for _, key := range keys {
+ keyslist = append(keyslist, keyAttributes(table, &key))
+ }
+ requestitems[table.Name] = msi{"Keys": keyslist}
+ }
+ q.buffer["RequestItems"] = requestitems
+}
+
+func (q *Query) AddWriteRequestItems(tableItems map[*Table]map[string][][]Attribute) {
+ b := q.buffer
+
+ b["RequestItems"] = func() msi {
+ out := msi{}
+ for table, itemActions := range tableItems {
+ out[table.Name] = func() interface{} {
+ out2 := []interface{}{}
+
+ // here breaks an order of array....
+ // For now, we iterate over sorted key by action for stable testing
+ keys := []string{}
+ for k := range itemActions {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+
+ for ki := range keys {
+ action := keys[ki]
+ items := itemActions[action]
+ for _, attributes := range items {
+ Item_or_Key := map[bool]string{true: "Item", false: "Key"}[action == "Put"]
+ out2 = append(out2, msi{action + "Request": msi{Item_or_Key: attributeList(attributes)}})
+ }
+ }
+ return out2
+ }()
+ }
+ return out
+ }()
+}
+
+func (q *Query) AddCreateRequestTable(description TableDescriptionT) {
+ b := q.buffer
+
+ attDefs := []interface{}{}
+ for _, attr := range description.AttributeDefinitions {
+ attDefs = append(attDefs, msi{
+ "AttributeName": attr.Name,
+ "AttributeType": attr.Type,
+ })
+ }
+ b["AttributeDefinitions"] = attDefs
+ b["KeySchema"] = description.KeySchema
+ b["TableName"] = description.TableName
+ b["ProvisionedThroughput"] = msi{
+ "ReadCapacityUnits": int(description.ProvisionedThroughput.ReadCapacityUnits),
+ "WriteCapacityUnits": int(description.ProvisionedThroughput.WriteCapacityUnits),
+ }
+
+ if description.StreamSpecification.StreamEnabled {
+ b["StreamSpecification"] = msi{
+ "StreamEnabled": "true",
+ "StreamViewType": description.StreamSpecification.StreamViewType,
+ }
+ }
+
+ localSecondaryIndexes := []interface{}{}
+
+ for _, ind := range description.LocalSecondaryIndexes {
+ localSecondaryIndexes = append(localSecondaryIndexes, msi{
+ "IndexName": ind.IndexName,
+ "KeySchema": ind.KeySchema,
+ "Projection": ind.Projection,
+ })
+ }
+
+ globalSecondaryIndexes := []interface{}{}
+ intmax := func(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+ }
+ for _, ind := range description.GlobalSecondaryIndexes {
+ rec := msi{
+ "IndexName": ind.IndexName,
+ "KeySchema": ind.KeySchema,
+ "Projection": ind.Projection,
+ }
+ // need at least one unit, and since go's max() is float based.
+ rec["ProvisionedThroughput"] = msi{
+ "ReadCapacityUnits": intmax(1, ind.ProvisionedThroughput.ReadCapacityUnits),
+ "WriteCapacityUnits": intmax(1, ind.ProvisionedThroughput.WriteCapacityUnits),
+ }
+ globalSecondaryIndexes = append(globalSecondaryIndexes, rec)
+ }
+
+ if len(localSecondaryIndexes) > 0 {
+ b["LocalSecondaryIndexes"] = localSecondaryIndexes
+ }
+
+ if len(globalSecondaryIndexes) > 0 {
+ b["GlobalSecondaryIndexes"] = globalSecondaryIndexes
+ }
+}
+
+func (q *Query) AddDeleteRequestTable(description TableDescriptionT) {
+ b := q.buffer
+ b["TableName"] = description.TableName
+}
+
+func (q *Query) AddUpdateRequestTable(description TableDescriptionT) {
+ b := q.buffer
+
+ attDefs := []interface{}{}
+ for _, attr := range description.AttributeDefinitions {
+ attDefs = append(attDefs, msi{
+ "AttributeName": attr.Name,
+ "AttributeType": attr.Type,
+ })
+ }
+ if len(attDefs) > 0 {
+ b["AttributeDefinitions"] = attDefs
+ }
+ b["TableName"] = description.TableName
+ b["ProvisionedThroughput"] = msi{
+ "ReadCapacityUnits": int(description.ProvisionedThroughput.ReadCapacityUnits),
+ "WriteCapacityUnits": int(description.ProvisionedThroughput.WriteCapacityUnits),
+ }
+
+}
+
+func (q *Query) AddKeyConditions(comparisons []AttributeComparison) {
+ q.buffer["KeyConditions"] = buildComparisons(comparisons)
+}
+
+func (q *Query) AddLimit(limit int64) {
+ q.buffer["Limit"] = limit
+}
+func (q *Query) AddSelect(value string) {
+ q.buffer["Select"] = value
+}
+
+func (q *Query) AddIndex(value string) {
+ q.buffer["IndexName"] = value
+}
+
+func (q *Query) ScanIndexDescending() {
+ q.buffer["ScanIndexForward"] = "false"
+}
+
+/*
+ "ScanFilter":{
+ "AttributeName1":{"AttributeValueList":[{"S":"AttributeValue"}],"ComparisonOperator":"EQ"}
+ },
+*/
+func (q *Query) AddScanFilter(comparisons []AttributeComparison) {
+ q.buffer["ScanFilter"] = buildComparisons(comparisons)
+}
+
+func (q *Query) AddParallelScanConfiguration(segment int, totalSegments int) {
+ q.buffer["Segment"] = segment
+ q.buffer["TotalSegments"] = totalSegments
+}
+
+func buildComparisons(comparisons []AttributeComparison) msi {
+ out := msi{}
+
+ for _, c := range comparisons {
+ avlist := []interface{}{}
+ for _, attributeValue := range c.AttributeValueList {
+ avlist = append(avlist, msi{attributeValue.Type: attributeValue.Value})
+ }
+ out[c.AttributeName] = msi{
+ "AttributeValueList": avlist,
+ "ComparisonOperator": c.ComparisonOperator,
+ }
+ }
+
+ return out
+}
+
+// The primary key must be included in attributes.
+func (q *Query) AddItem(attributes []Attribute) {
+ q.buffer["Item"] = attributeList(attributes)
+}
+
+func (q *Query) AddUpdates(attributes []Attribute, action string) {
+ updates := msi{}
+ for _, a := range attributes {
+ au := msi{
+ "Value": msi{
+ a.Type: map[bool]interface{}{true: a.SetValues, false: a.Value}[a.SetType()],
+ },
+ "Action": action,
+ }
+ // Delete 'Value' from AttributeUpdates if Type is not Set
+ if action == "DELETE" && !a.SetType() {
+ delete(au, "Value")
+ }
+ updates[a.Name] = au
+ }
+
+ q.buffer["AttributeUpdates"] = updates
+}
+
+func (q *Query) AddExpected(attributes []Attribute) {
+ expected := msi{}
+ for _, a := range attributes {
+ value := msi{}
+ if a.Exists != "" {
+ value["Exists"] = a.Exists
+ }
+ // If set Exists to false, we must remove Value
+ if value["Exists"] != "false" {
+ value["Value"] = msi{a.Type: map[bool]interface{}{true: a.SetValues, false: a.Value}[a.SetType()]}
+ }
+ expected[a.Name] = value
+ }
+ q.buffer["Expected"] = expected
+}
+
+// Add the ReturnValues parameter, used in UpdateItem queries.
+func (q *Query) AddReturnValues(returnValues ReturnValues) {
+ q.buffer["ReturnValues"] = string(returnValues)
+}
+
+// Add the UpdateExpression parameter, used in UpdateItem queries.
+func (q *Query) AddUpdateExpression(expression string) {
+ q.buffer["UpdateExpression"] = expression
+}
+
+// Add the ConditionExpression parameter, used in UpdateItem queries.
+func (q *Query) AddConditionExpression(expression string) {
+ q.buffer["ConditionExpression"] = expression
+}
+
+func (q *Query) AddExpressionAttributes(attributes []Attribute) {
+ existing, ok := q.buffer["ExpressionAttributes"].(msi)
+ if !ok {
+ existing = msi{}
+ q.buffer["ExpressionAttributes"] = existing
+ }
+ for key, val := range attributeList(attributes) {
+ existing[key] = val
+ }
+}
+
+func (q *Query) AddExclusiveStartStreamArn(arn string) {
+ q.buffer["ExclusiveStartStreamArn"] = arn
+}
+
+func (q *Query) AddStreamArn(arn string) {
+ q.buffer["StreamArn"] = arn
+}
+
+func (q *Query) AddExclusiveStartShardId(shardId string) {
+ q.buffer["ExclusiveStartShardId"] = shardId
+}
+
+func (q *Query) AddShardId(shardId string) {
+ q.buffer["ShardId"] = shardId
+}
+
+func (q *Query) AddShardIteratorType(shardIteratorType string) {
+ q.buffer["ShardIteratorType"] = shardIteratorType
+}
+
+func (q *Query) AddSequenceNumber(sequenceNumber string) {
+ q.buffer["SequenceNumber"] = sequenceNumber
+}
+
+func (q *Query) AddShardIterator(shardIterator string) {
+ q.buffer["ShardIterator"] = shardIterator
+}
+
+func attributeList(attributes []Attribute) msi {
+ b := msi{}
+ for _, a := range attributes {
+ //UGH!! (I miss the query operator)
+ b[a.Name] = msi{a.Type: map[bool]interface{}{true: a.SetValues, false: a.Value}[a.SetType()]}
+ }
+ return b
+}
+
+func (q *Query) addTable(t *Table) {
+ q.addTableByName(t.Name)
+}
+
+func (q *Query) addTableByName(tableName string) {
+ q.buffer["TableName"] = tableName
+}
+
+func (q *Query) String() string {
+ bytes, _ := json.Marshal(q.buffer)
+ return string(bytes)
+}