summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/goamz/goamz/dynamodb/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/goamz/goamz/dynamodb/stream.go')
-rw-r--r--vendor/github.com/goamz/goamz/dynamodb/stream.go307
1 files changed, 307 insertions, 0 deletions
diff --git a/vendor/github.com/goamz/goamz/dynamodb/stream.go b/vendor/github.com/goamz/goamz/dynamodb/stream.go
new file mode 100644
index 000000000..57f3a145f
--- /dev/null
+++ b/vendor/github.com/goamz/goamz/dynamodb/stream.go
@@ -0,0 +1,307 @@
+package dynamodb
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "reflect"
+
+ simplejson "github.com/bitly/go-simplejson"
+)
+
+type Stream struct {
+ Server *Server
+ Arn string
+}
+
+type StreamListItemT struct {
+ StreamArn string
+ StreamLabel string
+ TableName string
+}
+
+type SequenceNumberRangeT struct {
+ EndingSequenceNumber string
+ StartingSequenceNumber string
+}
+
+type ShardT struct {
+ ParentShardId string
+ SequenceNumberRange SequenceNumberRangeT
+ ShardId string
+}
+
+type StreamDescriptionT struct {
+ CreationDateTime float64
+ KeySchema []KeySchemaT
+ LastEvaluatedShardId string
+ Shards []ShardT
+ StreamArn string
+ StreamLabel string
+ StreamStatus string
+ StreamViewType string
+ TableName string
+}
+
+type RecordT struct {
+ AwsRegion string
+ EventID string
+ EventName string
+ EventSource string
+ EventVersion string
+ StreamRecord *StreamRecordT
+}
+
+type StreamRecordT struct {
+ Keys map[string]*Attribute
+ NewImage map[string]*Attribute
+ OldImage map[string]*Attribute
+ SequenceNumber string
+ StreamViewType string
+ SizeBytes int64
+}
+
+type listStreamsResponse struct {
+ Streams []StreamListItemT
+}
+
+type describeStreamResponse struct {
+ StreamDescription StreamDescriptionT
+}
+
+var ErrNoRecords = errors.New("No records")
+
+func (s *Server) ListStreams(startArn string) ([]StreamListItemT, error) {
+ return s.LimitedListTableStreams("", startArn, 0)
+}
+
+func (s *Server) LimitedListStreams(startArn string, limit int64) ([]StreamListItemT, error) {
+ return s.LimitedListTableStreams("", startArn, limit)
+}
+
+func (s *Server) ListTableStreams(table, startArn string) ([]StreamListItemT, error) {
+ return s.LimitedListTableStreams(table, startArn, 0)
+}
+
+func (s *Server) LimitedListTableStreams(table, startArn string, limit int64) ([]StreamListItemT, error) {
+ query := NewEmptyQuery()
+
+ if len(table) != 0 {
+ query.addTableByName(table)
+ }
+
+ if len(startArn) != 0 {
+ query.AddExclusiveStartStreamArn(startArn)
+ }
+
+ if limit > 0 {
+ query.AddLimit(limit)
+ }
+
+ jsonResponse, err := s.queryServer(streamsTarget("ListStreams"), query)
+ if err != nil {
+ return nil, err
+ }
+
+ var r listStreamsResponse
+ err = json.Unmarshal(jsonResponse, &r)
+ if err != nil {
+ return nil, err
+ }
+
+ return r.Streams, nil
+}
+
+func (s *Server) DescribeStream(arn, startShardId string) (*StreamDescriptionT, error) {
+ return s.LimitedDescribeStream(arn, startShardId, 0)
+}
+
+func (s *Server) LimitedDescribeStream(arn, startShardId string, limit int64) (*StreamDescriptionT, error) {
+ query := NewEmptyQuery()
+ query.AddStreamArn(arn)
+
+ if len(startShardId) != 0 {
+ query.AddExclusiveStartShardId(startShardId)
+ }
+
+ if limit > 0 {
+ query.AddLimit(limit)
+ }
+
+ jsonResponse, err := s.queryServer(streamsTarget("DescribeStream"), query)
+ if err != nil {
+ return nil, err
+ }
+
+ var r describeStreamResponse
+ err = json.Unmarshal(jsonResponse, &r)
+ if err != nil {
+ return nil, err
+ }
+
+ return &r.StreamDescription, nil
+}
+
+func (s *Server) NewStream(streamArn string) *Stream {
+ return &Stream{s, streamArn}
+}
+
+func (s *Stream) DescribeStream(startShardId string) (*StreamDescriptionT, error) {
+ return s.Server.DescribeStream(s.Arn, startShardId)
+}
+
+func (s *Stream) LimitedDescribeStream(startShardId string, limit int64) (*StreamDescriptionT, error) {
+ return s.Server.LimitedDescribeStream(s.Arn, startShardId, limit)
+}
+
+func (s *Server) GetShardIterator(streamArn, shardId, shardIteratorType, sequenceNumber string) (string, error) {
+ query := NewEmptyQuery()
+ query.AddStreamArn(streamArn)
+ query.AddShardId(shardId)
+ query.AddShardIteratorType(shardIteratorType)
+
+ if len(sequenceNumber) != 0 {
+ query.AddSequenceNumber(sequenceNumber)
+ }
+
+ jsonResponse, err := s.queryServer(streamsTarget("GetShardIterator"), query)
+
+ if err != nil {
+ return "unknown", err
+ }
+
+ json, err := simplejson.NewJson(jsonResponse)
+
+ if err != nil {
+ return "unknown", err
+ }
+
+ return json.Get("ShardIterator").MustString(), nil
+}
+
+func (s *Stream) GetShardIterator(shardId, shardIteratorType, sequenceNumber string) (string, error) {
+ return s.Server.GetShardIterator(s.Arn, shardId, shardIteratorType, sequenceNumber)
+}
+
+func (s *Server) GetRecords(shardIterator string) (string, []*RecordT, error) {
+ return s.LimitedGetRecords(shardIterator, 0)
+}
+
+func (s *Server) LimitedGetRecords(shardIterator string, limit int64) (string, []*RecordT, error) {
+ query := NewEmptyQuery()
+ query.AddShardIterator(shardIterator)
+
+ if limit > 0 {
+ query.AddLimit(limit)
+ }
+
+ jsonResponse, err := s.queryServer(streamsTarget("GetRecords"), query)
+ if err != nil {
+ return "", nil, err
+ }
+
+ jsonParsed, err := simplejson.NewJson(jsonResponse)
+ if err != nil {
+ return "", nil, err
+ }
+
+ nextShardIt := ""
+ nextShardItJson, ok := jsonParsed.CheckGet("NextShardIterator")
+ if ok {
+ nextShardIt, err = nextShardItJson.String()
+ if err != nil {
+ message := fmt.Sprintf("Unexpected response %s", jsonResponse)
+ return "", nil, errors.New(message)
+ }
+ }
+
+ recordsJson, ok := jsonParsed.CheckGet("Records")
+ if !ok {
+ return nextShardIt, nil, ErrNoRecords
+ }
+
+ recordsArray, err := recordsJson.Array()
+ if err != nil {
+ message := fmt.Sprintf("Unexpected response %s", jsonResponse)
+ return nextShardIt, nil, errors.New(message)
+ }
+
+ var records []*RecordT
+ for _, record := range recordsArray {
+ if recordMap, ok := record.(map[string]interface{}); ok {
+ r := parseRecord(recordMap)
+ records = append(records, r)
+ }
+ }
+
+ return nextShardIt, records, nil
+}
+
+func (s *Stream) GetRecords(shardIterator string) (string, []*RecordT, error) {
+ return s.Server.GetRecords(shardIterator)
+}
+
+func (s *Stream) LimitedGetRecords(shardIterator string, limit int64) (string, []*RecordT, error) {
+ return s.Server.LimitedGetRecords(shardIterator, limit)
+}
+
+func parseRecord(r map[string]interface{}) *RecordT {
+ record := RecordT{}
+ rValue := reflect.ValueOf(&record)
+
+ keys := []string{"awsRegion", "eventID", "eventName", "eventSource", "eventVersion"}
+ for i, key := range keys {
+ if value, ok := r[key]; ok {
+ if valueStr, ok := value.(string); ok {
+ rValue.Elem().Field(i).SetString(valueStr)
+ }
+ }
+ }
+
+ if streamRecord, ok := r["dynamodb"]; ok {
+ if streamRecordMap, ok := streamRecord.(map[string]interface{}); ok {
+ record.StreamRecord = parseStreamRecord(streamRecordMap)
+ }
+ }
+
+ return &record
+}
+
+func parseStreamRecord(s map[string]interface{}) *StreamRecordT {
+ sr := StreamRecordT{}
+ rValue := reflect.ValueOf(&sr)
+
+ attrKeys := []string{"Keys", "NewImage", "OldImage"}
+ numAttrKeys := len(attrKeys)
+ for i, key := range attrKeys {
+ if value, ok := s[key]; ok {
+ if valueMap, ok := value.(map[string]interface{}); ok {
+ attrs := parseAttributes(valueMap)
+ rValue.Elem().Field(i).Set(reflect.ValueOf(attrs))
+ }
+ }
+ }
+
+ strKeys := []string{"SequenceNumber", "StreamViewType"}
+ numStrKeys := len(strKeys)
+ for i, key := range strKeys {
+ if value, ok := s[key]; ok {
+ if valueStr, ok := value.(string); ok {
+ rValue.Elem().Field(i + numAttrKeys).SetString(valueStr)
+ }
+ }
+ }
+
+ intKeys := []string{"SizeBytes"}
+ for i, key := range intKeys {
+ if value, ok := s[key]; ok {
+ if valueNumber, ok := value.(json.Number); ok {
+ if valueInt, err := valueNumber.Int64(); err == nil {
+ rValue.Elem().Field(i + numAttrKeys + numStrKeys).SetInt(valueInt)
+ }
+ }
+ }
+ }
+
+ return &sr
+}