Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object-file.go')
 vendor/github.com/minio/minio-go/api-put-object-file.go | 121 ++++++++++---------
 1 file changed, 61 insertions(+), 60 deletions(-)
diff --git a/vendor/github.com/minio/minio-go/api-put-object-file.go b/vendor/github.com/minio/minio-go/api-put-object-file.go
index deaed0acd..aa554b321 100644
--- a/vendor/github.com/minio/minio-go/api-put-object-file.go
+++ b/vendor/github.com/minio/minio-go/api-put-object-file.go
@@ -28,6 +28,8 @@ import (
"os"
"path/filepath"
"sort"
+
+ "github.com/minio/minio-go/pkg/s3utils"
)
// FPutObject - Create an object in a bucket, with contents from file at filePath.
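For orientation, a minimal usage sketch of FPutObject as vendored here. The four-argument minio.New shown matches this era of the client, but treat it as an assumption; the endpoint, credentials, bucket, and paths are placeholders.

package main

import (
	"log"

	minio "github.com/minio/minio-go"
)

func main() {
	// Assumed constructor form for this vintage of the client:
	// endpoint, access key, secret key, TLS flag.
	client, err := minio.New("s3.example.com", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatalln(err)
	}
	// An empty contentType lets FPutObject derive one from the
	// file extension, as the hunks below show.
	n, err := client.FPutObject("mybucket", "backup.tar.gz", "/tmp/backup.tar.gz", "")
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("uploaded %d bytes", n)
}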
@@ -62,6 +64,8 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
return 0, ErrEntityTooLarge(fileSize, maxMultipartPutObjectSize, bucketName, objectName)
}
+ objMetadata := make(map[string][]string)
+
// Set contentType based on filepath extension if not given or default
// value of "binary/octet-stream" if the extension has no associated type.
if contentType == "" {
@@ -70,9 +74,11 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
}
}
+ objMetadata["Content-Type"] = []string{contentType}
+
// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs.
// Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers.
- if isGoogleEndpoint(c.endpointURL) {
+ if s3utils.IsGoogleEndpoint(c.endpointURL) {
if fileSize > int64(maxSinglePutObjectSize) {
return 0, ErrorResponse{
Code: "NotImplemented",
@@ -82,11 +88,11 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
}
}
// Do not compute MD5 for Google Cloud Storage. Uploads up to 5GiB in size.
- return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}
// NOTE: S3 doesn't allow anonymous multipart requests.
- if isAmazonEndpoint(c.endpointURL) && c.anonymous {
+ if s3utils.IsAmazonEndpoint(c.endpointURL) && c.anonymous {
if fileSize > int64(maxSinglePutObjectSize) {
return 0, ErrorResponse{
Code: "NotImplemented",
@@ -97,15 +103,15 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
}
// Do not compute MD5 for anonymous requests to Amazon
// S3. Uploads up to 5GiB in size.
- return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}
// Small object upload is initiated for input data smaller than 5MiB.
if fileSize < minPartSize && fileSize >= 0 {
- return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}
// Upload all large objects as multipart.
- n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
@@ -116,7 +122,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
return 0, ErrEntityTooLarge(fileSize, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
- return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}
return n, err
}
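Taken together, these hunks give FPutObject three upload paths. A compressed sketch of the dispatch; the constants follow the 5MiB/5GiB limits named in the comments, and the returned strings stand in for the real method calls:

package main

import "fmt"

const (
	minPartSize            = 5 * 1024 * 1024        // 5MiB single-PUT threshold
	maxSinglePutObjectSize = 5 * 1024 * 1024 * 1024 // 5GiB cap on the no-multipart paths
)

// routeUpload mirrors FPutObject's dispatch: endpoints without
// multipart support take the no-checksum single PUT, small inputs
// take a single PUT, and everything else goes multipart.
func routeUpload(fileSize int64, isGoogle, anonymousAmazon bool) string {
	switch {
	case isGoogle || anonymousAmazon:
		return "putObjectNoChecksum" // capped at maxSinglePutObjectSize
	case fileSize >= 0 && fileSize < minPartSize:
		return "putObjectSingle"
	default:
		return "putObjectMultipartFromFile"
	}
}

func main() {
	fmt.Println(routeUpload(3*1024*1024, false, false)) // putObjectSingle
}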
@@ -131,7 +137,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// against the MD5SUM of each individual part. This function also
// effectively utilizes file system capabilities of reading from
// specific sections and not having to create temporary files.
-func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, contentType string, progress io.Reader) (int64, error) {
+func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, metaData map[string][]string, progress io.Reader) (int64, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
@@ -140,9 +146,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
return 0, err
}
- // Get upload id for an object, initiates a new multipart request
- // if it cannot find any previously partially uploaded object.
- uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
+ // Get the upload ID of a previously partially uploaded object, or initiate a new multipart upload.
+ uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
if err != nil {
return 0, err
}
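getMpartUploadSession itself is outside this diff; judging from the call site, it folds the old getUploadID plus listObjectParts pair into one step. A hypothetical reconstruction of that contract, with the network calls abstracted as callbacks:

package sketch

// objectPart mirrors the part bookkeeping type used in this file.
type objectPart struct {
	PartNumber int
	ETag       string
	Size       int64
}

// getMpartUploadSession (hypothetical reconstruction, not the
// vendored code): resume a previously initiated multipart upload
// and return its already-uploaded parts, or initiate a fresh upload
// with an empty parts map.
func getMpartUploadSession(
	findUploadID func() (id string, found bool, err error),
	listParts func(uploadID string) (map[int]objectPart, error),
	initiateUpload func() (string, error),
) (string, map[int]objectPart, error) {
	uploadID, found, err := findUploadID()
	if err != nil {
		return "", nil, err
	}
	if !found {
		// No resumable session: start a new multipart upload.
		if uploadID, err = initiateUpload(); err != nil {
			return "", nil, err
		}
		return uploadID, make(map[int]objectPart), nil
	}
	// Resuming: fetch what was uploaded before so finished parts
	// can be verified and skipped by the caller.
	partsInfo, err := listParts(uploadID)
	if err != nil {
		return "", nil, err
	}
	return uploadID, partsInfo, nil
}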
@@ -153,19 +158,6 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// Complete multipart upload.
var complMultipartUpload completeMultipartUpload
- // A map of all uploaded parts.
- var partsInfo = make(map[int]objectPart)
-
- // If this session is a continuation of a previous session fetch all
- // previously uploaded parts info.
- if !isNew {
- // Fetch previously upload parts and maximum part size.
- partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
- if err != nil {
- return 0, err
- }
- }
-
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(fileSize)
if err != nil {
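optimalPartInfo is likewise defined elsewhere. A plausible reconstruction of its arithmetic, offered as an assumption rather than the vendored code: pick the smallest part size, in minPartSize multiples, that fits the object within S3's 10000-part limit.

package main

import (
	"fmt"
	"math"
)

const (
	maxPartsCount = 10000           // S3 part-count limit referenced in the diff
	minPartSize   = 5 * 1024 * 1024 // 5MiB floor from the comments above
)

// optimalPartInfoSketch: smallest part size, rounded up to a
// minPartSize multiple, that keeps the object within 10000 parts.
func optimalPartInfoSketch(objectSize int64) (totalParts int, partSize, lastPartSize int64) {
	ps := math.Ceil(float64(objectSize) / maxPartsCount)
	ps = math.Ceil(ps/minPartSize) * minPartSize
	partSize = int64(ps)
	totalParts = int(math.Ceil(float64(objectSize) / float64(partSize)))
	lastPartSize = objectSize - int64(totalParts-1)*partSize
	return totalParts, partSize, lastPartSize
}

func main() {
	n, ps, last := optimalPartInfoSketch(11 * 1024 * 1024) // 11MiB file
	fmt.Println(n, ps, last)                               // 3 parts of 5MiB, last is 1MiB
}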
@@ -178,14 +170,19 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// Create a channel to communicate which part to upload.
// Buffer this to 10000, the maximum number of parts allowed by S3.
- uploadPartsCh := make(chan int, 10000)
+ uploadPartsCh := make(chan uploadPartReq, 10000)
// Just for readability.
lastPartNumber := totalPartsCount
// Send each part through the uploadPartsCh to be uploaded.
for p := 1; p <= totalPartsCount; p++ {
- uploadPartsCh <- p
+ part, ok := partsInfo[p]
+ if ok {
+ uploadPartsCh <- uploadPartReq{PartNum: p, Part: &part}
+ } else {
+ uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil}
+ }
}
close(uploadPartsCh)
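uploadPartReq is declared elsewhere in the package; the way it is populated above suggests a shape along these lines (field names taken from this hunk):

// Assumed shape of uploadPartReq, inferred from this diff.
type uploadPartReq struct {
	PartNum int         // part number to upload
	Part    *objectPart // prior upload of this part when resuming, else nil
}

Taking &part in the loop is safe because part is re-declared on each iteration, so every request captures its own copy.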
@@ -193,7 +190,7 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
for w := 1; w <= 3; w++ {
go func() {
// Deal with each part as it comes through the channel.
- for partNumber := range uploadPartsCh {
+ for uploadReq := range uploadPartsCh {
// Add hash algorithms that need to be calculated by computeHash()
// In case of a non-v4 signature or https connection, sha256 is not needed.
hashAlgos := make(map[string]hash.Hash)
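computeHash is another helper outside this diff. The worker's pattern, feeding one section reader through every registered hash in a single pass, can be sketched with io.MultiWriter (a stand-in, not the vendored implementation):

package main

import (
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"strings"
)

// computeHashSketch streams one reader through all registered
// hashes at once and records each digest in sums, returning the
// number of bytes read.
func computeHashSketch(algos map[string]hash.Hash, sums map[string][]byte, r io.Reader) (int64, error) {
	writers := make([]io.Writer, 0, len(algos))
	for _, h := range algos {
		writers = append(writers, h)
	}
	n, err := io.Copy(io.MultiWriter(writers...), r)
	if err != nil {
		return 0, err
	}
	for name, h := range algos {
		sums[name] = h.Sum(nil)
	}
	return n, nil
}

func main() {
	algos := map[string]hash.Hash{"md5": md5.New(), "sha256": sha256.New()}
	sums := make(map[string][]byte)
	n, _ := computeHashSketch(algos, sums, strings.NewReader("hello"))
	fmt.Println(n, hex.EncodeToString(sums["md5"]))
}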
@@ -203,47 +200,50 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
hashAlgos["sha256"] = sha256.New()
}
+ // Calculate the read offset and size for this part. For all
+ // but the last part, the offset is a multiple of partSize.
+ readOffset := int64(uploadReq.PartNum-1) * partSize
+ missingPartSize := partSize
+
+ // As a special case if partNumber is lastPartNumber, we
+ // calculate the offset based on the last part size.
+ if uploadReq.PartNum == lastPartNumber {
+ readOffset = (fileSize - lastPartSize)
+ missingPartSize = lastPartSize
+ }
+
+ // Get a section reader on a particular offset.
+ sectionReader := io.NewSectionReader(fileReader, readOffset, missingPartSize)
+ var prtSize int64
+ var err error
+
+ prtSize, err = computeHash(hashAlgos, hashSums, sectionReader)
+ if err != nil {
+ uploadedPartsCh <- uploadedPartRes{
+ Error: err,
+ }
+ // Exit the goroutine.
+ return
+ }
+
// Create the part to be uploaded.
verifyObjPart := objectPart{
ETag: hex.EncodeToString(hashSums["md5"]),
- PartNumber: partNumber,
+ PartNumber: uploadReq.PartNum,
Size: partSize,
}
+
// If this is the last part do not give it the full part size.
- if partNumber == lastPartNumber {
+ if uploadReq.PartNum == lastPartNumber {
verifyObjPart.Size = lastPartSize
}
// Verify if part should be uploaded.
- if shouldUploadPart(verifyObjPart, partsInfo) {
- // If partNumber was not uploaded we calculate the missing
- // part offset and size. For all other part numbers we
- // calculate offset based on multiples of partSize.
- readOffset := int64(partNumber-1) * partSize
- missingPartSize := partSize
-
- // As a special case if partNumber is lastPartNumber, we
- // calculate the offset based on the last part size.
- if partNumber == lastPartNumber {
- readOffset = (fileSize - lastPartSize)
- missingPartSize = lastPartSize
- }
-
- // Get a section reader on a particular offset.
- sectionReader := io.NewSectionReader(fileReader, readOffset, missingPartSize)
- var prtSize int64
- prtSize, err = computeHash(hashAlgos, hashSums, sectionReader)
- if err != nil {
- uploadedPartsCh <- uploadedPartRes{
- Error: err,
- }
- // Exit the goroutine.
- return
- }
-
+ if shouldUploadPart(verifyObjPart, uploadReq) {
// Proceed to upload the part.
var objPart objectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize)
+ objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Error: err,
@@ -252,12 +252,13 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
return
}
// Save successfully uploaded part metadata.
- partsInfo[partNumber] = objPart
+ uploadReq.Part = &objPart
}
// Return the part size through the channel.
uploadedPartsCh <- uploadedPartRes{
Size: verifyObjPart.Size,
- PartNum: partNumber,
+ PartNum: uploadReq.PartNum,
+ Part: uploadReq.Part,
Error: nil,
}
}
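The offset arithmetic above rewards a worked example. For a hypothetical 13MiB file split into 5MiB parts:

package main

import "fmt"

func main() {
	const (
		partSize     = 5 * 1024 * 1024                     // from optimalPartInfo
		fileSize     = 13 * 1024 * 1024                    // hypothetical upload
		lastPartNum  = 3                                   // totalPartsCount
		lastPartSize = fileSize - (lastPartNum-1)*partSize // 3MiB
	)
	for p := 1; p <= lastPartNum; p++ {
		readOffset := int64(p-1) * partSize
		size := int64(partSize)
		if p == lastPartNum {
			// Same special case as above: the last part starts
			// where the full-size parts end.
			readOffset = fileSize - lastPartSize
			size = lastPartSize
		}
		fmt.Printf("part %d: offset=%dMiB size=%dMiB\n", p, readOffset>>20, size>>20)
	}
}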
@@ -271,8 +272,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
return totalUploadedSize, uploadRes.Error
}
// Retrieve each uploaded part and store it to be completed.
- part, ok := partsInfo[uploadRes.PartNum]
- if !ok {
+ part := uploadRes.Part
+ if part == nil {
return totalUploadedSize, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum))
}
// Update the total uploaded size.
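uploadedPartRes is also defined outside this file; the fields read here suggest roughly this shape, with Part being what this change adds:

// Plausible shape of uploadedPartRes, inferred from this diff.
type uploadedPartRes struct {
	Error   error       // non-nil aborts the upload
	PartNum int         // part number this result describes
	Size    int64       // bytes credited to totalUploadedSize
	Part    *objectPart // metadata needed to complete the multipart upload
}

Threading the part back through the result channel, rather than having three worker goroutines write partsInfo[partNumber] into a shared map, removes a potential data race on that map; that appears to be the motivation for this change.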