From 96eab1202717e073782ec399a4e0820cae15b1bb Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Thu, 17 Aug 2017 17:19:06 -0700 Subject: Updating server dependancies. (#7246) --- .../minio/minio-go/api-put-object-multipart.go | 64 ++++++++++++---------- 1 file changed, 36 insertions(+), 28 deletions(-) (limited to 'vendor/github.com/minio/minio-go/api-put-object-multipart.go') diff --git a/vendor/github.com/minio/minio-go/api-put-object-multipart.go b/vendor/github.com/minio/minio-go/api-put-object-multipart.go index 1938378f8..6e0015acc 100644 --- a/vendor/github.com/minio/minio-go/api-put-object-multipart.go +++ b/vendor/github.com/minio/minio-go/api-put-object-multipart.go @@ -27,13 +27,14 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/minio/minio-go/pkg/s3utils" ) func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { - n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, size, metadata, progress) + n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, metadata, progress) if err != nil { errResp := ToErrorResponse(err) // Verify if multipart functionality is not available, if not @@ -50,8 +51,17 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read return n, err } -func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64, - metadata map[string][]string, progress io.Reader) (n int64, err error) { +// Pool to manage re-usable memory for upload objects +// with streams with unknown size. +var bufPool = sync.Pool{ + New: func() interface{} { + _, partSize, _, _ := optimalPartInfo(-1) + b := make([]byte, partSize) + return &b + }, +} + +func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) { // Input validation. if err = s3utils.CheckValidBucketName(bucketName); err != nil { return 0, err @@ -68,7 +78,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader var complMultipartUpload completeMultipartUpload // Calculate the optimal parts info for a given size. - totalPartsCount, partSize, _, err := optimalPartInfo(size) + totalPartsCount, _, _, err := optimalPartInfo(-1) if err != nil { return 0, err } @@ -88,9 +98,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader // Part number always starts with '1'. partNumber := 1 - // Initialize a temporary buffer. - tmpBuffer := new(bytes.Buffer) - // Initialize parts uploaded map. partsInfo := make(map[int]ObjectPart) @@ -100,53 +107,54 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader // HTTPS connection. hashAlgos, hashSums := c.hashMaterials() - // Calculates hash sums while copying partSize bytes into tmpBuffer. - prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize) - if rErr != nil && rErr != io.EOF { + bufp := bufPool.Get().(*[]byte) + length, rErr := io.ReadFull(reader, *bufp) + if rErr == io.EOF { + break + } + if rErr != nil && rErr != io.ErrUnexpectedEOF { + bufPool.Put(bufp) return 0, rErr } - var reader io.Reader + // Calculates hash sums while copying partSize bytes into cw. + for k, v := range hashAlgos { + v.Write((*bufp)[:length]) + hashSums[k] = v.Sum(nil) + } + // Update progress reader appropriately to the latest offset // as we read from the source. - reader = newHook(tmpBuffer, progress) + rd := newHook(bytes.NewReader((*bufp)[:length]), progress) // Proceed to upload the part. var objPart ObjectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, - hashSums["md5"], hashSums["sha256"], prtSize, metadata) + objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, + hashSums["md5"], hashSums["sha256"], int64(length), metadata) if err != nil { - // Reset the temporary buffer upon any error. - tmpBuffer.Reset() + bufPool.Put(bufp) return totalUploadedSize, err } // Save successfully uploaded part metadata. partsInfo[partNumber] = objPart - // Reset the temporary buffer. - tmpBuffer.Reset() - // Save successfully uploaded size. - totalUploadedSize += prtSize + totalUploadedSize += int64(length) // Increment part number. partNumber++ + // Put back data into bufpool. + bufPool.Put(bufp) + // For unknown size, Read EOF we break away. // We do not have to upload till totalPartsCount. - if size < 0 && rErr == io.EOF { + if rErr == io.EOF { break } } - // Verify if we uploaded all the data. - if size > 0 { - if totalUploadedSize != size { - return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) - } - } - // Loop over total uploaded parts to save them in // Parts array before completing the multipart request. for i := 1; i < partNumber; i++ { -- cgit v1.2.3-1-g7c22