diff options
author | Christopher Speller <crspeller@gmail.com> | 2018-01-29 14:17:40 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-29 14:17:40 -0800 |
commit | 961c04cae992eadb42d286d2f85f8a675bdc68c8 (patch) | |
tree | 3408f2d06f847e966c53485e2d54c692cdd037c1 /vendor/github.com/minio/minio-go/api-put-object-streaming.go | |
parent | 8d66523ba7d9a77129844be476732ebfd5272d64 (diff) | |
download | chat-961c04cae992eadb42d286d2f85f8a675bdc68c8.tar.gz chat-961c04cae992eadb42d286d2f85f8a675bdc68c8.tar.bz2 chat-961c04cae992eadb42d286d2f85f8a675bdc68c8.zip |
Upgrading server dependancies (#8154)
Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object-streaming.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/api-put-object-streaming.go | 115 |
1 files changed, 48 insertions, 67 deletions
diff --git a/vendor/github.com/minio/minio-go/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/api-put-object-streaming.go index 40cd5c252..be1dc57ef 100644 --- a/vendor/github.com/minio/minio-go/api-put-object-streaming.go +++ b/vendor/github.com/minio/minio-go/api-put-object-streaming.go @@ -1,5 +1,6 @@ /* - * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc. + * Minio Go Library for Amazon S3 Compatible Cloud Storage + * Copyright 2017 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ package minio import ( + "context" "fmt" "io" "net/http" @@ -26,33 +28,23 @@ import ( "github.com/minio/minio-go/pkg/s3utils" ) -// PutObjectStreaming using AWS streaming signature V4 -func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Reader) (n int64, err error) { - return c.PutObjectWithProgress(bucketName, objectName, reader, nil, nil) -} - // putObjectMultipartStream - upload a large object using // multipart upload and streaming signature for signing payload. // Comprehensive put object operation involving multipart uploads. // // Following code handles these types of readers. // -// - *os.File // - *minio.Object // - Any reader which has a method 'ReadAt()' // -func (c Client) putObjectMultipartStream(bucketName, objectName string, - reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { - - // Verify if reader is *minio.Object, *os.File or io.ReaderAt. - // NOTE: Verification of object is kept for a specific purpose - // while it is going to be duck typed similar to io.ReaderAt. - // It is to indicate that *minio.Object implements io.ReaderAt. - // and such a functionality is used in the subsequent code path. - if isFile(reader) || !isObject(reader) && isReadAt(reader) { - n, err = c.putObjectMultipartStreamFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, metadata, progress) +func (c Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string, + reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) { + + if !isObject(reader) && isReadAt(reader) { + // Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader. + n, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts) } else { - n, err = c.putObjectMultipartStreamNoChecksum(bucketName, objectName, reader, size, metadata, progress) + n, err = c.putObjectMultipartStreamNoChecksum(ctx, bucketName, objectName, reader, size, opts) } if err != nil { errResp := ToErrorResponse(err) @@ -64,7 +56,7 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) } // Fall back to uploading as single PutObject operation. - return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts) } } return n, err @@ -94,8 +86,8 @@ type uploadPartReq struct { // temporary files for staging all the data, these temporary files are // cleaned automatically when the caller i.e http client closes the // stream after uploading all the contents successfully. -func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string, - reader io.ReaderAt, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { +func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string, + reader io.ReaderAt, size int64, opts PutObjectOptions) (n int64, err error) { // Input validation. if err = s3utils.CheckValidBucketName(bucketName); err != nil { return 0, err @@ -111,7 +103,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string } // Initiate a new multipart upload. - uploadID, err := c.newUploadID(bucketName, objectName, metadata) + uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) if err != nil { return 0, err } @@ -122,7 +114,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string // to relinquish storage space. defer func() { if err != nil { - c.abortMultipartUpload(bucketName, objectName, uploadID) + c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) } }() @@ -150,9 +142,8 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil} } close(uploadPartsCh) - // Receive each part number from the channel allowing three parallel uploads. - for w := 1; w <= totalWorkers; w++ { + for w := 1; w <= opts.getNumThreads(); w++ { go func(partSize int64) { // Each worker will draw from the part channel and upload in parallel. for uploadReq := range uploadPartsCh { @@ -170,13 +161,13 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string } // Get a section reader on a particular offset. - sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), progress) + sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress) // Proceed to upload the part. var objPart ObjectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, + objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, - nil, nil, partSize, metadata) + "", "", partSize, opts.UserMetadata) if err != nil { uploadedPartsCh <- uploadedPartRes{ Size: 0, @@ -229,7 +220,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string // Sort all completed parts. sort.Sort(completedParts(complMultipartUpload.Parts)) - _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload) + _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload) if err != nil { return totalUploadedSize, err } @@ -238,8 +229,8 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string return totalUploadedSize, nil } -func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string, - reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { +func (c Client) putObjectMultipartStreamNoChecksum(ctx context.Context, bucketName, objectName string, + reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) { // Input validation. if err = s3utils.CheckValidBucketName(bucketName); err != nil { return 0, err @@ -253,9 +244,8 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string if err != nil { return 0, err } - // Initiates a new multipart request - uploadID, err := c.newUploadID(bucketName, objectName, metadata) + uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) if err != nil { return 0, err } @@ -266,7 +256,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string // storage space. defer func() { if err != nil { - c.abortMultipartUpload(bucketName, objectName, uploadID) + c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) } }() @@ -281,17 +271,16 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string for partNumber = 1; partNumber <= totalPartsCount; partNumber++ { // Update progress reader appropriately to the latest offset // as we read from the source. - hookReader := newHook(reader, progress) + hookReader := newHook(reader, opts.Progress) // Proceed to upload the part. if partNumber == totalPartsCount { partSize = lastPartSize } - var objPart ObjectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, + objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, io.LimitReader(hookReader, partSize), - partNumber, nil, nil, partSize, metadata) + partNumber, "", "", partSize, opts.UserMetadata) if err != nil { return totalUploadedSize, err } @@ -328,7 +317,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string // Sort all completed parts. sort.Sort(completedParts(complMultipartUpload.Parts)) - _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload) + _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload) if err != nil { return totalUploadedSize, err } @@ -339,7 +328,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string // putObjectNoChecksum special function used Google Cloud Storage. This special function // is used for Google Cloud Storage since Google's multipart API is not S3 compatible. -func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) { +func (c Client) putObjectNoChecksum(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) { // Input validation. if err := s3utils.CheckValidBucketName(bucketName); err != nil { return 0, err @@ -350,22 +339,27 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea // Size -1 is only supported on Google Cloud Storage, we error // out in all other situations. - if size < 0 && !s3utils.IsGoogleEndpoint(c.endpointURL) { + if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) { return 0, ErrEntityTooSmall(size, bucketName, objectName) } if size > 0 { if isReadAt(reader) && !isObject(reader) { - reader = io.NewSectionReader(reader.(io.ReaderAt), 0, size) + seeker, _ := reader.(io.Seeker) + offset, err := seeker.Seek(0, io.SeekCurrent) + if err != nil { + return 0, ErrInvalidArgument(err.Error()) + } + reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size) } } // Update progress reader appropriately to the latest offset as we // read from the source. - readSeeker := newHook(reader, progress) + readSeeker := newHook(reader, opts.Progress) // This function does not calculate sha256 and md5sum for payload. // Execute put object. - st, err := c.putObjectDo(bucketName, objectName, readSeeker, nil, nil, size, metaData) + st, err := c.putObjectDo(ctx, bucketName, objectName, readSeeker, "", "", size, opts) if err != nil { return 0, err } @@ -377,7 +371,7 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea // putObjectDo - executes the put object http operation. // NOTE: You must have WRITE permissions on a bucket to add an object to it. -func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5Sum []byte, sha256Sum []byte, size int64, metaData map[string][]string) (ObjectInfo, error) { +func (c Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (ObjectInfo, error) { // Input validation. if err := s3utils.CheckValidBucketName(bucketName); err != nil { return ObjectInfo{}, err @@ -385,35 +379,22 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5 if err := s3utils.CheckValidObjectName(objectName); err != nil { return ObjectInfo{}, err } - // Set headers. - customHeader := make(http.Header) - - // Set metadata to headers - for k, v := range metaData { - if len(v) > 0 { - customHeader.Set(k, v[0]) - } - } - - // If Content-Type is not provided, set the default application/octet-stream one - if v, ok := metaData["Content-Type"]; !ok || len(v) == 0 { - customHeader.Set("Content-Type", "application/octet-stream") - } + customHeader := opts.Header() // Populate request metadata. reqMetadata := requestMetadata{ - bucketName: bucketName, - objectName: objectName, - customHeader: customHeader, - contentBody: reader, - contentLength: size, - contentMD5Bytes: md5Sum, - contentSHA256Bytes: sha256Sum, + bucketName: bucketName, + objectName: objectName, + customHeader: customHeader, + contentBody: reader, + contentLength: size, + contentMD5Base64: md5Base64, + contentSHA256Hex: sha256Hex, } // Execute PUT an objectName. - resp, err := c.executeMethod("PUT", reqMetadata) + resp, err := c.executeMethod(ctx, "PUT", reqMetadata) defer closeResponse(resp) if err != nil { return ObjectInfo{}, err |