Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object-multipart.go')
-rw-r--r--  vendor/github.com/minio/minio-go/api-put-object-multipart.go  271
1 file changed, 77 insertions(+), 194 deletions(-)
diff --git a/vendor/github.com/minio/minio-go/api-put-object-multipart.go b/vendor/github.com/minio/minio-go/api-put-object-multipart.go
index 3a299f65b..1938378f8 100644
--- a/vendor/github.com/minio/minio-go/api-put-object-multipart.go
+++ b/vendor/github.com/minio/minio-go/api-put-object-multipart.go
@@ -18,204 +18,87 @@ package minio
import (
"bytes"
- "crypto/md5"
- "crypto/sha256"
- "encoding/hex"
"encoding/xml"
"fmt"
- "hash"
"io"
"io/ioutil"
"net/http"
"net/url"
- "os"
"sort"
"strconv"
"strings"
-)
-
-// Comprehensive put object operation involving multipart resumable uploads.
-//
-// Following code handles these types of readers.
-//
-// - *os.File
-// - *minio.Object
-// - Any reader which has a method 'ReadAt()'
-//
-// If we exhaust all the known types, code proceeds to use stream as
-// is where each part is re-downloaded, checksummed and verified
-// before upload.
-func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
- if size > 0 && size > minPartSize {
- // Verify if reader is *os.File, then use file system functionalities.
- if isFile(reader) {
- return c.putObjectMultipartFromFile(bucketName, objectName, reader.(*os.File), size, metaData, progress)
- }
- // Verify if reader is *minio.Object or io.ReaderAt.
- // NOTE: Verification of object is kept for a specific purpose
- // while it is going to be duck typed similar to io.ReaderAt.
- // It is to indicate that *minio.Object implements io.ReaderAt.
- // and such a functionality is used in the subsequent code
- // path.
- if isObject(reader) || isReadAt(reader) {
- return c.putObjectMultipartFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, metaData, progress)
- }
- }
- // For any other data size and reader type we do generic multipart
- // approach by staging data in temporary files and uploading them.
- return c.putObjectMultipartStream(bucketName, objectName, reader, size, metaData, progress)
-}
-
-// putObjectMultipartStreamNoChecksum - upload a large object using
-// multipart upload and streaming signature for signing payload.
-// N B We don't resume an incomplete multipart upload, we overwrite
-// existing parts of an incomplete upload.
-func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string,
- reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (int64, error) {
- // Input validation.
- if err := isValidBucketName(bucketName); err != nil {
- return 0, err
- }
- if err := isValidObjectName(objectName); err != nil {
- return 0, err
- }
-
- // Get the upload id of a previously partially uploaded object or initiate a new multipart upload
- uploadID, err := c.findUploadID(bucketName, objectName)
- if err != nil {
- return 0, err
- }
- if uploadID == "" {
- // Initiates a new multipart request
- uploadID, err = c.newUploadID(bucketName, objectName, metadata)
- if err != nil {
- return 0, err
- }
- }
+ "github.com/minio/minio-go/pkg/s3utils"
+)
- // Calculate the optimal parts info for a given size.
- totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
+func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64,
+ metadata map[string][]string, progress io.Reader) (n int64, err error) {
+ n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, size, metadata, progress)
if err != nil {
- return 0, err
- }
-
- // Total data read and written to server. should be equal to 'size' at the end of the call.
- var totalUploadedSize int64
-
- // Initialize parts uploaded map.
- partsInfo := make(map[int]ObjectPart)
-
- // Part number always starts with '1'.
- var partNumber int
- for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
- // Update progress reader appropriately to the latest offset
- // as we read from the source.
- hookReader := newHook(reader, progress)
-
- // Proceed to upload the part.
- if partNumber == totalPartsCount {
- partSize = lastPartSize
- }
-
- var objPart ObjectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID,
- io.LimitReader(hookReader, partSize), partNumber, nil, nil, partSize)
- // For unknown size, Read EOF we break away.
- // We do not have to upload till totalPartsCount.
- if err == io.EOF && size < 0 {
- break
- }
-
- if err != nil {
- return totalUploadedSize, err
- }
-
- // Save successfully uploaded part metadata.
- partsInfo[partNumber] = objPart
-
- // Save successfully uploaded size.
- totalUploadedSize += partSize
- }
-
- // Verify if we uploaded all the data.
- if size > 0 {
- if totalUploadedSize != size {
- return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
- }
- }
-
- // Complete multipart upload.
- var complMultipartUpload completeMultipartUpload
-
- // Loop over total uploaded parts to save them in
- // Parts array before completing the multipart request.
- for i := 1; i < partNumber; i++ {
- part, ok := partsInfo[i]
- if !ok {
- return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i))
+ errResp := ToErrorResponse(err)
+ // Verify if multipart functionality is not available, if not
+ // fall back to single PutObject operation.
+ if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
+ // Verify if size of reader is greater than '5GiB'.
+ if size > maxSinglePutObjectSize {
+ return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
+ }
+ // Fall back to uploading as single PutObject operation.
+ return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
}
- complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
- ETag: part.ETag,
- PartNumber: part.PartNumber,
- })
- }
-
- // Sort all completed parts.
- sort.Sort(completedParts(complMultipartUpload.Parts))
- _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload)
- if err != nil {
- return totalUploadedSize, err
}
-
- // Return final size.
- return totalUploadedSize, nil
+ return n, err
}
-// putObjectStream uploads files bigger than 64MiB, and also supports
-// special case where size is unknown i.e '-1'.
-func (c Client) putObjectMultipartStream(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64,
+ metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
- if err := isValidBucketName(bucketName); err != nil {
+ if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
}
- if err := isValidObjectName(objectName); err != nil {
+ if err = s3utils.CheckValidObjectName(objectName); err != nil {
return 0, err
}
- // Total data read and written to server. should be equal to 'size' at the end of the call.
+ // Total data read and written to server. should be equal to
+ // 'size' at the end of the call.
var totalUploadedSize int64
// Complete multipart upload.
var complMultipartUpload completeMultipartUpload
- // Get the upload id of a previously partially uploaded object or initiate a new multipart upload
- uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
+ // Calculate the optimal parts info for a given size.
+ totalPartsCount, partSize, _, err := optimalPartInfo(size)
if err != nil {
return 0, err
}
- // Calculate the optimal parts info for a given size.
- totalPartsCount, partSize, _, err := optimalPartInfo(size)
+ // Initiate a new multipart upload.
+ uploadID, err := c.newUploadID(bucketName, objectName, metadata)
if err != nil {
return 0, err
}
+ defer func() {
+ if err != nil {
+ c.abortMultipartUpload(bucketName, objectName, uploadID)
+ }
+ }()
+
// Part number always starts with '1'.
partNumber := 1
// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)
+ // Initialize parts uploaded map.
+ partsInfo := make(map[int]ObjectPart)
+
for partNumber <= totalPartsCount {
- // Choose hash algorithms to be calculated by hashCopyN, avoid sha256
- // with non-v4 signature request or HTTPS connection
- hashSums := make(map[string][]byte)
- hashAlgos := make(map[string]hash.Hash)
- hashAlgos["md5"] = md5.New()
- if c.signature.isV4() && !c.secure {
- hashAlgos["sha256"] = sha256.New()
- }
+ // Choose hash algorithms to be calculated by hashCopyN,
+ // avoid sha256 with non-v4 signature request or
+ // HTTPS connection.
+ hashAlgos, hashSums := c.hashMaterials()
// Calculates hash sums while copying partSize bytes into tmpBuffer.
prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize)
@@ -228,33 +111,19 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
// as we read from the source.
reader = newHook(tmpBuffer, progress)
- part, ok := partsInfo[partNumber]
-
- // Verify if part should be uploaded.
- if !ok || shouldUploadPart(ObjectPart{
- ETag: hex.EncodeToString(hashSums["md5"]),
- PartNumber: partNumber,
- Size: prtSize,
- }, uploadPartReq{PartNum: partNumber, Part: &part}) {
- // Proceed to upload the part.
- var objPart ObjectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize)
- if err != nil {
- // Reset the temporary buffer upon any error.
- tmpBuffer.Reset()
- return totalUploadedSize, err
- }
- // Save successfully uploaded part metadata.
- partsInfo[partNumber] = objPart
- } else {
- // Update the progress reader for the skipped part.
- if progress != nil {
- if _, err = io.CopyN(ioutil.Discard, progress, prtSize); err != nil {
- return totalUploadedSize, err
- }
- }
+ // Proceed to upload the part.
+ var objPart ObjectPart
+ objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
+ hashSums["md5"], hashSums["sha256"], prtSize, metadata)
+ if err != nil {
+ // Reset the temporary buffer upon any error.
+ tmpBuffer.Reset()
+ return totalUploadedSize, err
}
+ // Save successfully uploaded part metadata.
+ partsInfo[partNumber] = objPart
+
// Reset the temporary buffer.
tmpBuffer.Reset()
@@ -293,8 +162,7 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))
- _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload)
- if err != nil {
+ if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil {
return totalUploadedSize, err
}
@@ -303,12 +171,12 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
}
// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
-func (c Client) initiateMultipartUpload(bucketName, objectName string, metaData map[string][]string) (initiateMultipartUploadResult, error) {
+func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata map[string][]string) (initiateMultipartUploadResult, error) {
// Input validation.
- if err := isValidBucketName(bucketName); err != nil {
+ if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return initiateMultipartUploadResult{}, err
}
- if err := isValidObjectName(objectName); err != nil {
+ if err := s3utils.CheckValidObjectName(objectName); err != nil {
return initiateMultipartUploadResult{}, err
}
@@ -318,14 +186,14 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metaData
// Set ContentType header.
customHeader := make(http.Header)
- for k, v := range metaData {
+ for k, v := range metadata {
if len(v) > 0 {
customHeader.Set(k, v[0])
}
}
// Set a default content-type header if the latter is not provided
- if v, ok := metaData["Content-Type"]; !ok || len(v) == 0 {
+ if v, ok := metadata["Content-Type"]; !ok || len(v) == 0 {
customHeader.Set("Content-Type", "application/octet-stream")
}
@@ -356,13 +224,16 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metaData
return initiateMultipartUploadResult, nil
}
+const serverEncryptionKeyPrefix = "x-amz-server-side-encryption"
+
// uploadPart - Uploads a part in a multipart upload.
-func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Reader, partNumber int, md5Sum, sha256Sum []byte, size int64) (ObjectPart, error) {
+func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Reader,
+ partNumber int, md5Sum, sha256Sum []byte, size int64, metadata map[string][]string) (ObjectPart, error) {
// Input validation.
- if err := isValidBucketName(bucketName); err != nil {
+ if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return ObjectPart{}, err
}
- if err := isValidObjectName(objectName); err != nil {
+ if err := s3utils.CheckValidObjectName(objectName); err != nil {
return ObjectPart{}, err
}
if size > maxPartSize {
@@ -385,10 +256,21 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
// Set upload id.
urlValues.Set("uploadId", uploadID)
+ // Set encryption headers, if any.
+ customHeader := make(http.Header)
+ for k, v := range metadata {
+ if len(v) > 0 {
+ if strings.HasPrefix(strings.ToLower(k), serverEncryptionKeyPrefix) {
+ customHeader.Set(k, v[0])
+ }
+ }
+ }
+
reqMetadata := requestMetadata{
bucketName: bucketName,
objectName: objectName,
queryValues: urlValues,
+ customHeader: customHeader,
contentBody: reader,
contentLength: size,
contentMD5Bytes: md5Sum,
@@ -417,12 +299,13 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
}
// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
-func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string, complete completeMultipartUpload) (completeMultipartUploadResult, error) {
+func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
+ complete completeMultipartUpload) (completeMultipartUploadResult, error) {
// Input validation.
- if err := isValidBucketName(bucketName); err != nil {
+ if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return completeMultipartUploadResult{}, err
}
- if err := isValidObjectName(objectName); err != nil {
+ if err := s3utils.CheckValidObjectName(objectName); err != nil {
return completeMultipartUploadResult{}, err
}