Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object-file.go')
-rw-r--r--  vendor/github.com/minio/minio-go/api-put-object-file.go  307
1 file changed, 307 insertions(+), 0 deletions(-)
diff --git a/vendor/github.com/minio/minio-go/api-put-object-file.go b/vendor/github.com/minio/minio-go/api-put-object-file.go
new file mode 100644
index 000000000..deaed0acd
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/api-put-object-file.go
@@ -0,0 +1,307 @@
+/*
+ * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package minio
+
+import (
+ "crypto/md5"
+ "crypto/sha256"
+ "encoding/hex"
+ "fmt"
+ "hash"
+ "io"
+ "io/ioutil"
+ "mime"
+ "os"
+ "path/filepath"
+ "sort"
+)
+
+// FPutObject - Creates an object in a bucket, with contents from the file at filePath.
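+//
+// A minimal usage sketch, assuming a previously constructed Client;
+// the bucket, object, and file names here are placeholders:
+//
+// n, err := client.FPutObject("mybucket", "myobject.csv", "/tmp/myobject.csv", "text/csv")
+// if err != nil {
+// log.Fatalln(err)
+// }
+// log.Printf("uploaded %d bytes", n)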
+func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) (n int64, err error) {
+ // Input validation.
+ if err := isValidBucketName(bucketName); err != nil {
+ return 0, err
+ }
+ if err := isValidObjectName(objectName); err != nil {
+ return 0, err
+ }
+
+ // Open the referenced file.
+ fileReader, err := os.Open(filePath)
+ // If any error, fail quickly here.
+ if err != nil {
+ return 0, err
+ }
+ defer fileReader.Close()
+
+ // Save the file stat.
+ fileStat, err := fileReader.Stat()
+ if err != nil {
+ return 0, err
+ }
+
+ // Save the file size.
+ fileSize := fileStat.Size()
+
+ // Check for largest object size allowed.
+ if fileSize > int64(maxMultipartPutObjectSize) {
+ return 0, ErrEntityTooLarge(fileSize, maxMultipartPutObjectSize, bucketName, objectName)
+ }
+
+ // Set contentType based on filepath extension if not given, defaulting
+ // to "application/octet-stream" if the extension has no associated type.
+ if contentType == "" {
+ if contentType = mime.TypeByExtension(filepath.Ext(filePath)); contentType == "" {
+ contentType = "application/octet-stream"
+ }
+ }
+
+ // NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs.
+ // Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers.
+ if isGoogleEndpoint(c.endpointURL) {
+ if fileSize > int64(maxSinglePutObjectSize) {
+ return 0, ErrorResponse{
+ Code: "NotImplemented",
+ Message: fmt.Sprintf("Invalid Content-Length %d for file uploads to Google Cloud Storage.", fileSize),
+ Key: objectName,
+ BucketName: bucketName,
+ }
+ }
+ // Do not compute MD5 for Google Cloud Storage. Uploads of up to 5GiB are supported.
+ return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ }
+
+ // NOTE: S3 doesn't allow anonymous multipart requests.
+ if isAmazonEndpoint(c.endpointURL) && c.anonymous {
+ if fileSize > int64(maxSinglePutObjectSize) {
+ return 0, ErrorResponse{
+ Code: "NotImplemented",
+ Message: fmt.Sprintf("For anonymous requests Content-Length cannot be %d.", fileSize),
+ Key: objectName,
+ BucketName: bucketName,
+ }
+ }
+ // Do not compute MD5 for anonymous requests to Amazon S3.
+ // Uploads of up to 5GiB are supported.
+ return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ }
+
+ // Small object upload is initiated for input data smaller than 5MiB.
+ if fileSize < minPartSize && fileSize >= 0 {
+ return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ }
+ // Upload all large objects as multipart.
+ n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ if err != nil {
+ errResp := ToErrorResponse(err)
+ // If multipart functionality is not available, fall back to a
+ // single PutObject operation.
+ if errResp.Code == "NotImplemented" {
+ // If the file is larger than 5GiB, fail.
+ if fileSize > maxSinglePutObjectSize {
+ return 0, ErrEntityTooLarge(fileSize, maxSinglePutObjectSize, bucketName, objectName)
+ }
+ // Fall back to uploading as single PutObject operation.
+ return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
+ }
+ return n, err
+ }
+ return n, nil
+}
+
+// putObjectMultipartFromFile - Creates an object from the contents of an *os.File.
+//
+// NOTE: This function is meant to be used for readers backed by a
+// local file, such as *os.File. It resumes an interrupted upload by
+// skipping parts that were already uploaded, verifying them against
+// the MD5 sum of each individual part. It also exploits the file
+// system's ability to read from specific offsets, so no temporary
+// files are needed.
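+//
+// For example, reading part n of the file without a temporary copy
+// reduces to a section reader over the original (a sketch; the names
+// here are illustrative):
+//
+// offset := int64(n-1) * partSize
+// part := io.NewSectionReader(file, offset, partSize)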
+func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, contentType string, progress io.Reader) (int64, error) {
+ // Input validation.
+ if err := isValidBucketName(bucketName); err != nil {
+ return 0, err
+ }
+ if err := isValidObjectName(objectName); err != nil {
+ return 0, err
+ }
+
+ // Get upload id for an object, initiates a new multipart request
+ // if it cannot find any previously partially uploaded object.
+ uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
+ if err != nil {
+ return 0, err
+ }
+
+ // Total data read and written to the server. Should equal 'fileSize' at the end of the call.
+ var totalUploadedSize int64
+
+ // Complete multipart upload.
+ var complMultipartUpload completeMultipartUpload
+
+ // A map of all uploaded parts. It is shared with the uploader
+ // goroutines below, so all access is guarded by a mutex.
+ var partsInfo = make(map[int]objectPart)
+ var partsInfoMu sync.Mutex
+
+ // If this session is a continuation of a previous session, fetch all
+ // previously uploaded parts info.
+ if !isNew {
+ // Fetch previously uploaded parts.
+ partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ // Calculate the optimal parts info for a given size.
+ totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(fileSize)
+ if err != nil {
+ return 0, err
+ }
+
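+ // The parallel upload below is a fan-out/fan-in pattern: part numbers
+ // are fed into uploadPartsCh, a fixed pool of three workers consumes
+ // them and uploads the corresponding file sections, and results are
+ // collected from uploadedPartsCh in the receive loop further down.
+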
+ // Create a channel to communicate that a part was uploaded.
+ // Buffer this to 10000, the maximum number of parts allowed by S3.
+ uploadedPartsCh := make(chan uploadedPartRes, 10000)
+
+ // Create a channel to communicate which part to upload.
+ // Buffer this to 10000, the maximum number of parts allowed by S3.
+ uploadPartsCh := make(chan int, 10000)
+
+ // Just for readability.
+ lastPartNumber := totalPartsCount
+
+ // Send each part number through uploadPartsCh to be uploaded, then
+ // close the channel so the workers' range loops terminate.
+ for p := 1; p <= totalPartsCount; p++ {
+ uploadPartsCh <- p
+ }
+ close(uploadPartsCh)
+
+ // Use three 'workers' to upload parts in parallel.
+ for w := 1; w <= 3; w++ {
+ go func() {
+ // Deal with each part as it comes through the channel.
+ for partNumber := range uploadPartsCh {
+ // Add the hash algorithms that need to be calculated by computeHash().
+ // In case of a non-v4 signature or an HTTPS connection, SHA256 is not needed.
+ hashAlgos := make(map[string]hash.Hash)
+ hashSums := make(map[string][]byte)
+ hashAlgos["md5"] = md5.New()
+ if c.signature.isV4() && !c.secure {
+ hashAlgos["sha256"] = sha256.New()
+ }
+
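+ // NOTE: hashSums has no entries yet at this point, so the ETag
+ // recorded on verifyObjPart below is the empty string.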
+ // Create the part to be uploaded.
+ verifyObjPart := objectPart{
+ ETag: hex.EncodeToString(hashSums["md5"]),
+ PartNumber: partNumber,
+ Size: partSize,
+ }
+ // If this is the last part do not give it the full part size.
+ if partNumber == lastPartNumber {
+ verifyObjPart.Size = lastPartSize
+ }
+
+ // Verify if the part should be uploaded; guard the read of
+ // partsInfo since the other workers mutate it concurrently.
+ partsInfoMu.Lock()
+ uploadNeeded := shouldUploadPart(verifyObjPart, partsInfo)
+ partsInfoMu.Unlock()
+ if uploadNeeded {
+ // If partNumber was not uploaded, calculate the missing
+ // part's offset and size. For all but the last part the
+ // offset is a multiple of partSize.
+ readOffset := int64(partNumber-1) * partSize
+ missingPartSize := partSize
+
+ // As a special case, if partNumber is lastPartNumber the
+ // offset is computed from the last part's size.
+ if partNumber == lastPartNumber {
+ readOffset = (fileSize - lastPartSize)
+ missingPartSize = lastPartSize
+ }
+
+ // Get a section reader on a particular offset.
+ sectionReader := io.NewSectionReader(fileReader, readOffset, missingPartSize)
+ // Compute the hashes; declare err with := so workers do not
+ // race on the enclosing function's err variable.
+ prtSize, err := computeHash(hashAlgos, hashSums, sectionReader)
+ if err != nil {
+ uploadedPartsCh <- uploadedPartRes{
+ Error: err,
+ }
+ // Exit the goroutine.
+ return
+ }
+
+ // Proceed to upload the part.
+ objPart, err := c.uploadPart(bucketName, objectName, uploadID, sectionReader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize)
+ if err != nil {
+ uploadedPartsCh <- uploadedPartRes{
+ Error: err,
+ }
+ // Exit the goroutine.
+ return
+ }
+ // Save successfully uploaded part metadata, guarding the
+ // shared map against concurrent writers.
+ partsInfoMu.Lock()
+ partsInfo[partNumber] = objPart
+ partsInfoMu.Unlock()
+ }
+ // Report the part's size back through the channel.
+ uploadedPartsCh <- uploadedPartRes{
+ Size: verifyObjPart.Size,
+ PartNum: partNumber,
+ Error: nil,
+ }
+ }
+ }()
+ }
+
+ // Retrieve each uploaded part once it is done.
+ for u := 1; u <= totalPartsCount; u++ {
+ uploadRes := <-uploadedPartsCh
+ if uploadRes.Error != nil {
+ return totalUploadedSize, uploadRes.Error
+ }
+ // Retrieve the uploaded part's metadata so it can be completed.
+ partsInfoMu.Lock()
+ part, ok := partsInfo[uploadRes.PartNum]
+ partsInfoMu.Unlock()
+ if !ok {
+ return totalUploadedSize, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum))
+ }
+ // Update the total uploaded size.
+ totalUploadedSize += uploadRes.Size
+ // Update the progress bar, if any, by draining exactly
+ // uploadRes.Size bytes from the progress reader; the reader is
+ // expected to account for the bytes read through it.
+ if progress != nil {
+ if _, err = io.CopyN(ioutil.Discard, progress, uploadRes.Size); err != nil {
+ return totalUploadedSize, err
+ }
+ }
+ // Store the part to be completed.
+ complMultipartUpload.Parts = append(complMultipartUpload.Parts, completePart{
+ ETag: part.ETag,
+ PartNumber: part.PartNumber,
+ })
+ }
+
+ // Verify if we uploaded all data.
+ if totalUploadedSize != fileSize {
+ return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, fileSize, bucketName, objectName)
+ }
+
+ // Sort all completed parts in ascending part-number order, as required by S3.
+ sort.Sort(completedParts(complMultipartUpload.Parts))
+ _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload)
+ if err != nil {
+ return totalUploadedSize, err
+ }
+
+ // Return final size.
+ return totalUploadedSize, nil
+}