Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object-readat.go')
-rw-r--r--  vendor/github.com/minio/minio-go/api-put-object-readat.go  135
1 file changed, 68 insertions(+), 67 deletions(-)
diff --git a/vendor/github.com/minio/minio-go/api-put-object-readat.go b/vendor/github.com/minio/minio-go/api-put-object-readat.go
index 14fa4b296..4ab1095f6 100644
--- a/vendor/github.com/minio/minio-go/api-put-object-readat.go
+++ b/vendor/github.com/minio/minio-go/api-put-object-readat.go
@@ -32,17 +32,22 @@ type uploadedPartRes struct {
Error error // Any error encountered while uploading the part.
PartNum int // Number of the part uploaded.
Size int64 // Size of the part uploaded.
+ Part *objectPart
+}
+
+type uploadPartReq struct {
+ PartNum int // Number of the part uploaded.
+ Part *objectPart // Part info, if the part was previously uploaded; nil otherwise.
}
// shouldUploadPartReadAt - verify if part should be uploaded.
-func shouldUploadPartReadAt(objPart objectPart, objectParts map[int]objectPart) bool {
+func shouldUploadPartReadAt(objPart objectPart, uploadReq uploadPartReq) bool {
// If part not found part should be uploaded.
- uploadedPart, found := objectParts[objPart.PartNumber]
- if !found {
+ if uploadReq.Part == nil {
return true
}
// if size mismatches part should be uploaded.
- if uploadedPart.Size != objPart.Size {
+ if uploadReq.Part.Size != objPart.Size {
return true
}
return false
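
The nil check above replaces the earlier map lookup: each request now carries a pointer to the previously uploaded part, or nil if there is none. Below is a minimal, self-contained sketch of that decision logic, using simplified stand-in types rather than the library's internal ones.

package main

import "fmt"

// Simplified stand-ins for the library's internal types; illustration only.
type objectPart struct {
	PartNumber int
	Size       int64
}

type uploadPartReq struct {
	PartNum int
	Part    *objectPart // nil when the part has not been uploaded yet
}

// Same decision as above: upload when there is no prior part,
// or when the recorded size differs from the expected size.
func shouldUploadPartReadAt(objPart objectPart, uploadReq uploadPartReq) bool {
	if uploadReq.Part == nil {
		return true
	}
	return uploadReq.Part.Size != objPart.Size
}

func main() {
	want := objectPart{PartNumber: 3, Size: 5 << 20}
	fmt.Println(shouldUploadPartReadAt(want, uploadPartReq{PartNum: 3}))                                // true: never uploaded
	fmt.Println(shouldUploadPartReadAt(want, uploadPartReq{PartNum: 3, Part: &objectPart{Size: 1024}})) // true: size mismatch
	fmt.Println(shouldUploadPartReadAt(want, uploadPartReq{PartNum: 3, Part: &want}))                   // false: can be skipped
}
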
@@ -58,7 +63,7 @@ func shouldUploadPartReadAt(objPart objectPart, objectParts map[int]objectPart)
// temporary files for staging all the data, these temporary files are
// cleaned automatically when the caller i.e http client closes the
// stream after uploading all the contents successfully.
-func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, contentType string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
@@ -67,9 +72,8 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
return 0, err
}
- // Get upload id for an object, initiates a new multipart request
- // if it cannot find any previously partially uploaded object.
- uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
+ // Get the upload id of a previously partially uploaded object or initiate a new multipart upload
+ uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
if err != nil {
return 0, err
}
@@ -80,17 +84,6 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Complete multipart upload.
var complMultipartUpload completeMultipartUpload
- // A map of all uploaded parts.
- var partsInfo = make(map[int]objectPart)
-
- // Fetch all parts info previously uploaded.
- if !isNew {
- partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
- if err != nil {
- return 0, err
- }
- }
-
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
if err != nil {
@@ -103,7 +96,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Declare a channel that sends the next part number to be uploaded.
// Buffered to 10000 because that's the maximum number of parts allowed
// by S3.
- uploadPartsCh := make(chan int, 10000)
+ uploadPartsCh := make(chan uploadPartReq, 10000)
// Declare a channel that sends back the response of a part upload.
// Buffered to 10000 because that's the maximum number of parts allowed
@@ -112,7 +105,12 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Send each part number to the channel to be processed.
for p := 1; p <= totalPartsCount; p++ {
- uploadPartsCh <- p
+ part, ok := partsInfo[p]
+ if ok {
+ uploadPartsCh <- uploadPartReq{PartNum: p, Part: &part}
+ } else {
+ uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil}
+ }
}
close(uploadPartsCh)
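
The loop above seeds one request per part number, attaching a copy of any part recorded in the resumed session. A hedged sketch of that pre-seeding pattern with placeholder data follows; the 10000 buffer mirrors S3's part-count limit.

package main

import "fmt"

// Simplified stand-ins for the library's internal types; illustration only.
type objectPart struct {
	PartNumber int
	Size       int64
}

type uploadPartReq struct {
	PartNum int
	Part    *objectPart
}

func main() {
	// Pretend parts 1 and 3 survived an earlier, interrupted upload.
	partsInfo := map[int]objectPart{
		1: {PartNumber: 1, Size: 5 << 20},
		3: {PartNumber: 3, Size: 5 << 20},
	}
	totalPartsCount := 4

	// Buffered to 10000, the maximum number of parts S3 allows.
	uploadPartsCh := make(chan uploadPartReq, 10000)
	for p := 1; p <= totalPartsCount; p++ {
		if part, ok := partsInfo[p]; ok {
			// part is a fresh copy per iteration, so &part does not
			// alias the requests of other part numbers.
			uploadPartsCh <- uploadPartReq{PartNum: p, Part: &part}
		} else {
			uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil}
		}
	}
	close(uploadPartsCh)

	for req := range uploadPartsCh {
		fmt.Printf("part %d, previously uploaded: %v\n", req.PartNum, req.Part != nil)
	}
}
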
@@ -123,64 +121,65 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
readAtBuffer := make([]byte, optimalReadBufferSize)
// Each worker will draw from the part channel and upload in parallel.
- for partNumber := range uploadPartsCh {
+ for uploadReq := range uploadPartsCh {
// Declare a new tmpBuffer.
tmpBuffer := new(bytes.Buffer)
+ // If partNumber was not uploaded we calculate the missing
+ // part offset and size. For all other part numbers we
+ // calculate offset based on multiples of partSize.
+ readOffset := int64(uploadReq.PartNum-1) * partSize
+ missingPartSize := partSize
+
+ // As a special case if partNumber is lastPartNumber, we
+ // calculate the offset based on the last part size.
+ if uploadReq.PartNum == lastPartNumber {
+ readOffset = (size - lastPartSize)
+ missingPartSize = lastPartSize
+ }
+
+ // Get a section reader on a particular offset.
+ sectionReader := io.NewSectionReader(reader, readOffset, missingPartSize)
+
+ // Choose the needed hash algorithms to be calculated by hashCopyBuffer.
+ // Sha256 is avoided in non-v4 signature requests or HTTPS connections
+ hashSums := make(map[string][]byte)
+ hashAlgos := make(map[string]hash.Hash)
+ hashAlgos["md5"] = md5.New()
+ if c.signature.isV4() && !c.secure {
+ hashAlgos["sha256"] = sha256.New()
+ }
+
+ var prtSize int64
+ var err error
+ prtSize, err = hashCopyBuffer(hashAlgos, hashSums, tmpBuffer, sectionReader, readAtBuffer)
+ if err != nil {
+ // Send the error back through the channel.
+ uploadedPartsCh <- uploadedPartRes{
+ Size: 0,
+ Error: err,
+ }
+ // Exit the goroutine.
+ return
+ }
+
// Verify whether the object part was already uploaded.
verifyObjPart := objectPart{
- PartNumber: partNumber,
+ PartNumber: uploadReq.PartNum,
Size: partSize,
}
// Special case if we see a last part number, save last part
// size as the proper part size.
- if partNumber == lastPartNumber {
+ if uploadReq.PartNum == lastPartNumber {
verifyObjPart.Size = lastPartSize
}
// Only upload the necessary parts. Otherwise return size through channel
// to update any progress bar.
- if shouldUploadPartReadAt(verifyObjPart, partsInfo) {
- // If partNumber was not uploaded we calculate the missing
- // part offset and size. For all other part numbers we
- // calculate offset based on multiples of partSize.
- readOffset := int64(partNumber-1) * partSize
- missingPartSize := partSize
-
- // As a special case if partNumber is lastPartNumber, we
- // calculate the offset based on the last part size.
- if partNumber == lastPartNumber {
- readOffset = (size - lastPartSize)
- missingPartSize = lastPartSize
- }
-
- // Get a section reader on a particular offset.
- sectionReader := io.NewSectionReader(reader, readOffset, missingPartSize)
-
- // Choose the needed hash algorithms to be calculated by hashCopyBuffer.
- // Sha256 is avoided in non-v4 signature requests or HTTPS connections
- hashSums := make(map[string][]byte)
- hashAlgos := make(map[string]hash.Hash)
- hashAlgos["md5"] = md5.New()
- if c.signature.isV4() && !c.secure {
- hashAlgos["sha256"] = sha256.New()
- }
-
- var prtSize int64
- prtSize, err = hashCopyBuffer(hashAlgos, hashSums, tmpBuffer, sectionReader, readAtBuffer)
- if err != nil {
- // Send the error back through the channel.
- uploadedPartsCh <- uploadedPartRes{
- Size: 0,
- Error: err,
- }
- // Exit the goroutine.
- return
- }
-
+ if shouldUploadPartReadAt(verifyObjPart, uploadReq) {
// Proceed to upload the part.
var objPart objectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer, partNumber, hashSums["md5"], hashSums["sha256"], prtSize)
+ objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Size: 0,
@@ -190,12 +189,13 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
return
}
// Save successfully uploaded part metadata.
- partsInfo[partNumber] = objPart
+ uploadReq.Part = &objPart
}
// Send successful part info through the channel.
uploadedPartsCh <- uploadedPartRes{
Size: verifyObjPart.Size,
- PartNum: partNumber,
+ PartNum: uploadReq.PartNum,
+ Part: uploadReq.Part,
Error: nil,
}
}
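
The offset arithmetic hoisted to the top of the worker is just multiples of partSize, with the final part trimmed to the remainder. A small worked sketch with assumed sizes (the library derives the real values via optimalPartInfo):

package main

import "fmt"

func main() {
	// Assumed values for illustration; optimalPartInfo computes the real ones.
	size := int64(18 << 20)    // 18 MiB object
	partSize := int64(5 << 20) // 5 MiB per part
	lastPartNumber := 4
	lastPartSize := size - int64(lastPartNumber-1)*partSize // 3 MiB remainder

	for partNum := 1; partNum <= lastPartNumber; partNum++ {
		readOffset := int64(partNum-1) * partSize
		missingPartSize := partSize
		if partNum == lastPartNumber {
			// The last part starts where the remainder begins.
			readOffset = size - lastPartSize
			missingPartSize = lastPartSize
		}
		fmt.Printf("part %d: offset=%d bytes, size=%d bytes\n", partNum, readOffset, missingPartSize)
	}
}
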
@@ -210,8 +210,9 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
return totalUploadedSize, uploadRes.Error
}
// Retrieve each uploaded part and store it to be completed.
- part, ok := partsInfo[uploadRes.PartNum]
- if !ok {
+ // part, ok := partsInfo[uploadRes.PartNum]
+ part := uploadRes.Part
+ if part == nil {
return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum))
}
// Update the totalUploadedSize.
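
The collection loop above now trusts the Part pointer carried in each result instead of re-reading a shared map. A simplified, self-contained sketch of draining the results channel, summing sizes, and ordering the parts for the complete-multipart request (S3 expects them in ascending part-number order); the types here are stand-ins, not the library's API.

package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the library's internal types; illustration only.
type objectPart struct {
	PartNumber int
	ETag       string
	Size       int64
}

type uploadedPartRes struct {
	Error   error
	PartNum int
	Size    int64
	Part    *objectPart
}

func main() {
	uploadedPartsCh := make(chan uploadedPartRes, 3)
	uploadedPartsCh <- uploadedPartRes{PartNum: 2, Size: 5 << 20, Part: &objectPart{PartNumber: 2, ETag: "etag-2"}}
	uploadedPartsCh <- uploadedPartRes{PartNum: 1, Size: 5 << 20, Part: &objectPart{PartNumber: 1, ETag: "etag-1"}}
	uploadedPartsCh <- uploadedPartRes{PartNum: 3, Size: 3 << 20, Part: &objectPart{PartNumber: 3, ETag: "etag-3"}}
	close(uploadedPartsCh)

	var totalUploadedSize int64
	var parts []objectPart
	for res := range uploadedPartsCh {
		if res.Error != nil {
			fmt.Println("abort:", res.Error)
			return
		}
		if res.Part == nil {
			fmt.Printf("missing part number %d\n", res.PartNum)
			return
		}
		totalUploadedSize += res.Size
		parts = append(parts, *res.Part)
	}

	// List completed parts in ascending part-number order before completing.
	sort.Slice(parts, func(i, j int) bool { return parts[i].PartNumber < parts[j].PartNumber })
	fmt.Println("total uploaded bytes:", totalUploadedSize, "parts:", len(parts))
}
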