From f02620b291b988848392c455a7719699f6b5c00f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 26 Oct 2016 05:21:07 -0700 Subject: Moving away from goamz to use minio-go instead. (#4193) minio-go does fully managed way of handling S3 API requests - Automatic bucket location management across all s3 regions. - Transparently upload large files in multipart if file 64MB or larger. - Right GetObject() API provides compatibility with io.ReadWriteSeeker interface. - Various other APIs including bulk deletes, server side object copy, bucket policies and bucket notifications. Fixes #4182 --- .../minio/minio-go/api-put-object-multipart.go | 393 +++++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 vendor/github.com/minio/minio-go/api-put-object-multipart.go (limited to 'vendor/github.com/minio/minio-go/api-put-object-multipart.go') diff --git a/vendor/github.com/minio/minio-go/api-put-object-multipart.go b/vendor/github.com/minio/minio-go/api-put-object-multipart.go new file mode 100644 index 000000000..cdd3f53c2 --- /dev/null +++ b/vendor/github.com/minio/minio-go/api-put-object-multipart.go @@ -0,0 +1,393 @@ +/* + * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package minio + +import ( + "bytes" + "crypto/md5" + "crypto/sha256" + "encoding/hex" + "encoding/xml" + "fmt" + "hash" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "sort" + "strconv" + "strings" +) + +// Comprehensive put object operation involving multipart resumable uploads. +// +// Following code handles these types of readers. +// +// - *os.File +// - *minio.Object +// - Any reader which has a method 'ReadAt()' +// +// If we exhaust all the known types, code proceeds to use stream as +// is where each part is re-downloaded, checksummed and verified +// before upload. +func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) { + if size > 0 && size > minPartSize { + // Verify if reader is *os.File, then use file system functionalities. + if isFile(reader) { + return c.putObjectMultipartFromFile(bucketName, objectName, reader.(*os.File), size, contentType, progress) + } + // Verify if reader is *minio.Object or io.ReaderAt. + // NOTE: Verification of object is kept for a specific purpose + // while it is going to be duck typed similar to io.ReaderAt. + // It is to indicate that *minio.Object implements io.ReaderAt. + // and such a functionality is used in the subsequent code + // path. + if isObject(reader) || isReadAt(reader) { + return c.putObjectMultipartFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, contentType, progress) + } + } + // For any other data size and reader type we do generic multipart + // approach by staging data in temporary files and uploading them. + return c.putObjectMultipartStream(bucketName, objectName, reader, size, contentType, progress) +} + +// putObjectStream uploads files bigger than 5MiB, and also supports +// special case where size is unknown i.e '-1'. +func (c Client) putObjectMultipartStream(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return 0, err + } + if err := isValidObjectName(objectName); err != nil { + return 0, err + } + + // Total data read and written to server. should be equal to 'size' at the end of the call. + var totalUploadedSize int64 + + // Complete multipart upload. + var complMultipartUpload completeMultipartUpload + + // A map of all previously uploaded parts. + var partsInfo = make(map[int]objectPart) + + // getUploadID for an object, initiates a new multipart request + // if it cannot find any previously partially uploaded object. + uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType) + if err != nil { + return 0, err + } + + // If This session is a continuation of a previous session fetch all + // previously uploaded parts info and as a special case only fetch partsInfo + // for only known upload size. + if !isNew { + // Fetch previously uploaded parts and maximum part size. + partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID) + if err != nil { + return 0, err + } + } + + // Calculate the optimal parts info for a given size. + totalPartsCount, partSize, _, err := optimalPartInfo(size) + if err != nil { + return 0, err + } + + // Part number always starts with '1'. + partNumber := 1 + + // Initialize a temporary buffer. + tmpBuffer := new(bytes.Buffer) + + for partNumber <= totalPartsCount { + // Choose hash algorithms to be calculated by hashCopyN, avoid sha256 + // with non-v4 signature request or HTTPS connection + hashSums := make(map[string][]byte) + hashAlgos := make(map[string]hash.Hash) + hashAlgos["md5"] = md5.New() + if c.signature.isV4() && !c.secure { + hashAlgos["sha256"] = sha256.New() + } + + // Calculates hash sums while copying partSize bytes into tmpBuffer. + prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize) + if rErr != nil { + if rErr != io.EOF { + return 0, rErr + } + } + + var reader io.Reader + // Update progress reader appropriately to the latest offset + // as we read from the source. + reader = newHook(tmpBuffer, progress) + + // Verify if part should be uploaded. + if shouldUploadPart(objectPart{ + ETag: hex.EncodeToString(hashSums["md5"]), + PartNumber: partNumber, + Size: prtSize, + }, partsInfo) { + // Proceed to upload the part. + var objPart objectPart + objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize) + if err != nil { + // Reset the temporary buffer upon any error. + tmpBuffer.Reset() + return totalUploadedSize, err + } + // Save successfully uploaded part metadata. + partsInfo[partNumber] = objPart + } else { + // Update the progress reader for the skipped part. + if progress != nil { + if _, err = io.CopyN(ioutil.Discard, progress, prtSize); err != nil { + return totalUploadedSize, err + } + } + } + + // Reset the temporary buffer. + tmpBuffer.Reset() + + // Save successfully uploaded size. + totalUploadedSize += prtSize + + // Increment part number. + partNumber++ + + // For unknown size, Read EOF we break away. + // We do not have to upload till totalPartsCount. + if size < 0 && rErr == io.EOF { + break + } + } + + // Verify if we uploaded all the data. + if size > 0 { + if totalUploadedSize != size { + return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) + } + } + + // Loop over total uploaded parts to save them in + // Parts array before completing the multipart request. + for i := 1; i < partNumber; i++ { + part, ok := partsInfo[i] + if !ok { + return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i)) + } + complMultipartUpload.Parts = append(complMultipartUpload.Parts, completePart{ + ETag: part.ETag, + PartNumber: part.PartNumber, + }) + } + + // Sort all completed parts. + sort.Sort(completedParts(complMultipartUpload.Parts)) + _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload) + if err != nil { + return totalUploadedSize, err + } + + // Return final size. + return totalUploadedSize, nil +} + +// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID. +func (c Client) initiateMultipartUpload(bucketName, objectName, contentType string) (initiateMultipartUploadResult, error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return initiateMultipartUploadResult{}, err + } + if err := isValidObjectName(objectName); err != nil { + return initiateMultipartUploadResult{}, err + } + + // Initialize url queries. + urlValues := make(url.Values) + urlValues.Set("uploads", "") + + if contentType == "" { + contentType = "application/octet-stream" + } + + // Set ContentType header. + customHeader := make(http.Header) + customHeader.Set("Content-Type", contentType) + + reqMetadata := requestMetadata{ + bucketName: bucketName, + objectName: objectName, + queryValues: urlValues, + customHeader: customHeader, + } + + // Execute POST on an objectName to initiate multipart upload. + resp, err := c.executeMethod("POST", reqMetadata) + defer closeResponse(resp) + if err != nil { + return initiateMultipartUploadResult{}, err + } + if resp != nil { + if resp.StatusCode != http.StatusOK { + return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName) + } + } + // Decode xml for new multipart upload. + initiateMultipartUploadResult := initiateMultipartUploadResult{} + err = xmlDecoder(resp.Body, &initiateMultipartUploadResult) + if err != nil { + return initiateMultipartUploadResult, err + } + return initiateMultipartUploadResult, nil +} + +// uploadPart - Uploads a part in a multipart upload. +func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Reader, partNumber int, md5Sum, sha256Sum []byte, size int64) (objectPart, error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return objectPart{}, err + } + if err := isValidObjectName(objectName); err != nil { + return objectPart{}, err + } + if size > maxPartSize { + return objectPart{}, ErrEntityTooLarge(size, maxPartSize, bucketName, objectName) + } + if size <= -1 { + return objectPart{}, ErrEntityTooSmall(size, bucketName, objectName) + } + if partNumber <= 0 { + return objectPart{}, ErrInvalidArgument("Part number cannot be negative or equal to zero.") + } + if uploadID == "" { + return objectPart{}, ErrInvalidArgument("UploadID cannot be empty.") + } + + // Get resources properly escaped and lined up before using them in http request. + urlValues := make(url.Values) + // Set part number. + urlValues.Set("partNumber", strconv.Itoa(partNumber)) + // Set upload id. + urlValues.Set("uploadId", uploadID) + + reqMetadata := requestMetadata{ + bucketName: bucketName, + objectName: objectName, + queryValues: urlValues, + contentBody: reader, + contentLength: size, + contentMD5Bytes: md5Sum, + contentSHA256Bytes: sha256Sum, + } + + // Execute PUT on each part. + resp, err := c.executeMethod("PUT", reqMetadata) + defer closeResponse(resp) + if err != nil { + return objectPart{}, err + } + if resp != nil { + if resp.StatusCode != http.StatusOK { + return objectPart{}, httpRespToErrorResponse(resp, bucketName, objectName) + } + } + // Once successfully uploaded, return completed part. + objPart := objectPart{} + objPart.Size = size + objPart.PartNumber = partNumber + // Trim off the odd double quotes from ETag in the beginning and end. + objPart.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"") + objPart.ETag = strings.TrimSuffix(objPart.ETag, "\"") + return objPart, nil +} + +// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts. +func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string, complete completeMultipartUpload) (completeMultipartUploadResult, error) { + // Input validation. + if err := isValidBucketName(bucketName); err != nil { + return completeMultipartUploadResult{}, err + } + if err := isValidObjectName(objectName); err != nil { + return completeMultipartUploadResult{}, err + } + + // Initialize url queries. + urlValues := make(url.Values) + urlValues.Set("uploadId", uploadID) + + // Marshal complete multipart body. + completeMultipartUploadBytes, err := xml.Marshal(complete) + if err != nil { + return completeMultipartUploadResult{}, err + } + + // Instantiate all the complete multipart buffer. + completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes) + reqMetadata := requestMetadata{ + bucketName: bucketName, + objectName: objectName, + queryValues: urlValues, + contentBody: completeMultipartUploadBuffer, + contentLength: int64(len(completeMultipartUploadBytes)), + contentSHA256Bytes: sum256(completeMultipartUploadBytes), + } + + // Execute POST to complete multipart upload for an objectName. + resp, err := c.executeMethod("POST", reqMetadata) + defer closeResponse(resp) + if err != nil { + return completeMultipartUploadResult{}, err + } + if resp != nil { + if resp.StatusCode != http.StatusOK { + return completeMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName) + } + } + + // Read resp.Body into a []bytes to parse for Error response inside the body + var b []byte + b, err = ioutil.ReadAll(resp.Body) + if err != nil { + return completeMultipartUploadResult{}, err + } + // Decode completed multipart upload response on success. + completeMultipartUploadResult := completeMultipartUploadResult{} + err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult) + if err != nil { + // xml parsing failure due to presence an ill-formed xml fragment + return completeMultipartUploadResult, err + } else if completeMultipartUploadResult.Bucket == "" { + // xml's Decode method ignores well-formed xml that don't apply to the type of value supplied. + // In this case, it would leave completeMultipartUploadResult with the corresponding zero-values + // of the members. + + // Decode completed multipart upload response on failure + completeMultipartUploadErr := ErrorResponse{} + err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr) + if err != nil { + // xml parsing failure due to presence an ill-formed xml fragment + return completeMultipartUploadResult, err + } + return completeMultipartUploadResult, completeMultipartUploadErr + } + return completeMultipartUploadResult, nil +} -- cgit v1.2.3-1-g7c22