diff options
Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/api-put-object.go | 121 |
1 files changed, 113 insertions, 8 deletions
diff --git a/vendor/github.com/minio/minio-go/api-put-object.go b/vendor/github.com/minio/minio-go/api-put-object.go index 2ea498789..f4107132e 100644 --- a/vendor/github.com/minio/minio-go/api-put-object.go +++ b/vendor/github.com/minio/minio-go/api-put-object.go @@ -17,13 +17,15 @@ package minio import ( + "bytes" + "fmt" "io" "os" "reflect" "runtime" + "sort" "strings" - "github.com/minio/minio-go/pkg/credentials" "github.com/minio/minio-go/pkg/s3utils" ) @@ -178,6 +180,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R if err != nil { return 0, err } + return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress) } @@ -194,21 +197,16 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader, } if c.overrideSignerType.IsV2() { - if size > 0 && size < minPartSize { + if size >= 0 && size < minPartSize { return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) } return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress) } - // If size cannot be found on a stream, it is not possible - // to upload using streaming signature, fall back to multipart. if size < 0 { - return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, metadata, progress) } - // Set streaming signature. - c.overrideSignerType = credentials.SignatureV4Streaming - if size < minPartSize { return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) } @@ -216,3 +214,110 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader, // For all sizes greater than 64MiB do multipart. return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress) } + +func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, metadata map[string][]string, + progress io.Reader) (n int64, err error) { + // Input validation. + if err = s3utils.CheckValidBucketName(bucketName); err != nil { + return 0, err + } + if err = s3utils.CheckValidObjectName(objectName); err != nil { + return 0, err + } + + // Total data read and written to server. should be equal to + // 'size' at the end of the call. + var totalUploadedSize int64 + + // Complete multipart upload. + var complMultipartUpload completeMultipartUpload + + // Calculate the optimal parts info for a given size. + totalPartsCount, _, _, err := optimalPartInfo(-1) + if err != nil { + return 0, err + } + + // Initiate a new multipart upload. + uploadID, err := c.newUploadID(bucketName, objectName, metadata) + if err != nil { + return 0, err + } + + defer func() { + if err != nil { + c.abortMultipartUpload(bucketName, objectName, uploadID) + } + }() + + // Part number always starts with '1'. + partNumber := 1 + + // Initialize parts uploaded map. + partsInfo := make(map[int]ObjectPart) + + for partNumber <= totalPartsCount { + bufp := bufPool.Get().(*[]byte) + length, rErr := io.ReadFull(reader, *bufp) + if rErr == io.EOF { + break + } + if rErr != nil && rErr != io.ErrUnexpectedEOF { + bufPool.Put(bufp) + return 0, rErr + } + + // Update progress reader appropriately to the latest offset + // as we read from the source. + rd := newHook(bytes.NewReader((*bufp)[:length]), progress) + + // Proceed to upload the part. + var objPart ObjectPart + objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, + nil, nil, int64(length), metadata) + if err != nil { + bufPool.Put(bufp) + return totalUploadedSize, err + } + + // Save successfully uploaded part metadata. + partsInfo[partNumber] = objPart + + // Save successfully uploaded size. + totalUploadedSize += int64(length) + + // Increment part number. + partNumber++ + + // Put back data into bufpool. + bufPool.Put(bufp) + + // For unknown size, Read EOF we break away. + // We do not have to upload till totalPartsCount. + if rErr == io.EOF { + break + } + } + + // Loop over total uploaded parts to save them in + // Parts array before completing the multipart request. + for i := 1; i < partNumber; i++ { + part, ok := partsInfo[i] + if !ok { + return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i)) + } + complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ + ETag: part.ETag, + PartNumber: part.PartNumber, + }) + } + + // Sort all completed parts. + sort.Sort(completedParts(complMultipartUpload.Parts)) + if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil { + return totalUploadedSize, err + } + + // Return final size. + return totalUploadedSize, nil +} |