summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/api-put-object.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object.go')
-rw-r--r--vendor/github.com/minio/minio-go/api-put-object.go121
1 files changed, 113 insertions, 8 deletions
diff --git a/vendor/github.com/minio/minio-go/api-put-object.go b/vendor/github.com/minio/minio-go/api-put-object.go
index 2ea498789..f4107132e 100644
--- a/vendor/github.com/minio/minio-go/api-put-object.go
+++ b/vendor/github.com/minio/minio-go/api-put-object.go
@@ -17,13 +17,15 @@
package minio
import (
+ "bytes"
+ "fmt"
"io"
"os"
"reflect"
"runtime"
+ "sort"
"strings"
- "github.com/minio/minio-go/pkg/credentials"
"github.com/minio/minio-go/pkg/s3utils"
)
@@ -178,6 +180,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R
if err != nil {
return 0, err
}
+
return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress)
}
@@ -194,21 +197,16 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
}
if c.overrideSignerType.IsV2() {
- if size > 0 && size < minPartSize {
+ if size >= 0 && size < minPartSize {
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
}
return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress)
}
- // If size cannot be found on a stream, it is not possible
- // to upload using streaming signature, fall back to multipart.
if size < 0 {
- return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, metadata, progress)
}
- // Set streaming signature.
- c.overrideSignerType = credentials.SignatureV4Streaming
-
if size < minPartSize {
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
}
@@ -216,3 +214,110 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
// For all sizes greater than 64MiB do multipart.
return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress)
}
+
+func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, metadata map[string][]string,
+ progress io.Reader) (n int64, err error) {
+ // Input validation.
+ if err = s3utils.CheckValidBucketName(bucketName); err != nil {
+ return 0, err
+ }
+ if err = s3utils.CheckValidObjectName(objectName); err != nil {
+ return 0, err
+ }
+
+ // Total data read and written to server. should be equal to
+ // 'size' at the end of the call.
+ var totalUploadedSize int64
+
+ // Complete multipart upload.
+ var complMultipartUpload completeMultipartUpload
+
+ // Calculate the optimal parts info for a given size.
+ totalPartsCount, _, _, err := optimalPartInfo(-1)
+ if err != nil {
+ return 0, err
+ }
+
+ // Initiate a new multipart upload.
+ uploadID, err := c.newUploadID(bucketName, objectName, metadata)
+ if err != nil {
+ return 0, err
+ }
+
+ defer func() {
+ if err != nil {
+ c.abortMultipartUpload(bucketName, objectName, uploadID)
+ }
+ }()
+
+ // Part number always starts with '1'.
+ partNumber := 1
+
+ // Initialize parts uploaded map.
+ partsInfo := make(map[int]ObjectPart)
+
+ for partNumber <= totalPartsCount {
+ bufp := bufPool.Get().(*[]byte)
+ length, rErr := io.ReadFull(reader, *bufp)
+ if rErr == io.EOF {
+ break
+ }
+ if rErr != nil && rErr != io.ErrUnexpectedEOF {
+ bufPool.Put(bufp)
+ return 0, rErr
+ }
+
+ // Update progress reader appropriately to the latest offset
+ // as we read from the source.
+ rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+
+ // Proceed to upload the part.
+ var objPart ObjectPart
+ objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
+ nil, nil, int64(length), metadata)
+ if err != nil {
+ bufPool.Put(bufp)
+ return totalUploadedSize, err
+ }
+
+ // Save successfully uploaded part metadata.
+ partsInfo[partNumber] = objPart
+
+ // Save successfully uploaded size.
+ totalUploadedSize += int64(length)
+
+ // Increment part number.
+ partNumber++
+
+ // Put back data into bufpool.
+ bufPool.Put(bufp)
+
+ // For unknown size, Read EOF we break away.
+ // We do not have to upload till totalPartsCount.
+ if rErr == io.EOF {
+ break
+ }
+ }
+
+ // Loop over total uploaded parts to save them in
+ // Parts array before completing the multipart request.
+ for i := 1; i < partNumber; i++ {
+ part, ok := partsInfo[i]
+ if !ok {
+ return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i))
+ }
+ complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
+ ETag: part.ETag,
+ PartNumber: part.PartNumber,
+ })
+ }
+
+ // Sort all completed parts.
+ sort.Sort(completedParts(complMultipartUpload.Parts))
+ if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil {
+ return totalUploadedSize, err
+ }
+
+ // Return final size.
+ return totalUploadedSize, nil
+}