diff options
Diffstat (limited to 'vendor/github.com/goamz/goamz/dynamodb/query_builder.go')
-rw-r--r-- | vendor/github.com/goamz/goamz/dynamodb/query_builder.go | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/vendor/github.com/goamz/goamz/dynamodb/query_builder.go b/vendor/github.com/goamz/goamz/dynamodb/query_builder.go new file mode 100644 index 000000000..47a90bb1c --- /dev/null +++ b/vendor/github.com/goamz/goamz/dynamodb/query_builder.go @@ -0,0 +1,362 @@ +package dynamodb + +import ( + "encoding/json" + "sort" +) + +type msi map[string]interface{} +type Query struct { + buffer msi +} + +func NewEmptyQuery() *Query { + return &Query{msi{}} +} + +func NewQuery(t *Table) *Query { + q := &Query{msi{}} + q.addTable(t) + return q +} + +// This way of specifing the key is used when doing a Get. +// If rangeKey is "", it is assumed to not want to be used +func (q *Query) AddKey(t *Table, key *Key) { + k := t.Key + keymap := msi{ + k.KeyAttribute.Name: msi{ + k.KeyAttribute.Type: key.HashKey}, + } + if k.HasRange() { + keymap[k.RangeAttribute.Name] = msi{k.RangeAttribute.Type: key.RangeKey} + } + + q.buffer["Key"] = keymap +} + +func keyAttributes(t *Table, key *Key) msi { + k := t.Key + + out := msi{} + out[k.KeyAttribute.Name] = msi{k.KeyAttribute.Type: key.HashKey} + if k.HasRange() { + out[k.RangeAttribute.Name] = msi{k.RangeAttribute.Type: key.RangeKey} + } + return out +} + +func (q *Query) AddAttributesToGet(attributes []string) { + if len(attributes) == 0 { + return + } + + q.buffer["AttributesToGet"] = attributes +} + +func (q *Query) ConsistentRead(c bool) { + if c == true { + q.buffer["ConsistentRead"] = "true" //String "true", not bool true + } +} + +func (q *Query) AddGetRequestItems(tableKeys map[*Table][]Key) { + requestitems := msi{} + for table, keys := range tableKeys { + keyslist := []msi{} + for _, key := range keys { + keyslist = append(keyslist, keyAttributes(table, &key)) + } + requestitems[table.Name] = msi{"Keys": keyslist} + } + q.buffer["RequestItems"] = requestitems +} + +func (q *Query) AddWriteRequestItems(tableItems map[*Table]map[string][][]Attribute) { + b := q.buffer + + b["RequestItems"] = func() msi { + out := msi{} + for table, itemActions := range tableItems { + out[table.Name] = func() interface{} { + out2 := []interface{}{} + + // here breaks an order of array.... + // For now, we iterate over sorted key by action for stable testing + keys := []string{} + for k := range itemActions { + keys = append(keys, k) + } + sort.Strings(keys) + + for ki := range keys { + action := keys[ki] + items := itemActions[action] + for _, attributes := range items { + Item_or_Key := map[bool]string{true: "Item", false: "Key"}[action == "Put"] + out2 = append(out2, msi{action + "Request": msi{Item_or_Key: attributeList(attributes)}}) + } + } + return out2 + }() + } + return out + }() +} + +func (q *Query) AddCreateRequestTable(description TableDescriptionT) { + b := q.buffer + + attDefs := []interface{}{} + for _, attr := range description.AttributeDefinitions { + attDefs = append(attDefs, msi{ + "AttributeName": attr.Name, + "AttributeType": attr.Type, + }) + } + b["AttributeDefinitions"] = attDefs + b["KeySchema"] = description.KeySchema + b["TableName"] = description.TableName + b["ProvisionedThroughput"] = msi{ + "ReadCapacityUnits": int(description.ProvisionedThroughput.ReadCapacityUnits), + "WriteCapacityUnits": int(description.ProvisionedThroughput.WriteCapacityUnits), + } + + if description.StreamSpecification.StreamEnabled { + b["StreamSpecification"] = msi{ + "StreamEnabled": "true", + "StreamViewType": description.StreamSpecification.StreamViewType, + } + } + + localSecondaryIndexes := []interface{}{} + + for _, ind := range description.LocalSecondaryIndexes { + localSecondaryIndexes = append(localSecondaryIndexes, msi{ + "IndexName": ind.IndexName, + "KeySchema": ind.KeySchema, + "Projection": ind.Projection, + }) + } + + globalSecondaryIndexes := []interface{}{} + intmax := func(x, y int64) int64 { + if x > y { + return x + } + return y + } + for _, ind := range description.GlobalSecondaryIndexes { + rec := msi{ + "IndexName": ind.IndexName, + "KeySchema": ind.KeySchema, + "Projection": ind.Projection, + } + // need at least one unit, and since go's max() is float based. + rec["ProvisionedThroughput"] = msi{ + "ReadCapacityUnits": intmax(1, ind.ProvisionedThroughput.ReadCapacityUnits), + "WriteCapacityUnits": intmax(1, ind.ProvisionedThroughput.WriteCapacityUnits), + } + globalSecondaryIndexes = append(globalSecondaryIndexes, rec) + } + + if len(localSecondaryIndexes) > 0 { + b["LocalSecondaryIndexes"] = localSecondaryIndexes + } + + if len(globalSecondaryIndexes) > 0 { + b["GlobalSecondaryIndexes"] = globalSecondaryIndexes + } +} + +func (q *Query) AddDeleteRequestTable(description TableDescriptionT) { + b := q.buffer + b["TableName"] = description.TableName +} + +func (q *Query) AddUpdateRequestTable(description TableDescriptionT) { + b := q.buffer + + attDefs := []interface{}{} + for _, attr := range description.AttributeDefinitions { + attDefs = append(attDefs, msi{ + "AttributeName": attr.Name, + "AttributeType": attr.Type, + }) + } + if len(attDefs) > 0 { + b["AttributeDefinitions"] = attDefs + } + b["TableName"] = description.TableName + b["ProvisionedThroughput"] = msi{ + "ReadCapacityUnits": int(description.ProvisionedThroughput.ReadCapacityUnits), + "WriteCapacityUnits": int(description.ProvisionedThroughput.WriteCapacityUnits), + } + +} + +func (q *Query) AddKeyConditions(comparisons []AttributeComparison) { + q.buffer["KeyConditions"] = buildComparisons(comparisons) +} + +func (q *Query) AddLimit(limit int64) { + q.buffer["Limit"] = limit +} +func (q *Query) AddSelect(value string) { + q.buffer["Select"] = value +} + +func (q *Query) AddIndex(value string) { + q.buffer["IndexName"] = value +} + +func (q *Query) ScanIndexDescending() { + q.buffer["ScanIndexForward"] = "false" +} + +/* + "ScanFilter":{ + "AttributeName1":{"AttributeValueList":[{"S":"AttributeValue"}],"ComparisonOperator":"EQ"} + }, +*/ +func (q *Query) AddScanFilter(comparisons []AttributeComparison) { + q.buffer["ScanFilter"] = buildComparisons(comparisons) +} + +func (q *Query) AddParallelScanConfiguration(segment int, totalSegments int) { + q.buffer["Segment"] = segment + q.buffer["TotalSegments"] = totalSegments +} + +func buildComparisons(comparisons []AttributeComparison) msi { + out := msi{} + + for _, c := range comparisons { + avlist := []interface{}{} + for _, attributeValue := range c.AttributeValueList { + avlist = append(avlist, msi{attributeValue.Type: attributeValue.Value}) + } + out[c.AttributeName] = msi{ + "AttributeValueList": avlist, + "ComparisonOperator": c.ComparisonOperator, + } + } + + return out +} + +// The primary key must be included in attributes. +func (q *Query) AddItem(attributes []Attribute) { + q.buffer["Item"] = attributeList(attributes) +} + +func (q *Query) AddUpdates(attributes []Attribute, action string) { + updates := msi{} + for _, a := range attributes { + au := msi{ + "Value": msi{ + a.Type: map[bool]interface{}{true: a.SetValues, false: a.Value}[a.SetType()], + }, + "Action": action, + } + // Delete 'Value' from AttributeUpdates if Type is not Set + if action == "DELETE" && !a.SetType() { + delete(au, "Value") + } + updates[a.Name] = au + } + + q.buffer["AttributeUpdates"] = updates +} + +func (q *Query) AddExpected(attributes []Attribute) { + expected := msi{} + for _, a := range attributes { + value := msi{} + if a.Exists != "" { + value["Exists"] = a.Exists + } + // If set Exists to false, we must remove Value + if value["Exists"] != "false" { + value["Value"] = msi{a.Type: map[bool]interface{}{true: a.SetValues, false: a.Value}[a.SetType()]} + } + expected[a.Name] = value + } + q.buffer["Expected"] = expected +} + +// Add the ReturnValues parameter, used in UpdateItem queries. +func (q *Query) AddReturnValues(returnValues ReturnValues) { + q.buffer["ReturnValues"] = string(returnValues) +} + +// Add the UpdateExpression parameter, used in UpdateItem queries. +func (q *Query) AddUpdateExpression(expression string) { + q.buffer["UpdateExpression"] = expression +} + +// Add the ConditionExpression parameter, used in UpdateItem queries. +func (q *Query) AddConditionExpression(expression string) { + q.buffer["ConditionExpression"] = expression +} + +func (q *Query) AddExpressionAttributes(attributes []Attribute) { + existing, ok := q.buffer["ExpressionAttributes"].(msi) + if !ok { + existing = msi{} + q.buffer["ExpressionAttributes"] = existing + } + for key, val := range attributeList(attributes) { + existing[key] = val + } +} + +func (q *Query) AddExclusiveStartStreamArn(arn string) { + q.buffer["ExclusiveStartStreamArn"] = arn +} + +func (q *Query) AddStreamArn(arn string) { + q.buffer["StreamArn"] = arn +} + +func (q *Query) AddExclusiveStartShardId(shardId string) { + q.buffer["ExclusiveStartShardId"] = shardId +} + +func (q *Query) AddShardId(shardId string) { + q.buffer["ShardId"] = shardId +} + +func (q *Query) AddShardIteratorType(shardIteratorType string) { + q.buffer["ShardIteratorType"] = shardIteratorType +} + +func (q *Query) AddSequenceNumber(sequenceNumber string) { + q.buffer["SequenceNumber"] = sequenceNumber +} + +func (q *Query) AddShardIterator(shardIterator string) { + q.buffer["ShardIterator"] = shardIterator +} + +func attributeList(attributes []Attribute) msi { + b := msi{} + for _, a := range attributes { + //UGH!! (I miss the query operator) + b[a.Name] = msi{a.Type: map[bool]interface{}{true: a.SetValues, false: a.Value}[a.SetType()]} + } + return b +} + +func (q *Query) addTable(t *Table) { + q.addTableByName(t.Name) +} + +func (q *Query) addTableByName(tableName string) { + q.buffer["TableName"] = tableName +} + +func (q *Query) String() string { + bytes, _ := json.Marshal(q.buffer) + return string(bytes) +} |