summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/api-get-object.go
diff options
context:
space:
mode:
authorHarshavardhana <harsha@minio.io>2016-10-26 05:21:07 -0700
committerChristopher Speller <crspeller@gmail.com>2016-10-26 08:21:07 -0400
commitf02620b291b988848392c455a7719699f6b5c00f (patch)
tree695e07607e86b000b9fe78e77df7f33673f1a755 /vendor/github.com/minio/minio-go/api-get-object.go
parentb354d25d3731b53613489d95cfa4c946cf8e0888 (diff)
downloadchat-f02620b291b988848392c455a7719699f6b5c00f.tar.gz
chat-f02620b291b988848392c455a7719699f6b5c00f.tar.bz2
chat-f02620b291b988848392c455a7719699f6b5c00f.zip
Moving away from goamz to use minio-go instead. (#4193)
minio-go does fully managed way of handling S3 API requests - Automatic bucket location management across all s3 regions. - Transparently upload large files in multipart if file 64MB or larger. - Right GetObject() API provides compatibility with io.ReadWriteSeeker interface. - Various other APIs including bulk deletes, server side object copy, bucket policies and bucket notifications. Fixes #4182
Diffstat (limited to 'vendor/github.com/minio/minio-go/api-get-object.go')
-rw-r--r--vendor/github.com/minio/minio-go/api-get-object.go643
1 files changed, 643 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/api-get-object.go b/vendor/github.com/minio/minio-go/api-get-object.go
new file mode 100644
index 000000000..1f0dabb05
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/api-get-object.go
@@ -0,0 +1,643 @@
+/*
+ * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package minio
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+)
+
+// GetObject - returns an seekable, readable object.
+func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
+ // Input validation.
+ if err := isValidBucketName(bucketName); err != nil {
+ return nil, err
+ }
+ if err := isValidObjectName(objectName); err != nil {
+ return nil, err
+ }
+
+ var httpReader io.ReadCloser
+ var objectInfo ObjectInfo
+ var err error
+ // Create request channel.
+ reqCh := make(chan getRequest)
+ // Create response channel.
+ resCh := make(chan getResponse)
+ // Create done channel.
+ doneCh := make(chan struct{})
+
+ // This routine feeds partial object data as and when the caller reads.
+ go func() {
+ defer close(reqCh)
+ defer close(resCh)
+
+ // Loop through the incoming control messages and read data.
+ for {
+ select {
+ // When the done channel is closed exit our routine.
+ case <-doneCh:
+ // Close the http response body before returning.
+ // This ends the connection with the server.
+ if httpReader != nil {
+ httpReader.Close()
+ }
+ return
+
+ // Gather incoming request.
+ case req := <-reqCh:
+ // If this is the first request we may not need to do a getObject request yet.
+ if req.isFirstReq {
+ // First request is a Read/ReadAt.
+ if req.isReadOp {
+ // Differentiate between wanting the whole object and just a range.
+ if req.isReadAt {
+ // If this is a ReadAt request only get the specified range.
+ // Range is set with respect to the offset and length of the buffer requested.
+ // Do not set objectInfo from the first readAt request because it will not get
+ // the whole object.
+ httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
+ } else {
+ // First request is a Read request.
+ httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
+ }
+ if err != nil {
+ resCh <- getResponse{
+ Error: err,
+ }
+ return
+ }
+ // Read at least firstReq.Buffer bytes, if not we have
+ // reached our EOF.
+ size, err := io.ReadFull(httpReader, req.Buffer)
+ if err == io.ErrUnexpectedEOF {
+ // If an EOF happens after reading some but not
+ // all the bytes ReadFull returns ErrUnexpectedEOF
+ err = io.EOF
+ }
+ // Send back the first response.
+ resCh <- getResponse{
+ objectInfo: objectInfo,
+ Size: int(size),
+ Error: err,
+ didRead: true,
+ }
+ } else {
+ // First request is a Stat or Seek call.
+ // Only need to run a StatObject until an actual Read or ReadAt request comes through.
+ objectInfo, err = c.StatObject(bucketName, objectName)
+ if err != nil {
+ resCh <- getResponse{
+ Error: err,
+ }
+ // Exit the go-routine.
+ return
+ }
+ // Send back the first response.
+ resCh <- getResponse{
+ objectInfo: objectInfo,
+ }
+ }
+ } else if req.settingObjectInfo { // Request is just to get objectInfo.
+ objectInfo, err := c.StatObject(bucketName, objectName)
+ if err != nil {
+ resCh <- getResponse{
+ Error: err,
+ }
+ // Exit the goroutine.
+ return
+ }
+ // Send back the objectInfo.
+ resCh <- getResponse{
+ objectInfo: objectInfo,
+ }
+ } else {
+ // Offset changes fetch the new object at an Offset.
+ // Because the httpReader may not be set by the first
+ // request if it was a stat or seek it must be checked
+ // if the object has been read or not to only initialize
+ // new ones when they haven't been already.
+ // All readAt requests are new requests.
+ if req.DidOffsetChange || !req.beenRead {
+ if httpReader != nil {
+ // Close previously opened http reader.
+ httpReader.Close()
+ }
+ // If this request is a readAt only get the specified range.
+ if req.isReadAt {
+ // Range is set with respect to the offset and length of the buffer requested.
+ httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
+ } else {
+ httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
+ }
+ if err != nil {
+ resCh <- getResponse{
+ Error: err,
+ }
+ return
+ }
+ }
+
+ // Read at least req.Buffer bytes, if not we have
+ // reached our EOF.
+ size, err := io.ReadFull(httpReader, req.Buffer)
+ if err == io.ErrUnexpectedEOF {
+ // If an EOF happens after reading some but not
+ // all the bytes ReadFull returns ErrUnexpectedEOF
+ err = io.EOF
+ }
+ // Reply back how much was read.
+ resCh <- getResponse{
+ Size: int(size),
+ Error: err,
+ didRead: true,
+ objectInfo: objectInfo,
+ }
+ }
+ }
+ }
+ }()
+
+ // Create a newObject through the information sent back by reqCh.
+ return newObject(reqCh, resCh, doneCh), nil
+}
+
+// get request message container to communicate with internal
+// go-routine.
+type getRequest struct {
+ Buffer []byte
+ Offset int64 // readAt offset.
+ DidOffsetChange bool // Tracks the offset changes for Seek requests.
+ beenRead bool // Determines if this is the first time an object is being read.
+ isReadAt bool // Determines if this request is a request to a specific range
+ isReadOp bool // Determines if this request is a Read or Read/At request.
+ isFirstReq bool // Determines if this request is the first time an object is being accessed.
+ settingObjectInfo bool // Determines if this request is to set the objectInfo of an object.
+}
+
+// get response message container to reply back for the request.
+type getResponse struct {
+ Size int
+ Error error
+ didRead bool // Lets subsequent calls know whether or not httpReader has been initiated.
+ objectInfo ObjectInfo // Used for the first request.
+}
+
+// Object represents an open object. It implements Read, ReadAt,
+// Seeker, Close for a HTTP stream.
+type Object struct {
+ // Mutex.
+ mutex *sync.Mutex
+
+ // User allocated and defined.
+ reqCh chan<- getRequest
+ resCh <-chan getResponse
+ doneCh chan<- struct{}
+ prevOffset int64
+ currOffset int64
+ objectInfo ObjectInfo
+
+ // Keeps track of closed call.
+ isClosed bool
+
+ // Keeps track of if this is the first call.
+ isStarted bool
+
+ // Previous error saved for future calls.
+ prevErr error
+
+ // Keeps track of if this object has been read yet.
+ beenRead bool
+
+ // Keeps track of if objectInfo has been set yet.
+ objectInfoSet bool
+}
+
+// doGetRequest - sends and blocks on the firstReqCh and reqCh of an object.
+// Returns back the size of the buffer read, if anything was read, as well
+// as any error encountered. For all first requests sent on the object
+// it is also responsible for sending back the objectInfo.
+func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
+ o.reqCh <- request
+ response := <-o.resCh
+ // This was the first request.
+ if !o.isStarted {
+ // The object has been operated on.
+ o.isStarted = true
+ }
+ // Set the objectInfo if the request was not readAt
+ // and it hasn't been set before.
+ if !o.objectInfoSet && !request.isReadAt {
+ o.objectInfo = response.objectInfo
+ o.objectInfoSet = true
+ }
+ // Set beenRead only if it has not been set before.
+ if !o.beenRead {
+ o.beenRead = response.didRead
+ }
+ // Return any error to the top level.
+ if response.Error != nil {
+ return response, response.Error
+ }
+ return response, nil
+}
+
+// setOffset - handles the setting of offsets for
+// Read/ReadAt/Seek requests.
+func (o *Object) setOffset(bytesRead int64) error {
+ // Update the currentOffset.
+ o.currOffset += bytesRead
+ // Save the current offset as previous offset.
+ o.prevOffset = o.currOffset
+
+ if o.currOffset >= o.objectInfo.Size {
+ return io.EOF
+ }
+ return nil
+}
+
+// Read reads up to len(p) bytes into p. It returns the number of
+// bytes read (0 <= n <= len(p)) and any error encountered. Returns
+// io.EOF upon end of file.
+func (o *Object) Read(b []byte) (n int, err error) {
+ if o == nil {
+ return 0, ErrInvalidArgument("Object is nil")
+ }
+
+ // Locking.
+ o.mutex.Lock()
+ defer o.mutex.Unlock()
+
+ // prevErr is previous error saved from previous operation.
+ if o.prevErr != nil || o.isClosed {
+ return 0, o.prevErr
+ }
+ // Create a new request.
+ readReq := getRequest{
+ isReadOp: true,
+ beenRead: o.beenRead,
+ Buffer: b,
+ }
+
+ // Alert that this is the first request.
+ if !o.isStarted {
+ readReq.isFirstReq = true
+ }
+
+ // Verify if offset has changed and currOffset is greater than
+ // previous offset. Perhaps due to Seek().
+ offsetChange := o.prevOffset - o.currOffset
+ if offsetChange < 0 {
+ offsetChange = -offsetChange
+ }
+ if offsetChange > 0 {
+ // Fetch the new reader at the current offset again.
+ readReq.Offset = o.currOffset
+ readReq.DidOffsetChange = true
+ } else {
+ // No offset changes no need to fetch new reader, continue
+ // reading.
+ readReq.DidOffsetChange = false
+ readReq.Offset = 0
+ }
+
+ // Send and receive from the first request.
+ response, err := o.doGetRequest(readReq)
+ if err != nil && err != io.EOF {
+ // Save the error for future calls.
+ o.prevErr = err
+ return response.Size, err
+ }
+
+ // Bytes read.
+ bytesRead := int64(response.Size)
+
+ // Set the new offset.
+ oerr := o.setOffset(bytesRead)
+ if oerr != nil {
+ // Save the error for future calls.
+ o.prevErr = oerr
+ return response.Size, oerr
+ }
+
+ // Return the response.
+ return response.Size, err
+}
+
+// Stat returns the ObjectInfo structure describing object.
+func (o *Object) Stat() (ObjectInfo, error) {
+ if o == nil {
+ return ObjectInfo{}, ErrInvalidArgument("Object is nil")
+ }
+ // Locking.
+ o.mutex.Lock()
+ defer o.mutex.Unlock()
+
+ if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
+ return ObjectInfo{}, o.prevErr
+ }
+
+ // This is the first request.
+ if !o.isStarted || !o.objectInfoSet {
+ statReq := getRequest{
+ isFirstReq: !o.isStarted,
+ settingObjectInfo: !o.objectInfoSet,
+ }
+
+ // Send the request and get the response.
+ _, err := o.doGetRequest(statReq)
+ if err != nil {
+ o.prevErr = err
+ return ObjectInfo{}, err
+ }
+ }
+
+ return o.objectInfo, nil
+}
+
+// ReadAt reads len(b) bytes from the File starting at byte offset
+// off. It returns the number of bytes read and the error, if any.
+// ReadAt always returns a non-nil error when n < len(b). At end of
+// file, that error is io.EOF.
+func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
+ if o == nil {
+ return 0, ErrInvalidArgument("Object is nil")
+ }
+
+ // Locking.
+ o.mutex.Lock()
+ defer o.mutex.Unlock()
+
+ // prevErr is error which was saved in previous operation.
+ if o.prevErr != nil || o.isClosed {
+ return 0, o.prevErr
+ }
+
+ // Can only compare offsets to size when size has been set.
+ if o.objectInfoSet {
+ // If offset is negative than we return io.EOF.
+ // If offset is greater than or equal to object size we return io.EOF.
+ if offset >= o.objectInfo.Size || offset < 0 {
+ return 0, io.EOF
+ }
+ }
+
+ // Create the new readAt request.
+ readAtReq := getRequest{
+ isReadOp: true,
+ isReadAt: true,
+ DidOffsetChange: true, // Offset always changes.
+ beenRead: o.beenRead, // Set if this is the first request to try and read.
+ Offset: offset, // Set the offset.
+ Buffer: b,
+ }
+
+ // Alert that this is the first request.
+ if !o.isStarted {
+ readAtReq.isFirstReq = true
+ }
+
+ // Send and receive from the first request.
+ response, err := o.doGetRequest(readAtReq)
+ if err != nil && err != io.EOF {
+ // Save the error.
+ o.prevErr = err
+ return response.Size, err
+ }
+ // Bytes read.
+ bytesRead := int64(response.Size)
+ // There is no valid objectInfo yet
+ // to compare against for EOF.
+ if !o.objectInfoSet {
+ // Update the currentOffset.
+ o.currOffset += bytesRead
+ // Save the current offset as previous offset.
+ o.prevOffset = o.currOffset
+ } else {
+ // If this was not the first request update
+ // the offsets and compare against objectInfo
+ // for EOF.
+ oerr := o.setOffset(bytesRead)
+ if oerr != nil {
+ o.prevErr = oerr
+ return response.Size, oerr
+ }
+ }
+ return response.Size, err
+}
+
+// Seek sets the offset for the next Read or Write to offset,
+// interpreted according to whence: 0 means relative to the
+// origin of the file, 1 means relative to the current offset,
+// and 2 means relative to the end.
+// Seek returns the new offset and an error, if any.
+//
+// Seeking to a negative offset is an error. Seeking to any positive
+// offset is legal, subsequent io operations succeed until the
+// underlying object is not closed.
+func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
+ if o == nil {
+ return 0, ErrInvalidArgument("Object is nil")
+ }
+
+ // Locking.
+ o.mutex.Lock()
+ defer o.mutex.Unlock()
+
+ if o.prevErr != nil {
+ // At EOF seeking is legal allow only io.EOF, for any other errors we return.
+ if o.prevErr != io.EOF {
+ return 0, o.prevErr
+ }
+ }
+
+ // Negative offset is valid for whence of '2'.
+ if offset < 0 && whence != 2 {
+ return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d.", whence))
+ }
+
+ // This is the first request. So before anything else
+ // get the ObjectInfo.
+ if !o.isStarted || !o.objectInfoSet {
+ // Create the new Seek request.
+ seekReq := getRequest{
+ isReadOp: false,
+ Offset: offset,
+ isFirstReq: true,
+ }
+ // Send and receive from the seek request.
+ _, err := o.doGetRequest(seekReq)
+ if err != nil {
+ // Save the error.
+ o.prevErr = err
+ return 0, err
+ }
+ }
+ // Save current offset as previous offset.
+ o.prevOffset = o.currOffset
+
+ // Switch through whence.
+ switch whence {
+ default:
+ return 0, ErrInvalidArgument(fmt.Sprintf("Invalid whence %d", whence))
+ case 0:
+ if offset > o.objectInfo.Size {
+ return 0, io.EOF
+ }
+ o.currOffset = offset
+ case 1:
+ if o.currOffset+offset > o.objectInfo.Size {
+ return 0, io.EOF
+ }
+ o.currOffset += offset
+ case 2:
+ // Seeking to positive offset is valid for whence '2', but
+ // since we are backing a Reader we have reached 'EOF' if
+ // offset is positive.
+ if offset > 0 {
+ return 0, io.EOF
+ }
+ // Seeking to negative position not allowed for whence.
+ if o.objectInfo.Size+offset < 0 {
+ return 0, ErrInvalidArgument(fmt.Sprintf("Seeking at negative offset not allowed for %d", whence))
+ }
+ o.currOffset = o.objectInfo.Size + offset
+ }
+ // Reset the saved error since we successfully seeked, let the Read
+ // and ReadAt decide.
+ if o.prevErr == io.EOF {
+ o.prevErr = nil
+ }
+ // Return the effective offset.
+ return o.currOffset, nil
+}
+
+// Close - The behavior of Close after the first call returns error
+// for subsequent Close() calls.
+func (o *Object) Close() (err error) {
+ if o == nil {
+ return ErrInvalidArgument("Object is nil")
+ }
+ // Locking.
+ o.mutex.Lock()
+ defer o.mutex.Unlock()
+
+ // if already closed return an error.
+ if o.isClosed {
+ return o.prevErr
+ }
+
+ // Close successfully.
+ close(o.doneCh)
+
+ // Save for future operations.
+ errMsg := "Object is already closed. Bad file descriptor."
+ o.prevErr = errors.New(errMsg)
+ // Save here that we closed done channel successfully.
+ o.isClosed = true
+ return nil
+}
+
+// newObject instantiates a new *minio.Object*
+// ObjectInfo will be set by setObjectInfo
+func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
+ return &Object{
+ mutex: &sync.Mutex{},
+ reqCh: reqCh,
+ resCh: resCh,
+ doneCh: doneCh,
+ }
+}
+
+// getObject - retrieve object from Object Storage.
+//
+// Additionally this function also takes range arguments to download the specified
+// range bytes of an object. Setting offset and length = 0 will download the full object.
+//
+// For more information about the HTTP Range header.
+// go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.
+func (c Client) getObject(bucketName, objectName string, offset, length int64) (io.ReadCloser, ObjectInfo, error) {
+ // Validate input arguments.
+ if err := isValidBucketName(bucketName); err != nil {
+ return nil, ObjectInfo{}, err
+ }
+ if err := isValidObjectName(objectName); err != nil {
+ return nil, ObjectInfo{}, err
+ }
+
+ customHeader := make(http.Header)
+ // Set ranges if length and offset are valid.
+ // See https://tools.ietf.org/html/rfc7233#section-3.1 for reference.
+ if length > 0 && offset >= 0 {
+ customHeader.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
+ } else if offset > 0 && length == 0 {
+ customHeader.Set("Range", fmt.Sprintf("bytes=%d-", offset))
+ } else if length < 0 && offset == 0 {
+ customHeader.Set("Range", fmt.Sprintf("bytes=%d", length))
+ }
+
+ // Execute GET on objectName.
+ resp, err := c.executeMethod("GET", requestMetadata{
+ bucketName: bucketName,
+ objectName: objectName,
+ customHeader: customHeader,
+ })
+ if err != nil {
+ return nil, ObjectInfo{}, err
+ }
+ if resp != nil {
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
+ return nil, ObjectInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
+ }
+ }
+
+ // Trim off the odd double quotes from ETag in the beginning and end.
+ md5sum := strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
+ md5sum = strings.TrimSuffix(md5sum, "\"")
+
+ // Parse the date.
+ date, err := time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified"))
+ if err != nil {
+ msg := "Last-Modified time format not recognized. " + reportIssue
+ return nil, ObjectInfo{}, ErrorResponse{
+ Code: "InternalError",
+ Message: msg,
+ RequestID: resp.Header.Get("x-amz-request-id"),
+ HostID: resp.Header.Get("x-amz-id-2"),
+ Region: resp.Header.Get("x-amz-bucket-region"),
+ }
+ }
+ // Get content-type.
+ contentType := strings.TrimSpace(resp.Header.Get("Content-Type"))
+ if contentType == "" {
+ contentType = "application/octet-stream"
+ }
+ var objectStat ObjectInfo
+ objectStat.ETag = md5sum
+ objectStat.Key = objectName
+ objectStat.Size = resp.ContentLength
+ objectStat.LastModified = date
+ objectStat.ContentType = contentType
+
+ // do not close body here, caller will close
+ return resp.Body, objectStat, nil
+}