summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/api-put-object.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object.go')
-rw-r--r--vendor/github.com/minio/minio-go/api-put-object.go240
1 files changed, 90 insertions, 150 deletions
diff --git a/vendor/github.com/minio/minio-go/api-put-object.go b/vendor/github.com/minio/minio-go/api-put-object.go
index 94db82593..6d90eab74 100644
--- a/vendor/github.com/minio/minio-go/api-put-object.go
+++ b/vendor/github.com/minio/minio-go/api-put-object.go
@@ -1,5 +1,6 @@
/*
- * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc.
+ * Minio Go Library for Amazon S3 Compatible Cloud Storage
+ * Copyright 2015-2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,119 +19,91 @@ package minio
import (
"bytes"
+ "context"
"fmt"
"io"
- "os"
- "reflect"
- "runtime"
+ "net/http"
"runtime/debug"
"sort"
- "strings"
+ "github.com/minio/minio-go/pkg/encrypt"
"github.com/minio/minio-go/pkg/s3utils"
+ "golang.org/x/net/lex/httplex"
)
-// toInt - converts go value to its integer representation based
-// on the value kind if it is an integer.
-func toInt(value reflect.Value) (size int64) {
- size = -1
- if value.IsValid() {
- switch value.Kind() {
- case reflect.Int:
- fallthrough
- case reflect.Int8:
- fallthrough
- case reflect.Int16:
- fallthrough
- case reflect.Int32:
- fallthrough
- case reflect.Int64:
- size = value.Int()
- }
+// PutObjectOptions represents options specified by user for PutObject call
+type PutObjectOptions struct {
+ UserMetadata map[string]string
+ Progress io.Reader
+ ContentType string
+ ContentEncoding string
+ ContentDisposition string
+ CacheControl string
+ EncryptMaterials encrypt.Materials
+ NumThreads uint
+ StorageClass string
+}
+
+// getNumThreads - gets the number of threads to be used in the multipart
+// put object operation
+func (opts PutObjectOptions) getNumThreads() (numThreads int) {
+ if opts.NumThreads > 0 {
+ numThreads = int(opts.NumThreads)
+ } else {
+ numThreads = totalWorkers
}
- return size
+ return
}
-// getReaderSize - Determine the size of Reader if available.
-func getReaderSize(reader io.Reader) (size int64, err error) {
- size = -1
- if reader == nil {
- return -1, nil
+// Header - constructs the headers from metadata entered by user in
+// PutObjectOptions struct
+func (opts PutObjectOptions) Header() (header http.Header) {
+ header = make(http.Header)
+
+ if opts.ContentType != "" {
+ header["Content-Type"] = []string{opts.ContentType}
+ } else {
+ header["Content-Type"] = []string{"application/octet-stream"}
+ }
+ if opts.ContentEncoding != "" {
+ header["Content-Encoding"] = []string{opts.ContentEncoding}
+ }
+ if opts.ContentDisposition != "" {
+ header["Content-Disposition"] = []string{opts.ContentDisposition}
}
- // Verify if there is a method by name 'Size'.
- sizeFn := reflect.ValueOf(reader).MethodByName("Size")
- // Verify if there is a method by name 'Len'.
- lenFn := reflect.ValueOf(reader).MethodByName("Len")
- if sizeFn.IsValid() {
- if sizeFn.Kind() == reflect.Func {
- // Call the 'Size' function and save its return value.
- result := sizeFn.Call([]reflect.Value{})
- if len(result) == 1 {
- size = toInt(result[0])
- }
+ if opts.CacheControl != "" {
+ header["Cache-Control"] = []string{opts.CacheControl}
+ }
+ if opts.EncryptMaterials != nil {
+ header[amzHeaderIV] = []string{opts.EncryptMaterials.GetIV()}
+ header[amzHeaderKey] = []string{opts.EncryptMaterials.GetKey()}
+ header[amzHeaderMatDesc] = []string{opts.EncryptMaterials.GetDesc()}
+ }
+ if opts.StorageClass != "" {
+ header[amzStorageClass] = []string{opts.StorageClass}
+ }
+ for k, v := range opts.UserMetadata {
+ if !isAmzHeader(k) && !isStandardHeader(k) && !isSSEHeader(k) && !isStorageClassHeader(k) {
+ header["X-Amz-Meta-"+k] = []string{v}
+ } else {
+ header[k] = []string{v}
}
- } else if lenFn.IsValid() {
- if lenFn.Kind() == reflect.Func {
- // Call the 'Len' function and save its return value.
- result := lenFn.Call([]reflect.Value{})
- if len(result) == 1 {
- size = toInt(result[0])
- }
+ }
+ return
+}
+
+// validate() checks if the UserMetadata map has standard headers or client side
+// encryption headers and raises an error if so.
+func (opts PutObjectOptions) validate() (err error) {
+ for k, v := range opts.UserMetadata {
+ if !httplex.ValidHeaderFieldName(k) || isStandardHeader(k) || isCSEHeader(k) || isStorageClassHeader(k) {
+ return ErrInvalidArgument(k + " unsupported user defined metadata name")
}
- } else {
- // Fallback to Stat() method, two possible Stat() structs exist.
- switch v := reader.(type) {
- case *os.File:
- var st os.FileInfo
- st, err = v.Stat()
- if err != nil {
- // Handle this case specially for "windows",
- // certain files for example 'Stdin', 'Stdout' and
- // 'Stderr' it is not allowed to fetch file information.
- if runtime.GOOS == "windows" {
- if strings.Contains(err.Error(), "GetFileInformationByHandle") {
- return -1, nil
- }
- }
- return
- }
- // Ignore if input is a directory, throw an error.
- if st.Mode().IsDir() {
- return -1, ErrInvalidArgument("Input file cannot be a directory.")
- }
- // Ignore 'Stdin', 'Stdout' and 'Stderr', since they
- // represent *os.File type but internally do not
- // implement Seekable calls. Ignore them and treat
- // them like a stream with unknown length.
- switch st.Name() {
- case "stdin", "stdout", "stderr":
- return
- // Ignore read/write stream of os.Pipe() which have unknown length too.
- case "|0", "|1":
- return
- }
- var pos int64
- pos, err = v.Seek(0, 1) // SeekCurrent.
- if err != nil {
- return -1, err
- }
- size = st.Size() - pos
- case *Object:
- var st ObjectInfo
- st, err = v.Stat()
- if err != nil {
- return
- }
- var pos int64
- pos, err = v.Seek(0, 1) // SeekCurrent.
- if err != nil {
- return -1, err
- }
- size = st.Size - pos
+ if !httplex.ValidHeaderFieldValue(v) {
+ return ErrInvalidArgument(v + " unsupported user defined metadata value")
}
}
- // Returns the size here.
- return size, err
+ return nil
}
// completedParts is a collection of parts sortable by their part numbers.
@@ -152,72 +125,41 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part
// - For size input as -1 PutObject does a multipart Put operation
// until input stream reaches EOF. Maximum object size that can
// be uploaded through this operation will be 5TiB.
-func (c Client) PutObject(bucketName, objectName string, reader io.Reader, contentType string) (n int64, err error) {
- return c.PutObjectWithMetadata(bucketName, objectName, reader, map[string][]string{
- "Content-Type": []string{contentType},
- }, nil)
+func (c Client) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64,
+ opts PutObjectOptions) (n int64, err error) {
+ return c.PutObjectWithContext(context.Background(), bucketName, objectName, reader, objectSize, opts)
}
-// PutObjectWithSize - is a helper PutObject similar in behavior to PutObject()
-// but takes the size argument explicitly, this function avoids doing reflection
-// internally to figure out the size of input stream. Also if the input size is
-// lesser than 0 this function returns an error.
-func (c Client) PutObjectWithSize(bucketName, objectName string, reader io.Reader, readerSize int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
- return c.putObjectCommon(bucketName, objectName, reader, readerSize, metadata, progress)
-}
-
-// PutObjectWithMetadata using AWS streaming signature V4
-func (c Client) PutObjectWithMetadata(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
- return c.PutObjectWithProgress(bucketName, objectName, reader, metadata, progress)
-}
-
-// PutObjectWithProgress using AWS streaming signature V4
-func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
- // Size of the object.
- var size int64
-
- // Get reader size.
- size, err = getReaderSize(reader)
- if err != nil {
- return 0, err
- }
-
- return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress)
-}
-
-func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
}
// NOTE: Streaming signature is not supported by GCS.
- if s3utils.IsGoogleEndpoint(c.endpointURL) {
+ if s3utils.IsGoogleEndpoint(*c.endpointURL) {
// Do not compute MD5 for Google Cloud Storage.
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
if c.overrideSignerType.IsV2() {
if size >= 0 && size < minPartSize {
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
- return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
}
-
if size < 0 {
- return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, metadata, progress)
+ return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
}
if size < minPartSize {
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
-
// For all sizes greater than 64MiB do multipart.
- return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
}
-func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, metadata map[string][]string,
- progress io.Reader) (n int64, err error) {
+func (c Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
@@ -238,16 +180,15 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
if err != nil {
return 0, err
}
-
// Initiate a new multipart upload.
- uploadID, err := c.newUploadID(bucketName, objectName, metadata)
+ uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return 0, err
}
defer func() {
if err != nil {
- c.abortMultipartUpload(bucketName, objectName, uploadID)
+ c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
}
}()
@@ -263,21 +204,20 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
for partNumber <= totalPartsCount {
length, rErr := io.ReadFull(reader, buf)
- if rErr == io.EOF {
+ if rErr == io.EOF && partNumber > 1 {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
return 0, rErr
}
-
// Update progress reader appropriately to the latest offset
// as we read from the source.
- rd := newHook(bytes.NewReader(buf[:length]), progress)
+ rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
// Proceed to upload the part.
var objPart ObjectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
- nil, nil, int64(length), metadata)
+ objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
+ "", "", int64(length), opts.UserMetadata)
if err != nil {
return totalUploadedSize, err
}
@@ -313,7 +253,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))
- if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil {
+ if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil {
return totalUploadedSize, err
}