summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org
diff options
context:
space:
mode:
authorChristopher Speller <crspeller@gmail.com>2018-08-28 10:05:26 -0700
committerGitHub <noreply@github.com>2018-08-28 10:05:26 -0700
commit61e27beabc9804fdcf59ed9df2180802175a4f70 (patch)
tree52c86f5cdbd4e13d05b8f9dddad1a01b88e26cab /vendor/google.golang.org
parent347ee1d205c95f5fd766e206cc65bfb9782a2623 (diff)
downloadchat-61e27beabc9804fdcf59ed9df2180802175a4f70.tar.gz
chat-61e27beabc9804fdcf59ed9df2180802175a4f70.tar.bz2
chat-61e27beabc9804fdcf59ed9df2180802175a4f70.zip
Updating dependancies. (#9303)
Diffstat (limited to 'vendor/google.golang.org')
-rw-r--r--vendor/google.golang.org/genproto/googleapis/rpc/status/status.pb.go6
-rw-r--r--vendor/google.golang.org/grpc/.travis.yml32
-rw-r--r--vendor/google.golang.org/grpc/Makefile7
-rw-r--r--vendor/google.golang.org/grpc/balancer/balancer.go6
-rw-r--r--vendor/google.golang.org/grpc/call.go33
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go478
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials.go77
-rw-r--r--vendor/google.golang.org/grpc/credentials/go16.go (renamed from vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go)0
-rw-r--r--vendor/google.golang.org/grpc/credentials/go17.go (renamed from vendor/google.golang.org/grpc/credentials/credentials_util_go17.go)3
-rw-r--r--vendor/google.golang.org/grpc/credentials/go18.go (renamed from vendor/google.golang.org/grpc/credentials/credentials_util_go18.go)8
-rw-r--r--vendor/google.golang.org/grpc/credentials/go19.go35
-rw-r--r--vendor/google.golang.org/grpc/dialoptions.go450
-rw-r--r--vendor/google.golang.org/grpc/go16.go7
-rw-r--r--vendor/google.golang.org/grpc/go17.go7
-rwxr-xr-xvendor/google.golang.org/grpc/install_gae.sh6
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/types.go7
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/types_linux.go54
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go38
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/util_linux_go19.go39
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/util_nonlinux_pre_go19.go26
-rw-r--r--vendor/google.golang.org/grpc/internal/envconfig/envconfig.go (renamed from vendor/google.golang.org/grpc/envconfig.go)14
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/bdp_estimator.go (renamed from vendor/google.golang.org/grpc/transport/bdp_estimator.go)0
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go (renamed from vendor/google.golang.org/grpc/transport/controlbuf.go)88
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/defaults.go49
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/flowcontrol.go (renamed from vendor/google.golang.org/grpc/transport/flowcontrol.go)24
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/go16.go (renamed from vendor/google.golang.org/grpc/transport/go16.go)11
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/go17.go (renamed from vendor/google.golang.org/grpc/transport/go17.go)11
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go (renamed from vendor/google.golang.org/grpc/transport/handler_server.go)15
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go (renamed from vendor/google.golang.org/grpc/transport/http2_client.go)142
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go (renamed from vendor/google.golang.org/grpc/transport/http2_server.go)135
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go (renamed from vendor/google.golang.org/grpc/transport/http_util.go)59
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/log.go (renamed from vendor/google.golang.org/grpc/transport/log.go)0
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go (renamed from vendor/google.golang.org/grpc/transport/transport.go)73
-rw-r--r--vendor/google.golang.org/grpc/picker_wrapper.go160
-rw-r--r--vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go52
-rw-r--r--vendor/google.golang.org/grpc/resolver/resolver.go8
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go32
-rw-r--r--vendor/google.golang.org/grpc/server.go47
-rw-r--r--vendor/google.golang.org/grpc/service_config.go149
-rw-r--r--vendor/google.golang.org/grpc/stickiness_linkedmap.go97
-rw-r--r--vendor/google.golang.org/grpc/stream.go694
-rw-r--r--vendor/google.golang.org/grpc/version.go2
-rwxr-xr-xvendor/google.golang.org/grpc/vet.sh11
43 files changed, 1979 insertions, 1213 deletions
diff --git a/vendor/google.golang.org/genproto/googleapis/rpc/status/status.pb.go b/vendor/google.golang.org/genproto/googleapis/rpc/status/status.pb.go
index 3b07a25b7..7bfe37a3d 100644
--- a/vendor/google.golang.org/genproto/googleapis/rpc/status/status.pb.go
+++ b/vendor/google.golang.org/genproto/googleapis/rpc/status/status.pb.go
@@ -90,7 +90,7 @@ func (m *Status) Reset() { *m = Status{} }
func (m *Status) String() string { return proto.CompactTextString(m) }
func (*Status) ProtoMessage() {}
func (*Status) Descriptor() ([]byte, []int) {
- return fileDescriptor_status_c656c685916bdf47, []int{0}
+ return fileDescriptor_status_c6e4de62dcdf2edf, []int{0}
}
func (m *Status) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Status.Unmarshal(m, b)
@@ -135,9 +135,9 @@ func init() {
proto.RegisterType((*Status)(nil), "google.rpc.Status")
}
-func init() { proto.RegisterFile("google/rpc/status.proto", fileDescriptor_status_c656c685916bdf47) }
+func init() { proto.RegisterFile("google/rpc/status.proto", fileDescriptor_status_c6e4de62dcdf2edf) }
-var fileDescriptor_status_c656c685916bdf47 = []byte{
+var fileDescriptor_status_c6e4de62dcdf2edf = []byte{
// 209 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4f, 0xcf, 0xcf, 0x4f,
0xcf, 0x49, 0xd5, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x2e, 0x49, 0x2c, 0x29, 0x2d, 0xd6, 0x2b, 0x28,
diff --git a/vendor/google.golang.org/grpc/.travis.yml b/vendor/google.golang.org/grpc/.travis.yml
index 3c2621ab7..5b5bfb13d 100644
--- a/vendor/google.golang.org/grpc/.travis.yml
+++ b/vendor/google.golang.org/grpc/.travis.yml
@@ -1,24 +1,34 @@
language: go
-go:
- - 1.6.x
- - 1.7.x
- - 1.8.x
- - 1.9.x
- - 1.10.x
-
matrix:
include:
- go: 1.10.x
+ env: VET=1 RACE=1
+ - go: 1.6.x
+ - go: 1.7.x
+ - go: 1.8.x
+ - go: 1.9.x
+ - go: 1.9.x
+ env: GAE=1
+ - go: 1.10.x
env: RUN386=1
+ - go: 1.10.x
+ env: GRPC_GO_RETRY=on
go_import_path: google.golang.org/grpc
before_install:
- if [[ -n "$RUN386" ]]; then export GOARCH=386; fi
- - if [[ "$TRAVIS_GO_VERSION" = 1.10* && "$GOARCH" != "386" ]]; then ./vet.sh -install || exit 1; fi
+ - if [[ "$TRAVIS_EVENT_TYPE" = "cron" && -z "$RUN386" ]]; then RACE=1; fi
+ - if [[ "$TRAVIS_EVENT_TYPE" != "cron" ]]; then VET_SKIP_PROTO=1; fi
+
+install:
+ - if [[ "$GAE" = 1 ]]; then source ./install_gae.sh; fi
+ - if [[ "$VET" = 1 ]]; then ./vet.sh -install; fi
script:
- - if [[ "$TRAVIS_GO_VERSION" = 1.10* && "$GOARCH" != "386" ]]; then ./vet.sh || exit 1; fi
- - make test || exit 1
- - if [[ "$GOARCH" != "386" ]]; then make testrace; fi
+ - set -e
+ - if [[ "$GAE" = 1 ]]; then make testappengine; exit 0; fi
+ - if [[ "$VET" = 1 ]]; then ./vet.sh; fi
+ - make test
+ - if [[ "$RACE" = 1 ]]; then make testrace; fi
diff --git a/vendor/google.golang.org/grpc/Makefile b/vendor/google.golang.org/grpc/Makefile
index 6f393a808..50454530f 100644
--- a/vendor/google.golang.org/grpc/Makefile
+++ b/vendor/google.golang.org/grpc/Makefile
@@ -9,6 +9,9 @@ updatedeps:
testdeps:
go get -d -v -t google.golang.org/grpc/...
+testgaedeps:
+ goapp get -d -v -t -tags 'appengine appenginevm' google.golang.org/grpc/...
+
updatetestdeps:
go get -d -v -t -u -f google.golang.org/grpc/...
@@ -31,6 +34,9 @@ test: testdeps
testrace: testdeps
go test -race -cpu 1,4 -timeout 7m google.golang.org/grpc/...
+testappengine: testgaedeps
+ goapp test -cpu 1,4 -timeout 5m google.golang.org/grpc/...
+
clean:
go clean -i google.golang.org/grpc/...
@@ -39,6 +45,7 @@ clean:
deps \
updatedeps \
testdeps \
+ testgaedeps \
updatetestdeps \
build \
proto \
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index f9d83c2f3..069feb1e7 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -143,7 +143,11 @@ type Builder interface {
}
// PickOptions contains addition information for the Pick operation.
-type PickOptions struct{}
+type PickOptions struct {
+ // FullMethodName is the method name that NewClientStream() is called
+ // with. The canonical format is /service/Method.
+ FullMethodName string
+}
// DoneInfo contains additional information for done.
type DoneInfo struct {
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index f73b7d552..180d79d06 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -63,31 +63,12 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
- // TODO: implement retries in clientStream and make this simply
- // newClientStream, SendMsg, RecvMsg.
- firstAttempt := true
- for {
- csInt, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
- if err != nil {
- return err
- }
- cs := csInt.(*clientStream)
- if err := cs.SendMsg(req); err != nil {
- if !cs.c.failFast && cs.attempt.s.Unprocessed() && firstAttempt {
- // TODO: Add a field to header for grpc-transparent-retry-attempts
- firstAttempt = false
- continue
- }
- return err
- }
- if err := cs.RecvMsg(reply); err != nil {
- if !cs.c.failFast && cs.attempt.s.Unprocessed() && firstAttempt {
- // TODO: Add a field to header for grpc-transparent-retry-attempts
- firstAttempt = false
- continue
- }
- return err
- }
- return nil
+ cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
+ if err != nil {
+ return err
}
+ if err := cs.SendMsg(req); err != nil {
+ return err
+ }
+ return cs.RecvMsg(reply)
}
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 84ba9e5ad..9b035e8f5 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -26,6 +26,7 @@ import (
"reflect"
"strings"
"sync"
+ "sync/atomic"
"time"
"golang.org/x/net/context"
@@ -36,16 +37,14 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
- "google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
- "google.golang.org/grpc/transport"
)
const (
@@ -94,33 +93,12 @@ var (
errNetworkIO = errors.New("grpc: failed with network I/O error")
)
-// dialOptions configure a Dial call. dialOptions are set by the DialOption
-// values passed to Dial.
-type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- cp Compressor
- dc Decompressor
- bs backoff.Strategy
- block bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- copts transport.ConnectOptions
- callOptions []CallOption
- // This is used by v1 balancer dial option WithBalancer to support v1
- // balancer, and also by WithBalancerName dial option.
- balancerBuilder balancer.Builder
- // This is to support grpclb.
- resolverBuilder resolver.Builder
- waitForHandshake bool
- channelzParentID int64
- disableServiceConfig bool
-}
-
const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = math.MaxInt32
+ // http2IOBufSize specifies the buffer size for sending frames.
+ defaultWriteBufSize = 32 * 1024
+ defaultReadBufSize = 32 * 1024
)
// RegisterChannelz turns on channelz service.
@@ -129,312 +107,6 @@ func RegisterChannelz() {
channelz.TurnOn()
}
-// DialOption configures how we set up the connection.
-type DialOption func(*dialOptions)
-
-// WithWaitForHandshake blocks until the initial settings frame is received from the
-// server before assigning RPCs to the connection.
-// Experimental API.
-func WithWaitForHandshake() DialOption {
- return func(o *dialOptions) {
- o.waitForHandshake = true
- }
-}
-
-// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
-// before doing a write on the wire.
-func WithWriteBufferSize(s int) DialOption {
- return func(o *dialOptions) {
- o.copts.WriteBufferSize = s
- }
-}
-
-// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
-// for each read syscall.
-func WithReadBufferSize(s int) DialOption {
- return func(o *dialOptions) {
- o.copts.ReadBufferSize = s
- }
-}
-
-// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
-// The lower bound for window size is 64K and any value smaller than that will be ignored.
-func WithInitialWindowSize(s int32) DialOption {
- return func(o *dialOptions) {
- o.copts.InitialWindowSize = s
- }
-}
-
-// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
-// The lower bound for window size is 64K and any value smaller than that will be ignored.
-func WithInitialConnWindowSize(s int32) DialOption {
- return func(o *dialOptions) {
- o.copts.InitialConnWindowSize = s
- }
-}
-
-// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
-//
-// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
-func WithMaxMsgSize(s int) DialOption {
- return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
-}
-
-// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
-func WithDefaultCallOptions(cos ...CallOption) DialOption {
- return func(o *dialOptions) {
- o.callOptions = append(o.callOptions, cos...)
- }
-}
-
-// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
-//
-// Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
-func WithCodec(c Codec) DialOption {
- return WithDefaultCallOptions(CallCustomCodec(c))
-}
-
-// WithCompressor returns a DialOption which sets a Compressor to use for
-// message compression. It has lower priority than the compressor set by
-// the UseCompressor CallOption.
-//
-// Deprecated: use UseCompressor instead.
-func WithCompressor(cp Compressor) DialOption {
- return func(o *dialOptions) {
- o.cp = cp
- }
-}
-
-// WithDecompressor returns a DialOption which sets a Decompressor to use for
-// incoming message decompression. If incoming response messages are encoded
-// using the decompressor's Type(), it will be used. Otherwise, the message
-// encoding will be used to look up the compressor registered via
-// encoding.RegisterCompressor, which will then be used to decompress the
-// message. If no compressor is registered for the encoding, an Unimplemented
-// status error will be returned.
-//
-// Deprecated: use encoding.RegisterCompressor instead.
-func WithDecompressor(dc Decompressor) DialOption {
- return func(o *dialOptions) {
- o.dc = dc
- }
-}
-
-// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
-// Name resolver will be ignored if this DialOption is specified.
-//
-// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
-func WithBalancer(b Balancer) DialOption {
- return func(o *dialOptions) {
- o.balancerBuilder = &balancerWrapperBuilder{
- b: b,
- }
- }
-}
-
-// WithBalancerName sets the balancer that the ClientConn will be initialized
-// with. Balancer registered with balancerName will be used. This function
-// panics if no balancer was registered by balancerName.
-//
-// The balancer cannot be overridden by balancer option specified by service
-// config.
-//
-// This is an EXPERIMENTAL API.
-func WithBalancerName(balancerName string) DialOption {
- builder := balancer.Get(balancerName)
- if builder == nil {
- panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
- }
- return func(o *dialOptions) {
- o.balancerBuilder = builder
- }
-}
-
-// withResolverBuilder is only for grpclb.
-func withResolverBuilder(b resolver.Builder) DialOption {
- return func(o *dialOptions) {
- o.resolverBuilder = b
- }
-}
-
-// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
-//
-// Deprecated: service config should be received through name resolver, as specified here.
-// https://github.com/grpc/grpc/blob/master/doc/service_config.md
-func WithServiceConfig(c <-chan ServiceConfig) DialOption {
- return func(o *dialOptions) {
- o.scChan = c
- }
-}
-
-// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
-// when backing off after failed connection attempts.
-func WithBackoffMaxDelay(md time.Duration) DialOption {
- return WithBackoffConfig(BackoffConfig{MaxDelay: md})
-}
-
-// WithBackoffConfig configures the dialer to use the provided backoff
-// parameters after connection failures.
-//
-// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
-// for use.
-func WithBackoffConfig(b BackoffConfig) DialOption {
-
- return withBackoff(backoff.Exponential{
- MaxDelay: b.MaxDelay,
- })
-}
-
-// withBackoff sets the backoff strategy used for connectRetryNum after a
-// failed connection attempt.
-//
-// This can be exported if arbitrary backoff strategies are allowed by gRPC.
-func withBackoff(bs backoff.Strategy) DialOption {
- return func(o *dialOptions) {
- o.bs = bs
- }
-}
-
-// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
-// connection is up. Without this, Dial returns immediately and connecting the server
-// happens in background.
-func WithBlock() DialOption {
- return func(o *dialOptions) {
- o.block = true
- }
-}
-
-// WithInsecure returns a DialOption which disables transport security for this ClientConn.
-// Note that transport security is required unless WithInsecure is set.
-func WithInsecure() DialOption {
- return func(o *dialOptions) {
- o.insecure = true
- }
-}
-
-// WithTransportCredentials returns a DialOption which configures a
-// connection level security credentials (e.g., TLS/SSL).
-func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
- return func(o *dialOptions) {
- o.copts.TransportCredentials = creds
- }
-}
-
-// WithPerRPCCredentials returns a DialOption which sets
-// credentials and places auth state on each outbound RPC.
-func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
- return func(o *dialOptions) {
- o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
- }
-}
-
-// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
-// initially. This is valid if and only if WithBlock() is present.
-//
-// Deprecated: use DialContext and context.WithTimeout instead.
-func WithTimeout(d time.Duration) DialOption {
- return func(o *dialOptions) {
- o.timeout = d
- }
-}
-
-func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
- return func(o *dialOptions) {
- o.copts.Dialer = f
- }
-}
-
-func init() {
- internal.WithContextDialer = withContextDialer
- internal.WithResolverBuilder = withResolverBuilder
-}
-
-// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
-// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
-// Temporary() method to decide if it should try to reconnect to the network address.
-func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
- return withContextDialer(
- func(ctx context.Context, addr string) (net.Conn, error) {
- if deadline, ok := ctx.Deadline(); ok {
- return f(addr, deadline.Sub(time.Now()))
- }
- return f(addr, 0)
- })
-}
-
-// WithStatsHandler returns a DialOption that specifies the stats handler
-// for all the RPCs and underlying network connections in this ClientConn.
-func WithStatsHandler(h stats.Handler) DialOption {
- return func(o *dialOptions) {
- o.copts.StatsHandler = h
- }
-}
-
-// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
-// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
-// address and won't try to reconnect.
-// The default value of FailOnNonTempDialError is false.
-// This is an EXPERIMENTAL API.
-func FailOnNonTempDialError(f bool) DialOption {
- return func(o *dialOptions) {
- o.copts.FailOnNonTempDialError = f
- }
-}
-
-// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
-func WithUserAgent(s string) DialOption {
- return func(o *dialOptions) {
- o.copts.UserAgent = s
- }
-}
-
-// WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
-func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
- return func(o *dialOptions) {
- o.copts.KeepaliveParams = kp
- }
-}
-
-// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
-func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
- return func(o *dialOptions) {
- o.unaryInt = f
- }
-}
-
-// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
-func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
- return func(o *dialOptions) {
- o.streamInt = f
- }
-}
-
-// WithAuthority returns a DialOption that specifies the value to be used as
-// the :authority pseudo-header. This value only works with WithInsecure and
-// has no effect if TransportCredentials are present.
-func WithAuthority(a string) DialOption {
- return func(o *dialOptions) {
- o.copts.Authority = a
- }
-}
-
-// WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's
-// parent. This function is used in nested channel creation (e.g. grpclb dial).
-func WithChannelzParentID(id int64) DialOption {
- return func(o *dialOptions) {
- o.channelzParentID = id
- }
-}
-
-// WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
-// service config provided by the resolver and provides a hint to the resolver
-// to not fetch service configs.
-func WithDisableServiceConfig() DialOption {
- return func(o *dialOptions) {
- o.disableServiceConfig = true
- }
-}
-
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
@@ -458,16 +130,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
- target: target,
- csMgr: &connectivityStateManager{},
- conns: make(map[*addrConn]struct{}),
-
+ target: target,
+ csMgr: &connectivityStateManager{},
+ conns: make(map[*addrConn]struct{}),
+ dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
}
+ cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
- opt(&cc.dopts)
+ opt.apply(&cc.dopts)
}
if channelz.IsOn() {
@@ -567,8 +240,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
- } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
- cc.authority = cc.dopts.copts.Authority
+ } else if cc.dopts.insecure && cc.dopts.authority != "" {
+ cc.authority = cc.dopts.authority
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
@@ -699,6 +372,7 @@ type ClientConn struct {
preBalancerName string // previous balancer name.
curAddresses []resolver.Address
balancerWrapper *ccBalancerWrapper
+ retryThrottler atomic.Value
channelzID int64 // channelz unique identification number
czmu sync.RWMutex
@@ -830,8 +504,6 @@ func (cc *ClientConn) switchBalancer(name string) {
if cc.balancerWrapper != nil {
cc.balancerWrapper.close()
}
- // Clear all stickiness state.
- cc.blockingpicker.clearStickinessState()
builder := balancer.Get(name)
if builder == nil {
@@ -908,6 +580,12 @@ func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
}
}
+// Target returns the target string of the ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) Target() string {
+ return cc.target
+}
+
func (cc *ClientConn) incrCallsStarted() {
cc.czmu.Lock()
cc.callsStarted++
@@ -1012,8 +690,10 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
return m
}
-func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
- t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
+func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+ t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
+ FullMethodName: method,
+ })
if err != nil {
return nil, nil, toRPCErr(err)
}
@@ -1033,6 +713,19 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
cc.mu.Lock()
cc.scRaw = js
cc.sc = sc
+
+ if sc.retryThrottling != nil {
+ newThrottler := &retryThrottler{
+ tokens: sc.retryThrottling.MaxTokens,
+ max: sc.retryThrottling.MaxTokens,
+ thresh: sc.retryThrottling.MaxTokens / 2,
+ ratio: sc.retryThrottling.TokenRatio,
+ }
+ cc.retryThrottler.Store(newThrottler)
+ } else {
+ cc.retryThrottler.Store((*retryThrottler)(nil))
+ }
+
if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
if cc.curBalancerName == grpclbName {
// If current balancer is grpclb, there's at least one grpclb
@@ -1047,17 +740,6 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
}
}
- if envConfigStickinessOn {
- var newStickinessMDKey string
- if sc.stickinessMetadataKey != nil && *sc.stickinessMetadataKey != "" {
- newStickinessMDKey = *sc.stickinessMetadataKey
- }
- // newStickinessMDKey is "" if one of the following happens:
- // - stickinessMetadataKey is set to ""
- // - stickinessMetadataKey field doesn't exist in service config
- cc.blockingpicker.updateStickinessMDKey(strings.ToLower(newStickinessMDKey))
- }
-
cc.mu.Unlock()
return nil
}
@@ -1311,7 +993,7 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
// Didn't receive server preface, must kill this new transport now.
grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
newTr.Close()
- break
+ continue
case <-ac.ctx.Done():
}
}
@@ -1447,46 +1129,6 @@ func (ac *addrConn) transportMonitor() {
}
}
-// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
-func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
- for {
- ac.mu.Lock()
- switch {
- case ac.state == connectivity.Shutdown:
- if failfast || !hasBalancer {
- // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
- err := ac.tearDownErr
- ac.mu.Unlock()
- return nil, err
- }
- ac.mu.Unlock()
- return nil, errConnClosing
- case ac.state == connectivity.Ready:
- ct := ac.transport
- ac.mu.Unlock()
- return ct, nil
- case ac.state == connectivity.TransientFailure:
- if failfast || hasBalancer {
- ac.mu.Unlock()
- return nil, errConnUnavailable
- }
- }
- ready := ac.ready
- if ready == nil {
- ready = make(chan struct{})
- ac.ready = ready
- }
- ac.mu.Unlock()
- select {
- case <-ctx.Done():
- return nil, toRPCErr(ctx.Err())
- // Wait until the new transport is ready or failed.
- case <-ready:
- }
- }
-}
-
// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
@@ -1551,13 +1193,6 @@ func (ac *addrConn) getState() connectivity.State {
return ac.state
}
-func (ac *addrConn) getCurAddr() (ret resolver.Address) {
- ac.mu.Lock()
- ret = ac.curAddr
- ac.mu.Unlock()
- return
-}
-
func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
ac.mu.Lock()
addr := ac.curAddr.Addr
@@ -1594,6 +1229,43 @@ func (ac *addrConn) incrCallsFailed() {
ac.czmu.Unlock()
}
+type retryThrottler struct {
+ max float64
+ thresh float64
+ ratio float64
+
+ mu sync.Mutex
+ tokens float64 // TODO(dfawley): replace with atomic and remove lock.
+}
+
+// throttle subtracts a retry token from the pool and returns whether a retry
+// should be throttled (disallowed) based upon the retry throttling policy in
+// the service config.
+func (rt *retryThrottler) throttle() bool {
+ if rt == nil {
+ return false
+ }
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ rt.tokens--
+ if rt.tokens < 0 {
+ rt.tokens = 0
+ }
+ return rt.tokens <= rt.thresh
+}
+
+func (rt *retryThrottler) successfulRPC() {
+ if rt == nil {
+ return
+ }
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ rt.tokens += rt.ratio
+ if rt.tokens > rt.max {
+ rt.tokens = rt.max
+ }
+}
+
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
index 3351bf0ee..1dae57ab1 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials.go
@@ -31,6 +31,7 @@ import (
"net"
"strings"
+ "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
)
@@ -118,6 +119,18 @@ func (t TLSInfo) AuthType() string {
return "tls"
}
+// GetChannelzSecurityValue returns security info requested by channelz.
+func (t TLSInfo) GetChannelzSecurityValue() ChannelzSecurityValue {
+ v := &TLSChannelzSecurityValue{
+ StandardName: cipherSuiteLookup[t.State.CipherSuite],
+ }
+ // Currently there's no way to get LocalCertificate info from tls package.
+ if len(t.State.PeerCertificates) > 0 {
+ v.RemoteCertificate = t.State.PeerCertificates[0].Raw
+ }
+ return v
+}
+
// tlsCreds is the credentials required for authenticating a connection using TLS.
type tlsCreds struct {
// TLS configuration
@@ -155,7 +168,7 @@ func (c *tlsCreds) ClientHandshake(ctx context.Context, authority string, rawCon
case <-ctx.Done():
return nil, nil, ctx.Err()
}
- return conn, TLSInfo{conn.ConnectionState()}, nil
+ return tlsConn{Conn: conn, rawConn: rawConn}, TLSInfo{conn.ConnectionState()}, nil
}
func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) {
@@ -163,7 +176,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
if err := conn.Handshake(); err != nil {
return nil, nil, err
}
- return conn, TLSInfo{conn.ConnectionState()}, nil
+ return tlsConn{Conn: conn, rawConn: rawConn}, TLSInfo{conn.ConnectionState()}, nil
}
func (c *tlsCreds) Clone() TransportCredentials {
@@ -218,3 +231,63 @@ func NewServerTLSFromFile(certFile, keyFile string) (TransportCredentials, error
}
return NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}}), nil
}
+
+// ChannelzSecurityInfo defines the interface that security protocols should implement
+// in order to provide security info to channelz.
+type ChannelzSecurityInfo interface {
+ GetSecurityValue() ChannelzSecurityValue
+}
+
+// ChannelzSecurityValue defines the interface that GetSecurityValue() return value
+// should satisfy. This interface should only be satisfied by *TLSChannelzSecurityValue
+// and *OtherChannelzSecurityValue.
+type ChannelzSecurityValue interface {
+ isChannelzSecurityValue()
+}
+
+// TLSChannelzSecurityValue defines the struct that TLS protocol should return
+// from GetSecurityValue(), containing security info like cipher and certificate used.
+type TLSChannelzSecurityValue struct {
+ StandardName string
+ LocalCertificate []byte
+ RemoteCertificate []byte
+}
+
+func (*TLSChannelzSecurityValue) isChannelzSecurityValue() {}
+
+// OtherChannelzSecurityValue defines the struct that non-TLS protocol should return
+// from GetSecurityValue(), which contains protocol specific security info. Note
+// the Value field will be sent to users of channelz requesting channel info, and
+// thus sensitive info should better be avoided.
+type OtherChannelzSecurityValue struct {
+ Name string
+ Value proto.Message
+}
+
+func (*OtherChannelzSecurityValue) isChannelzSecurityValue() {}
+
+type tlsConn struct {
+ *tls.Conn
+ rawConn net.Conn
+}
+
+var cipherSuiteLookup = map[uint16]string{
+ tls.TLS_RSA_WITH_RC4_128_SHA: "TLS_RSA_WITH_RC4_128_SHA",
+ tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: "TLS_RSA_WITH_3DES_EDE_CBC_SHA",
+ tls.TLS_RSA_WITH_AES_128_CBC_SHA: "TLS_RSA_WITH_AES_128_CBC_SHA",
+ tls.TLS_RSA_WITH_AES_256_CBC_SHA: "TLS_RSA_WITH_AES_256_CBC_SHA",
+ tls.TLS_RSA_WITH_AES_128_GCM_SHA256: "TLS_RSA_WITH_AES_128_GCM_SHA256",
+ tls.TLS_RSA_WITH_AES_256_GCM_SHA384: "TLS_RSA_WITH_AES_256_GCM_SHA384",
+ tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA: "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA: "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
+ tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA: "TLS_ECDHE_RSA_WITH_RC4_128_SHA",
+ tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA: "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
+ tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
+ tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA: "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384: "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
+ tls.TLS_FALLBACK_SCSV: "TLS_FALLBACK_SCSV",
+}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go b/vendor/google.golang.org/grpc/credentials/go16.go
index d6bbcc9fd..d6bbcc9fd 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
+++ b/vendor/google.golang.org/grpc/credentials/go16.go
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go b/vendor/google.golang.org/grpc/credentials/go17.go
index 60409aac0..fbd500002 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
+++ b/vendor/google.golang.org/grpc/credentials/go17.go
@@ -1,5 +1,4 @@
-// +build go1.7
-// +build !go1.8
+// +build go1.7,!go1.8
/*
*
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_go18.go b/vendor/google.golang.org/grpc/credentials/go18.go
index 93f0e1d8d..db30d46cc 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials_util_go18.go
+++ b/vendor/google.golang.org/grpc/credentials/go18.go
@@ -24,6 +24,14 @@ import (
"crypto/tls"
)
+func init() {
+ cipherSuiteLookup[tls.TLS_RSA_WITH_AES_128_CBC_SHA256] = "TLS_RSA_WITH_AES_128_CBC_SHA256"
+ cipherSuiteLookup[tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256] = "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256"
+ cipherSuiteLookup[tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256] = "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256"
+ cipherSuiteLookup[tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305] = "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305"
+ cipherSuiteLookup[tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305] = "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305"
+}
+
// cloneTLSConfig returns a shallow clone of the exported
// fields of cfg, ignoring the unexported sync.Once, which
// contains a mutex and must not be copied.
diff --git a/vendor/google.golang.org/grpc/credentials/go19.go b/vendor/google.golang.org/grpc/credentials/go19.go
new file mode 100644
index 000000000..2a4ca1a57
--- /dev/null
+++ b/vendor/google.golang.org/grpc/credentials/go19.go
@@ -0,0 +1,35 @@
+// +build go1.9,!appengine
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package credentials
+
+import (
+ "errors"
+ "syscall"
+)
+
+// implements the syscall.Conn interface
+func (c tlsConn) SyscallConn() (syscall.RawConn, error) {
+ conn, ok := c.rawConn.(syscall.Conn)
+ if !ok {
+ return nil, errors.New("RawConn does not implement syscall.Conn")
+ }
+ return conn.SyscallConn()
+}
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
new file mode 100644
index 000000000..20accf1a1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -0,0 +1,450 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/internal/backoff"
+ "google.golang.org/grpc/internal/envconfig"
+ "google.golang.org/grpc/internal/transport"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/stats"
+)
+
+// dialOptions configure a Dial call. dialOptions are set by the DialOption
+// values passed to Dial.
+type dialOptions struct {
+ unaryInt UnaryClientInterceptor
+ streamInt StreamClientInterceptor
+ cp Compressor
+ dc Decompressor
+ bs backoff.Strategy
+ block bool
+ insecure bool
+ timeout time.Duration
+ scChan <-chan ServiceConfig
+ authority string
+ copts transport.ConnectOptions
+ callOptions []CallOption
+ // This is used by v1 balancer dial option WithBalancer to support v1
+ // balancer, and also by WithBalancerName dial option.
+ balancerBuilder balancer.Builder
+ // This is to support grpclb.
+ resolverBuilder resolver.Builder
+ waitForHandshake bool
+ channelzParentID int64
+ disableServiceConfig bool
+ disableRetry bool
+}
+
+// DialOption configures how we set up the connection.
+type DialOption interface {
+ apply(*dialOptions)
+}
+
+// EmptyDialOption does not alter the dial configuration. It can be embedded in
+// another structure to build custom dial options.
+//
+// This API is EXPERIMENTAL.
+type EmptyDialOption struct{}
+
+func (EmptyDialOption) apply(*dialOptions) {}
+
+// funcDialOption wraps a function that modifies dialOptions into an
+// implementation of the DialOption interface.
+type funcDialOption struct {
+ f func(*dialOptions)
+}
+
+func (fdo *funcDialOption) apply(do *dialOptions) {
+ fdo.f(do)
+}
+
+func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
+ return &funcDialOption{
+ f: f,
+ }
+}
+
+// WithWaitForHandshake blocks until the initial settings frame is received from
+// the server before assigning RPCs to the connection. Experimental API.
+func WithWaitForHandshake() DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.waitForHandshake = true
+ })
+}
+
+// WithWriteBufferSize determines how much data can be batched before doing a
+// write on the wire. The corresponding memory allocation for this buffer will
+// be twice the size to keep syscalls low. The default value for this buffer is
+// 32KB.
+//
+// Zero will disable the write buffer such that each write will be on underlying
+// connection. Note: A Send call may not directly translate to a write.
+func WithWriteBufferSize(s int) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.WriteBufferSize = s
+ })
+}
+
+// WithReadBufferSize lets you set the size of read buffer, this determines how
+// much data can be read at most for each read syscall.
+//
+// The default value for this buffer is 32KB. Zero will disable read buffer for
+// a connection so data framer can access the underlying conn directly.
+func WithReadBufferSize(s int) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.ReadBufferSize = s
+ })
+}
+
+// WithInitialWindowSize returns a DialOption which sets the value for initial
+// window size on a stream. The lower bound for window size is 64K and any value
+// smaller than that will be ignored.
+func WithInitialWindowSize(s int32) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.InitialWindowSize = s
+ })
+}
+
+// WithInitialConnWindowSize returns a DialOption which sets the value for
+// initial window size on a connection. The lower bound for window size is 64K
+// and any value smaller than that will be ignored.
+func WithInitialConnWindowSize(s int32) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.InitialConnWindowSize = s
+ })
+}
+
+// WithMaxMsgSize returns a DialOption which sets the maximum message size the
+// client can receive.
+//
+// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
+func WithMaxMsgSize(s int) DialOption {
+ return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
+}
+
+// WithDefaultCallOptions returns a DialOption which sets the default
+// CallOptions for calls over the connection.
+func WithDefaultCallOptions(cos ...CallOption) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.callOptions = append(o.callOptions, cos...)
+ })
+}
+
+// WithCodec returns a DialOption which sets a codec for message marshaling and
+// unmarshaling.
+//
+// Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
+func WithCodec(c Codec) DialOption {
+ return WithDefaultCallOptions(CallCustomCodec(c))
+}
+
+// WithCompressor returns a DialOption which sets a Compressor to use for
+// message compression. It has lower priority than the compressor set by the
+// UseCompressor CallOption.
+//
+// Deprecated: use UseCompressor instead.
+func WithCompressor(cp Compressor) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.cp = cp
+ })
+}
+
+// WithDecompressor returns a DialOption which sets a Decompressor to use for
+// incoming message decompression. If incoming response messages are encoded
+// using the decompressor's Type(), it will be used. Otherwise, the message
+// encoding will be used to look up the compressor registered via
+// encoding.RegisterCompressor, which will then be used to decompress the
+// message. If no compressor is registered for the encoding, an Unimplemented
+// status error will be returned.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
+func WithDecompressor(dc Decompressor) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.dc = dc
+ })
+}
+
+// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
+// Name resolver will be ignored if this DialOption is specified.
+//
+// Deprecated: use the new balancer APIs in balancer package and
+// WithBalancerName.
+func WithBalancer(b Balancer) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.balancerBuilder = &balancerWrapperBuilder{
+ b: b,
+ }
+ })
+}
+
+// WithBalancerName sets the balancer that the ClientConn will be initialized
+// with. Balancer registered with balancerName will be used. This function
+// panics if no balancer was registered by balancerName.
+//
+// The balancer cannot be overridden by balancer option specified by service
+// config.
+//
+// This is an EXPERIMENTAL API.
+func WithBalancerName(balancerName string) DialOption {
+ builder := balancer.Get(balancerName)
+ if builder == nil {
+ panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
+ }
+ return newFuncDialOption(func(o *dialOptions) {
+ o.balancerBuilder = builder
+ })
+}
+
+// withResolverBuilder is only for grpclb.
+func withResolverBuilder(b resolver.Builder) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.resolverBuilder = b
+ })
+}
+
+// WithServiceConfig returns a DialOption which has a channel to read the
+// service configuration.
+//
+// Deprecated: service config should be received through name resolver, as
+// specified here.
+// https://github.com/grpc/grpc/blob/master/doc/service_config.md
+func WithServiceConfig(c <-chan ServiceConfig) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.scChan = c
+ })
+}
+
+// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
+// when backing off after failed connection attempts.
+func WithBackoffMaxDelay(md time.Duration) DialOption {
+ return WithBackoffConfig(BackoffConfig{MaxDelay: md})
+}
+
+// WithBackoffConfig configures the dialer to use the provided backoff
+// parameters after connection failures.
+//
+// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
+// for use.
+func WithBackoffConfig(b BackoffConfig) DialOption {
+ return withBackoff(backoff.Exponential{
+ MaxDelay: b.MaxDelay,
+ })
+}
+
+// withBackoff sets the backoff strategy used for connectRetryNum after a failed
+// connection attempt.
+//
+// This can be exported if arbitrary backoff strategies are allowed by gRPC.
+func withBackoff(bs backoff.Strategy) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.bs = bs
+ })
+}
+
+// WithBlock returns a DialOption which makes caller of Dial blocks until the
+// underlying connection is up. Without this, Dial returns immediately and
+// connecting the server happens in background.
+func WithBlock() DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.block = true
+ })
+}
+
+// WithInsecure returns a DialOption which disables transport security for this
+// ClientConn. Note that transport security is required unless WithInsecure is
+// set.
+func WithInsecure() DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.insecure = true
+ })
+}
+
+// WithTransportCredentials returns a DialOption which configures a connection
+// level security credentials (e.g., TLS/SSL).
+func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.TransportCredentials = creds
+ })
+}
+
+// WithPerRPCCredentials returns a DialOption which sets credentials and places
+// auth state on each outbound RPC.
+func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
+ })
+}
+
+// WithTimeout returns a DialOption that configures a timeout for dialing a
+// ClientConn initially. This is valid if and only if WithBlock() is present.
+//
+// Deprecated: use DialContext and context.WithTimeout instead.
+func WithTimeout(d time.Duration) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.timeout = d
+ })
+}
+
+func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.Dialer = f
+ })
+}
+
+func init() {
+ internal.WithContextDialer = withContextDialer
+ internal.WithResolverBuilder = withResolverBuilder
+}
+
+// WithDialer returns a DialOption that specifies a function to use for dialing
+// network addresses. If FailOnNonTempDialError() is set to true, and an error
+// is returned by f, gRPC checks the error's Temporary() method to decide if it
+// should try to reconnect to the network address.
+func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
+ return withContextDialer(
+ func(ctx context.Context, addr string) (net.Conn, error) {
+ if deadline, ok := ctx.Deadline(); ok {
+ return f(addr, deadline.Sub(time.Now()))
+ }
+ return f(addr, 0)
+ })
+}
+
+// WithStatsHandler returns a DialOption that specifies the stats handler for
+// all the RPCs and underlying network connections in this ClientConn.
+func WithStatsHandler(h stats.Handler) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.StatsHandler = h
+ })
+}
+
+// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
+// non-temporary dial errors. If f is true, and dialer returns a non-temporary
+// error, gRPC will fail the connection to the network address and won't try to
+// reconnect. The default value of FailOnNonTempDialError is false.
+//
+// This is an EXPERIMENTAL API.
+func FailOnNonTempDialError(f bool) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.FailOnNonTempDialError = f
+ })
+}
+
+// WithUserAgent returns a DialOption that specifies a user agent string for all
+// the RPCs.
+func WithUserAgent(s string) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.UserAgent = s
+ })
+}
+
+// WithKeepaliveParams returns a DialOption that specifies keepalive parameters
+// for the client transport.
+func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.KeepaliveParams = kp
+ })
+}
+
+// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
+// unary RPCs.
+func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.unaryInt = f
+ })
+}
+
+// WithStreamInterceptor returns a DialOption that specifies the interceptor for
+// streaming RPCs.
+func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.streamInt = f
+ })
+}
+
+// WithAuthority returns a DialOption that specifies the value to be used as the
+// :authority pseudo-header. This value only works with WithInsecure and has no
+// effect if TransportCredentials are present.
+func WithAuthority(a string) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.authority = a
+ })
+}
+
+// WithChannelzParentID returns a DialOption that specifies the channelz ID of
+// current ClientConn's parent. This function is used in nested channel creation
+// (e.g. grpclb dial).
+func WithChannelzParentID(id int64) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.channelzParentID = id
+ })
+}
+
+// WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
+// service config provided by the resolver and provides a hint to the resolver
+// to not fetch service configs.
+func WithDisableServiceConfig() DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.disableServiceConfig = true
+ })
+}
+
+// WithDisableRetry returns a DialOption that disables retries, even if the
+// service config enables them. This does not impact transparent retries, which
+// will happen automatically if no data is written to the wire or if the RPC is
+// unprocessed by the remote server.
+//
+// Retry support is currently disabled by default, but will be enabled by
+// default in the future. Until then, it may be enabled by setting the
+// environment variable "GRPC_GO_RETRY" to "on".
+//
+// This API is EXPERIMENTAL.
+func WithDisableRetry() DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.disableRetry = true
+ })
+}
+
+// WithMaxHeaderListSize returns a DialOption that specifies the maximum
+// (uncompressed) size of header list that the client is prepared to accept.
+func WithMaxHeaderListSize(s uint32) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.copts.MaxHeaderListSize = &s
+ })
+}
+
+func defaultDialOptions() dialOptions {
+ return dialOptions{
+ disableRetry: !envconfig.Retry,
+ copts: transport.ConnectOptions{
+ WriteBufferSize: defaultWriteBufSize,
+ ReadBufferSize: defaultReadBufSize,
+ },
+ }
+}
diff --git a/vendor/google.golang.org/grpc/go16.go b/vendor/google.golang.org/grpc/go16.go
index 535ee9356..b1db21af6 100644
--- a/vendor/google.golang.org/grpc/go16.go
+++ b/vendor/google.golang.org/grpc/go16.go
@@ -28,8 +28,8 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
- "google.golang.org/grpc/transport"
)
// dialContext connects to the address on the named network.
@@ -50,12 +50,13 @@ func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
+ if err == io.ErrUnexpectedEOF {
+ return status.Error(codes.Internal, err.Error())
+ }
if _, ok := status.FromError(err); ok {
return err
}
switch e := err.(type) {
- case transport.StreamError:
- return status.Error(e.Code, e.Desc)
case transport.ConnectionError:
return status.Error(codes.Unavailable, e.Desc)
default:
diff --git a/vendor/google.golang.org/grpc/go17.go b/vendor/google.golang.org/grpc/go17.go
index ec676a93c..71a72e8fe 100644
--- a/vendor/google.golang.org/grpc/go17.go
+++ b/vendor/google.golang.org/grpc/go17.go
@@ -29,8 +29,8 @@ import (
netctx "golang.org/x/net/context"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
- "google.golang.org/grpc/transport"
)
// dialContext connects to the address on the named network.
@@ -51,12 +51,13 @@ func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
+ if err == io.ErrUnexpectedEOF {
+ return status.Error(codes.Internal, err.Error())
+ }
if _, ok := status.FromError(err); ok {
return err
}
switch e := err.(type) {
- case transport.StreamError:
- return status.Error(e.Code, e.Desc)
case transport.ConnectionError:
return status.Error(codes.Unavailable, e.Desc)
default:
diff --git a/vendor/google.golang.org/grpc/install_gae.sh b/vendor/google.golang.org/grpc/install_gae.sh
new file mode 100755
index 000000000..d4236f3b8
--- /dev/null
+++ b/vendor/google.golang.org/grpc/install_gae.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+TMP=$(mktemp -d /tmp/sdk.XXX) \
+&& curl -o $TMP.zip "https://storage.googleapis.com/appengine-sdks/featured/go_appengine_sdk_linux_amd64-1.9.64.zip" \
+&& unzip -q $TMP.zip -d $TMP \
+&& export PATH="$PATH:$TMP/go_appengine"
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types.go b/vendor/google.golang.org/grpc/internal/channelz/types.go
index 153d75340..6fd6bb388 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/types.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/types.go
@@ -23,6 +23,7 @@ import (
"time"
"google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
@@ -281,9 +282,9 @@ type SocketInternalMetric struct {
RemoteAddr net.Addr
// Optional, represents the name of the remote endpoint, if different than
// the original target name.
- RemoteName string
- //TODO: socket options
- //TODO: Security
+ RemoteName string
+ SocketOptions *SocketOptionData
+ Security credentials.ChannelzSecurityValue
}
// Socket is the interface that should be satisfied in order to be tracked by
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go b/vendor/google.golang.org/grpc/internal/channelz/types_linux.go
new file mode 100644
index 000000000..9801c3e2c
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/types_linux.go
@@ -0,0 +1,54 @@
+// +build !appengine,go1.7
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "syscall"
+
+ "golang.org/x/sys/unix"
+)
+
+// SocketOptionData defines the struct to hold socket option data, and related
+// getter function to obtain info from fd.
+type SocketOptionData struct {
+ Linger *unix.Linger
+ RecvTimeout *unix.Timeval
+ SendTimeout *unix.Timeval
+ TCPInfo *unix.TCPInfo
+}
+
+// Getsockopt defines the function to get socket options requested by channelz.
+// It is to be passed to syscall.RawConn.Control().
+func (s *SocketOptionData) Getsockopt(fd uintptr) {
+ if v, err := unix.GetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER); err == nil {
+ s.Linger = v
+ }
+ if v, err := unix.GetsockoptTimeval(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVTIMEO); err == nil {
+ s.RecvTimeout = v
+ }
+ if v, err := unix.GetsockoptTimeval(int(fd), syscall.SOL_SOCKET, syscall.SO_SNDTIMEO); err == nil {
+ s.SendTimeout = v
+ }
+ if v, err := unix.GetsockoptTCPInfo(int(fd), syscall.SOL_TCP, syscall.TCP_INFO); err == nil {
+ s.TCPInfo = v
+ }
+ return
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
new file mode 100644
index 000000000..884910c4e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
@@ -0,0 +1,38 @@
+// +build !linux appengine !go1.7
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import "google.golang.org/grpc/grpclog"
+
+func init() {
+ grpclog.Infof("Channelz: socket options are not supported on non-linux os and appengine.")
+}
+
+// SocketOptionData defines the struct to hold socket option data, and related
+// getter function to obtain info from fd.
+// Windows OS doesn't support Socket Option
+type SocketOptionData struct {
+}
+
+// Getsockopt defines the function to get socket options requested by channelz.
+// It is to be passed to syscall.RawConn.Control().
+// Windows OS doesn't support Socket Option
+func (s *SocketOptionData) Getsockopt(fd uintptr) {}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_linux_go19.go b/vendor/google.golang.org/grpc/internal/channelz/util_linux_go19.go
new file mode 100644
index 000000000..e1e9e32d7
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/util_linux_go19.go
@@ -0,0 +1,39 @@
+// +build linux,go1.9,!appengine
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "syscall"
+)
+
+// GetSocketOption gets the socket option info of the conn.
+func GetSocketOption(socket interface{}) *SocketOptionData {
+ c, ok := socket.(syscall.Conn)
+ if !ok {
+ return nil
+ }
+ data := &SocketOptionData{}
+ if rawConn, err := c.SyscallConn(); err == nil {
+ rawConn.Control(data.Getsockopt)
+ return data
+ }
+ return nil
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux_pre_go19.go b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux_pre_go19.go
new file mode 100644
index 000000000..1d4da952d
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux_pre_go19.go
@@ -0,0 +1,26 @@
+// +build !linux !go1.9 appengine
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+// GetSocketOption gets the socket option info of the conn.
+func GetSocketOption(c interface{}) *SocketOptionData {
+ return nil
+}
diff --git a/vendor/google.golang.org/grpc/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index d50178e51..3ee8740f1 100644
--- a/vendor/google.golang.org/grpc/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -16,7 +16,8 @@
*
*/
-package grpc
+// Package envconfig contains grpc settings configured by environment variables.
+package envconfig
import (
"os"
@@ -24,14 +25,11 @@ import (
)
const (
- envConfigPrefix = "GRPC_GO_"
- envConfigStickinessStr = envConfigPrefix + "STICKINESS"
+ prefix = "GRPC_GO_"
+ retryStr = prefix + "RETRY"
)
var (
- envConfigStickinessOn bool
+ // Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
+ Retry = strings.EqualFold(os.Getenv(retryStr), "on")
)
-
-func init() {
- envConfigStickinessOn = strings.EqualFold(os.Getenv(envConfigStickinessStr), "on")
-}
diff --git a/vendor/google.golang.org/grpc/transport/bdp_estimator.go b/vendor/google.golang.org/grpc/internal/transport/bdp_estimator.go
index 63cd2627c..63cd2627c 100644
--- a/vendor/google.golang.org/grpc/transport/bdp_estimator.go
+++ b/vendor/google.golang.org/grpc/internal/transport/bdp_estimator.go
diff --git a/vendor/google.golang.org/grpc/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 5c5891a11..ce135c4d1 100644
--- a/vendor/google.golang.org/grpc/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -229,6 +229,12 @@ func (l *outStreamList) dequeue() *outStream {
return b
}
+// controlBuffer is a way to pass information to loopy.
+// Information is passed as specific struct types called control frames.
+// A control frame not only represents data, messages or headers to be sent out
+// but can also be used to instruct loopy to update its internal state.
+// It shouldn't be confused with an HTTP2 frame, although some of the control frames
+// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
ch chan struct{}
done <-chan struct{}
@@ -279,6 +285,21 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{
return true, nil
}
+// Note argument f should never be nil.
+func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
+ c.mu.Lock()
+ if c.err != nil {
+ c.mu.Unlock()
+ return false, c.err
+ }
+ if !f(it) { // f wasn't successful
+ c.mu.Unlock()
+ return false, nil
+ }
+ c.mu.Unlock()
+ return true, nil
+}
+
func (c *controlBuffer) get(block bool) (interface{}, error) {
for {
c.mu.Lock()
@@ -335,13 +356,29 @@ const (
serverSide
)
+// Loopy receives frames from the control buffer.
+// Each frame is handled individually; most of the work done by loopy goes
+// into handling data frames. Loopy maintains a queue of active streams, and each
+// stream maintains a queue of data frames; as loopy receives data frames
+// it gets added to the queue of the relevant stream.
+// Loopy goes over this list of active streams by processing one node every iteration,
+// thereby closely resemebling to a round-robin scheduling over all streams. While
+// processing a stream, loopy writes out data bytes from this stream capped by the min
+// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
- side side
- cbuf *controlBuffer
- sendQuota uint32
- oiws uint32 // outbound initial window size.
- estdStreams map[uint32]*outStream // Established streams.
- activeStreams *outStreamList // Streams that are sending data.
+ side side
+ cbuf *controlBuffer
+ sendQuota uint32
+ oiws uint32 // outbound initial window size.
+ // estdStreams is map of all established streams that are not cleaned-up yet.
+ // On client-side, this is all streams whose headers were sent out.
+ // On server-side, this is all streams whose headers were received.
+ estdStreams map[uint32]*outStream // Established streams.
+ // activeStreams is a linked-list of all streams that have data to send and some
+ // stream-level flow control quota.
+ // Each of these streams internally have a list of data items(and perhaps trailers
+ // on the server-side) to be sent out.
+ activeStreams *outStreamList
framer *framer
hBuf *bytes.Buffer // The buffer for HPACK encoding.
hEnc *hpack.Encoder // HPACK encoder.
@@ -372,6 +409,21 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
const minBatchSize = 1000
// run should be run in a separate goroutine.
+// It reads control frames from controlBuf and processes them by:
+// 1. Updating loopy's internal state, or/and
+// 2. Writing out HTTP2 frames on the wire.
+//
+// Loopy keeps all active streams with data to send in a linked-list.
+// All streams in the activeStreams linked-list must have both:
+// 1. Data to send, and
+// 2. Stream level flow control quota available.
+//
+// In each iteration of run loop, other than processing the incoming control
+// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
+// This results in writing of HTTP2 frames into an underlying write buffer.
+// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
+// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
+// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
defer func() {
if err == ErrConnClosing {
@@ -696,21 +748,30 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
return nil
}
+// processData removes the first stream from active streams, writes out at most 16KB
+// of its data and then puts it at the end of activeStreams if there's still more data
+// to be sent and stream has some stream-level flow control.
func (l *loopyWriter) processData() (bool, error) {
if l.sendQuota == 0 {
return true, nil
}
- str := l.activeStreams.dequeue()
+ str := l.activeStreams.dequeue() // Remove the first stream.
if str == nil {
return true, nil
}
- dataItem := str.itl.peek().(*dataFrame)
- if len(dataItem.h) == 0 && len(dataItem.d) == 0 {
+ dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
+ // A data item is represented by a dataFrame, since it later translates into
+ // multiple HTTP2 data frames.
+ // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
+ // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
+ // maximum possilbe HTTP2 frame size.
+
+ if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
- str.itl.dequeue()
+ str.itl.dequeue() // remove the empty data item from stream
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -739,21 +800,20 @@ func (l *loopyWriter) processData() (bool, error) {
if len(buf) < size {
size = len(buf)
}
- if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 {
+ if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
str.state = waitingOnStreamQuota
return false, nil
} else if strQuota < size {
size = strQuota
}
- if l.sendQuota < uint32(size) {
+ if l.sendQuota < uint32(size) { // connection-level flow control.
size = int(l.sendQuota)
}
// Now that outgoing flow controls are checked we can replenish str's write quota
str.wq.replenish(size)
var endStream bool
- // This last data message on this stream and all
- // of it can be written in this go.
+ // If this is the last data message on this stream and all of it can be written in this iteration.
if dataItem.endStream && size == len(buf) {
// buf contains either data or it contains header but data is empty.
if idx == 1 || len(dataItem.d) == 0 {
diff --git a/vendor/google.golang.org/grpc/internal/transport/defaults.go b/vendor/google.golang.org/grpc/internal/transport/defaults.go
new file mode 100644
index 000000000..9fa306b2e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/transport/defaults.go
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package transport
+
+import (
+ "math"
+ "time"
+)
+
+const (
+ // The default value of flow control window size in HTTP2 spec.
+ defaultWindowSize = 65535
+ // The initial window size for flow control.
+ initialWindowSize = defaultWindowSize // for an RPC
+ infinity = time.Duration(math.MaxInt64)
+ defaultClientKeepaliveTime = infinity
+ defaultClientKeepaliveTimeout = 20 * time.Second
+ defaultMaxStreamsClient = 100
+ defaultMaxConnectionIdle = infinity
+ defaultMaxConnectionAge = infinity
+ defaultMaxConnectionAgeGrace = infinity
+ defaultServerKeepaliveTime = 2 * time.Hour
+ defaultServerKeepaliveTimeout = 20 * time.Second
+ defaultKeepalivePolicyMinTime = 5 * time.Minute
+ // max window limit set by HTTP2 Specs.
+ maxWindowSize = math.MaxInt32
+ // defaultWriteQuota is the default value for number of data
+ // bytes that each stream can schedule before some of it being
+ // flushed out.
+ defaultWriteQuota = 64 * 1024
+ defaultClientMaxHeaderListSize = uint32(16 << 20)
+ defaultServerMaxHeaderListSize = uint32(16 << 20)
+)
diff --git a/vendor/google.golang.org/grpc/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
index bbf98b6f5..5ea997a7e 100644
--- a/vendor/google.golang.org/grpc/transport/flowcontrol.go
+++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
@@ -23,30 +23,6 @@ import (
"math"
"sync"
"sync/atomic"
- "time"
-)
-
-const (
- // The default value of flow control window size in HTTP2 spec.
- defaultWindowSize = 65535
- // The initial window size for flow control.
- initialWindowSize = defaultWindowSize // for an RPC
- infinity = time.Duration(math.MaxInt64)
- defaultClientKeepaliveTime = infinity
- defaultClientKeepaliveTimeout = 20 * time.Second
- defaultMaxStreamsClient = 100
- defaultMaxConnectionIdle = infinity
- defaultMaxConnectionAge = infinity
- defaultMaxConnectionAgeGrace = infinity
- defaultServerKeepaliveTime = 2 * time.Hour
- defaultServerKeepaliveTimeout = 20 * time.Second
- defaultKeepalivePolicyMinTime = 5 * time.Minute
- // max window limit set by HTTP2 Specs.
- maxWindowSize = math.MaxInt32
- // defaultWriteQuota is the default value for number of data
- // bytes that each stream can schedule before some of it being
- // flushed out.
- defaultWriteQuota = 64 * 1024
)
// writeQuota is a soft limit on the amount of data a stream can
diff --git a/vendor/google.golang.org/grpc/transport/go16.go b/vendor/google.golang.org/grpc/internal/transport/go16.go
index 5babcf9b8..e0d00115d 100644
--- a/vendor/google.golang.org/grpc/transport/go16.go
+++ b/vendor/google.golang.org/grpc/internal/transport/go16.go
@@ -25,6 +25,7 @@ import (
"net/http"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"golang.org/x/net/context"
)
@@ -34,15 +35,15 @@ func dialContext(ctx context.Context, network, address string) (net.Conn, error)
return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
}
-// ContextErr converts the error from context package into a StreamError.
-func ContextErr(err error) StreamError {
+// ContextErr converts the error from context package into a status error.
+func ContextErr(err error) error {
switch err {
case context.DeadlineExceeded:
- return streamErrorf(codes.DeadlineExceeded, "%v", err)
+ return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled:
- return streamErrorf(codes.Canceled, "%v", err)
+ return status.Error(codes.Canceled, err.Error())
}
- return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
+ return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
}
// contextFromRequest returns a background context.
diff --git a/vendor/google.golang.org/grpc/transport/go17.go b/vendor/google.golang.org/grpc/internal/transport/go17.go
index b7fa6bdb9..4d515b00d 100644
--- a/vendor/google.golang.org/grpc/transport/go17.go
+++ b/vendor/google.golang.org/grpc/internal/transport/go17.go
@@ -26,6 +26,7 @@ import (
"net/http"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
netctx "golang.org/x/net/context"
)
@@ -35,15 +36,15 @@ func dialContext(ctx context.Context, network, address string) (net.Conn, error)
return (&net.Dialer{}).DialContext(ctx, network, address)
}
-// ContextErr converts the error from context package into a StreamError.
-func ContextErr(err error) StreamError {
+// ContextErr converts the error from context package into a status error.
+func ContextErr(err error) error {
switch err {
case context.DeadlineExceeded, netctx.DeadlineExceeded:
- return streamErrorf(codes.DeadlineExceeded, "%v", err)
+ return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled, netctx.Canceled:
- return streamErrorf(codes.Canceled, "%v", err)
+ return status.Error(codes.Canceled, err.Error())
}
- return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
+ return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
}
// contextFromRequest returns a context from the HTTP Request.
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index f71b74821..bc8564345 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -80,7 +80,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
if err != nil {
- return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
+ return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
}
st.timeoutSet = true
st.timeout = to
@@ -98,7 +98,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta
for _, v := range vv {
v, err := decodeMetadataHeader(k, v)
if err != nil {
- return nil, streamErrorf(codes.Internal, "malformed binary metadata: %v", err)
+ return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
}
metakv = append(metakv, k, v)
}
@@ -274,9 +274,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts
ht.writeCommonHeaders(s)
ht.rw.Write(hdr)
ht.rw.Write(data)
- if !opts.Delay {
- ht.rw.(http.Flusher).Flush()
- }
+ ht.rw.(http.Flusher).Flush()
})
}
@@ -434,17 +432,14 @@ func (ht *serverHandlerTransport) Drain() {
// * io.EOF
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
-// * of type transport.StreamError
+// * an error from the status package
func mapRecvMsgError(err error) error {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return err
}
if se, ok := err.(http2.StreamError); ok {
if code, ok := http2ErrConvTab[se.Code]; ok {
- return StreamError{
- Code: code,
- Desc: se.Error(),
- }
+ return status.Error(code, se.Error())
}
}
return connectionErrorf(true, err, err.Error())
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index eaf007eb0..88d1c1612 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -22,6 +22,7 @@ import (
"io"
"math"
"net"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -84,6 +85,9 @@ type http2Client struct {
initialWindowSize int32
+ // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
+ maxSendHeaderListSize *uint32
+
bdpEst *bdpEstimator
// onSuccess is a callback that client transport calls upon
// receiving server preface to signal that a succefull HTTP2
@@ -148,7 +152,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) {
+func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@@ -196,13 +200,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
- writeBufSize := defaultWriteBufSize
- if opts.WriteBufferSize > 0 {
- writeBufSize = opts.WriteBufferSize
- }
- readBufSize := defaultReadBufSize
- if opts.ReadBufferSize > 0 {
- readBufSize = opts.ReadBufferSize
+ writeBufSize := opts.WriteBufferSize
+ readBufSize := opts.ReadBufferSize
+ maxHeaderListSize := defaultClientMaxHeaderListSize
+ if opts.MaxHeaderListSize != nil {
+ maxHeaderListSize = *opts.MaxHeaderListSize
}
t := &http2Client{
ctx: ctx,
@@ -218,7 +220,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
- framer: newFramer(conn, writeBufSize, readBufSize),
+ framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
activeStreams: make(map[uint32]*Stream),
@@ -278,14 +280,21 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
+ var ss []http2.Setting
+
if t.initialWindowSize != defaultWindowSize {
- err = t.framer.fr.WriteSettings(http2.Setting{
+ ss = append(ss, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(t.initialWindowSize),
})
- } else {
- err = t.framer.fr.WriteSettings()
}
+ if opts.MaxHeaderListSize != nil {
+ ss = append(ss, http2.Setting{
+ ID: http2.SettingMaxHeaderListSize,
+ Val: *opts.MaxHeaderListSize,
+ })
+ }
+ err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
@@ -379,6 +388,9 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
+ if callHdr.PreviousAttempts > 0 {
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
+ }
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
@@ -464,7 +476,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
return nil, err
}
- return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err)
+ return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
}
for k, v := range data {
// Capital header names are illegal in HTTP/2.
@@ -482,11 +494,11 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// options, then both sets of credentials will be applied.
if callCreds := callHdr.Creds; callCreds != nil {
if !t.isSecure && callCreds.RequireTransportSecurity() {
- return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
+ return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
}
data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil {
- return nil, streamErrorf(codes.Internal, "transport: %v", err)
+ return nil, status.Errorf(codes.Internal, "transport: %v", err)
}
for k, v := range data {
// Capital header names are illegal in HTTP/2
@@ -590,14 +602,40 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
return true
}
+ var hdrListSizeErr error
+ checkForHeaderListSize := func(it interface{}) bool {
+ if t.maxSendHeaderListSize == nil {
+ return true
+ }
+ hdrFrame := it.(*headerFrame)
+ var sz int64
+ for _, f := range hdrFrame.hf {
+ if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
+ hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
+ return false
+ }
+ }
+ return true
+ }
for {
- success, err := t.controlBuf.executeAndPut(checkForStreamQuota, hdr)
+ success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
+ if !checkForStreamQuota(it) {
+ return false
+ }
+ if !checkForHeaderListSize(it) {
+ return false
+ }
+ return true
+ }, hdr)
if err != nil {
return nil, err
}
if success {
break
}
+ if hdrListSizeErr != nil {
+ return nil, hdrListSizeErr
+ }
firstTry = false
select {
case <-ch:
@@ -633,7 +671,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
rst = true
rstCode = http2.ErrCodeCancel
}
- t.closeStream(s, err, rst, rstCode, nil, nil, false)
+ t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
}
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
@@ -657,6 +695,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
close(s.done)
// If headerChan isn't closed, then close it.
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+ s.noHeaders = true
close(s.headerChan)
}
cleanup := &cleanupStream{
@@ -715,7 +754,7 @@ func (t *http2Client) Close() error {
}
// Notify all active streams.
for _, s := range streams {
- t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil, false)
+ t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
@@ -909,6 +948,13 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
statusCode = codes.Unknown
}
+ if statusCode == codes.Canceled {
+ // Our deadline was already exceeded, and that was likely the cause of
+ // this cancelation. Alter the status code accordingly.
+ if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
+ statusCode = codes.DeadlineExceeded
+ }
+ }
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
}
@@ -918,13 +964,20 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
}
var maxStreams *uint32
var ss []http2.Setting
+ var updateFuncs []func()
f.ForeachSetting(func(s http2.Setting) error {
- if s.ID == http2.SettingMaxConcurrentStreams {
+ switch s.ID {
+ case http2.SettingMaxConcurrentStreams:
maxStreams = new(uint32)
*maxStreams = s.Val
- return nil
+ case http2.SettingMaxHeaderListSize:
+ updateFuncs = append(updateFuncs, func() {
+ t.maxSendHeaderListSize = new(uint32)
+ *t.maxSendHeaderListSize = s.Val
+ })
+ default:
+ ss = append(ss, s)
}
- ss = append(ss, s)
return nil
})
if isFirst && maxStreams == nil {
@@ -934,21 +987,24 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
sf := &incomingSettings{
ss: ss,
}
- if maxStreams == nil {
- t.controlBuf.put(sf)
- return
+ if maxStreams != nil {
+ updateStreamQuota := func() {
+ delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
+ t.maxConcurrentStreams = *maxStreams
+ t.streamQuota += delta
+ if delta > 0 && t.waitingStreams > 0 {
+ close(t.streamsQuotaAvailable) // wake all of them up.
+ t.streamsQuotaAvailable = make(chan struct{}, 1)
+ }
+ }
+ updateFuncs = append(updateFuncs, updateStreamQuota)
}
- updateStreamQuota := func(interface{}) bool {
- delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
- t.maxConcurrentStreams = *maxStreams
- t.streamQuota += delta
- if delta > 0 && t.waitingStreams > 0 {
- close(t.streamsQuotaAvailable) // wake all of them up.
- t.streamsQuotaAvailable = make(chan struct{}, 1)
+ t.controlBuf.executeAndPut(func(interface{}) bool {
+ for _, f := range updateFuncs {
+ f()
}
return true
- }
- t.controlBuf.executeAndPut(updateStreamQuota, sf)
+ }, sf)
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
@@ -1059,8 +1115,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
atomic.StoreUint32(&s.bytesReceived, 1)
var state decodeState
- if err := state.decodeResponseHeader(frame); err != nil {
- t.closeStream(s, err, true, http2.ErrCodeProtocol, nil, nil, false)
+ if err := state.decodeHeader(frame); err != nil {
+ t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
// Something wrong. Stops reading even when there is remaining.
return
}
@@ -1096,6 +1152,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.mdata) > 0 {
s.header = state.mdata
}
+ } else {
+ s.noHeaders = true
}
close(s.headerChan)
}
@@ -1146,7 +1204,9 @@ func (t *http2Client) reader() {
t.mu.Unlock()
if s != nil {
// use error detail to provide better err message
- t.closeStream(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()), true, http2.ErrCodeProtocol, nil, nil, false)
+ code := http2ErrConvTab[se.Code]
+ msg := t.framer.fr.ErrorDetail().Error()
+ t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
}
continue
} else {
@@ -1257,12 +1317,14 @@ func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
LastMessageSentTimestamp: t.lastMsgSent,
LastMessageReceivedTimestamp: t.lastMsgRecv,
LocalFlowControlWindow: int64(t.fc.getSize()),
- //socket options
- LocalAddr: t.localAddr,
- RemoteAddr: t.remoteAddr,
- // Security
+ SocketOptions: channelz.GetSocketOption(t.conn),
+ LocalAddr: t.localAddr,
+ RemoteAddr: t.remoteAddr,
// RemoteName :
}
+ if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
+ s.Security = au.GetSecurityValue()
+ }
t.czmu.RUnlock()
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 19acedb2b..a8a09270b 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -48,9 +48,14 @@ import (
"google.golang.org/grpc/tap"
)
-// ErrIllegalHeaderWrite indicates that setting header is illegal because of
-// the stream's state.
-var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
+var (
+ // ErrIllegalHeaderWrite indicates that setting header is illegal because of
+ // the stream's state.
+ ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
+ // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
+ // than the limit set by peer.
+ ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+)
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
@@ -89,9 +94,10 @@ type http2Server struct {
// Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent.
// 1 means yes.
- resetPingStrikes uint32 // Accessed atomically.
- initialWindowSize int32
- bdpEst *bdpEstimator
+ resetPingStrikes uint32 // Accessed atomically.
+ initialWindowSize int32
+ bdpEst *bdpEstimator
+ maxSendHeaderListSize *uint32
mu sync.Mutex // guard the following
@@ -130,15 +136,13 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
- writeBufSize := defaultWriteBufSize
- if config.WriteBufferSize > 0 {
- writeBufSize = config.WriteBufferSize
- }
- readBufSize := defaultReadBufSize
- if config.ReadBufferSize > 0 {
- readBufSize = config.ReadBufferSize
+ writeBufSize := config.WriteBufferSize
+ readBufSize := config.ReadBufferSize
+ maxHeaderListSize := defaultServerMaxHeaderListSize
+ if config.MaxHeaderListSize != nil {
+ maxHeaderListSize = *config.MaxHeaderListSize
}
- framer := newFramer(conn, writeBufSize, readBufSize)
+ framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
var isettings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
@@ -168,6 +172,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
ID: http2.SettingInitialWindowSize,
Val: uint32(iwz)})
}
+ if config.MaxHeaderListSize != nil {
+ isettings = append(isettings, http2.Setting{
+ ID: http2.SettingMaxHeaderListSize,
+ Val: *config.MaxHeaderListSize,
+ })
+ }
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
@@ -287,19 +297,17 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
streamID := frame.Header().StreamID
- var state decodeState
- for _, hf := range frame.Fields {
- if err := state.processHeaderField(hf); err != nil {
- if se, ok := err.(StreamError); ok {
- t.controlBuf.put(&cleanupStream{
- streamID: streamID,
- rst: true,
- rstCode: statusCodeConvTab[se.Code],
- onWrite: func() {},
- })
- }
- return
+ state := decodeState{serverSide: true}
+ if err := state.decodeHeader(frame); err != nil {
+ if se, ok := status.FromError(err); ok {
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: statusCodeConvTab[se.Code()],
+ onWrite: func() {},
+ })
}
+ return
}
buf := newRecvBuffer()
@@ -619,11 +627,25 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
return
}
var ss []http2.Setting
+ var updateFuncs []func()
f.ForeachSetting(func(s http2.Setting) error {
- ss = append(ss, s)
+ switch s.ID {
+ case http2.SettingMaxHeaderListSize:
+ updateFuncs = append(updateFuncs, func() {
+ t.maxSendHeaderListSize = new(uint32)
+ *t.maxSendHeaderListSize = s.Val
+ })
+ default:
+ ss = append(ss, s)
+ }
return nil
})
- t.controlBuf.put(&incomingSettings{
+ t.controlBuf.executeAndPut(func(interface{}) bool {
+ for _, f := range updateFuncs {
+ f()
+ }
+ return true
+ }, &incomingSettings{
ss: ss,
})
}
@@ -703,6 +725,21 @@ func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD)
return headerFields
}
+func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
+ if t.maxSendHeaderListSize == nil {
+ return true
+ }
+ hdrFrame := it.(*headerFrame)
+ var sz int64
+ for _, f := range hdrFrame.hf {
+ if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
+ errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
+ return false
+ }
+ }
+ return true
+}
+
// WriteHeader sends the header metedata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
@@ -716,12 +753,15 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
s.header = md
}
}
- t.writeHeaderLocked(s)
+ if err := t.writeHeaderLocked(s); err != nil {
+ s.hdrMu.Unlock()
+ return err
+ }
s.hdrMu.Unlock()
return nil
}
-func (t *http2Server) writeHeaderLocked(s *Stream) {
+func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
@@ -731,7 +771,7 @@ func (t *http2Server) writeHeaderLocked(s *Stream) {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
- t.controlBuf.put(&headerFrame{
+ success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
@@ -739,12 +779,20 @@ func (t *http2Server) writeHeaderLocked(s *Stream) {
atomic.StoreUint32(&t.resetPingStrikes, 1)
},
})
+ if !success {
+ if err != nil {
+ return err
+ }
+ t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ return ErrHeaderListSizeLimitViolation
+ }
if t.stats != nil {
// Note: WireLength is not set in outHeader.
// TODO(mmukhi): Revisit this later, if needed.
outHeader := &stats.OutHeader{}
t.stats.HandleRPC(s.Context(), outHeader)
}
+ return nil
}
// WriteStatus sends stream status to the client and terminates the stream.
@@ -761,7 +809,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
if !s.updateHeaderSent() { // No headers have been sent.
if len(s.header) > 0 { // Send a separate header frame.
- t.writeHeaderLocked(s)
+ if err := t.writeHeaderLocked(s); err != nil {
+ s.hdrMu.Unlock()
+ return err
+ }
} else { // Send a trailer only response.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
@@ -791,6 +842,14 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
},
}
s.hdrMu.Unlock()
+ success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
+ if !success {
+ if err != nil {
+ return err
+ }
+ t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ return ErrHeaderListSizeLimitViolation
+ }
t.closeStream(s, false, 0, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
@@ -804,7 +863,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
- return streamErrorf(codes.Internal, "transport: %v", err)
+ return status.Errorf(codes.Internal, "transport: %v", err)
}
} else {
// Writing headers checks for this condition.
@@ -1091,12 +1150,14 @@ func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
LastMessageSentTimestamp: t.lastMsgSent,
LastMessageReceivedTimestamp: t.lastMsgRecv,
LocalFlowControlWindow: int64(t.fc.getSize()),
- //socket options
- LocalAddr: t.localAddr,
- RemoteAddr: t.remoteAddr,
- // Security
+ SocketOptions: channelz.GetSocketOption(t.conn),
+ LocalAddr: t.localAddr,
+ RemoteAddr: t.remoteAddr,
// RemoteName :
}
+ if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
+ s.Security = au.GetSecurityValue()
+ }
t.czmu.RUnlock()
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 7d15c7d74..21da6e80b 100644
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -23,6 +23,7 @@ import (
"bytes"
"encoding/base64"
"fmt"
+ "io"
"net"
"net/http"
"strconv"
@@ -43,9 +44,6 @@ const (
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
http2InitHeaderTableSize = 4096
- // http2IOBufSize specifies the buffer size for sending frames.
- defaultWriteBufSize = 32 * 1024
- defaultReadBufSize = 32 * 1024
// baseContentType is the base content-type for gRPC. This is a valid
// content-type on it's own, but can also include a content-subtype such as
// "proto" as a suffix after "+" or ";". See
@@ -121,6 +119,8 @@ type decodeState struct {
statsTags []byte
statsTrace []byte
contentSubtype string
+ // whether decoding on server side or not
+ serverSide bool
}
// isReservedHeader checks whether hdr belongs to HTTP2 headers
@@ -139,6 +139,9 @@ func isReservedHeader(hdr string) bool {
"grpc-status",
"grpc-timeout",
"grpc-status-details-bin",
+ // Intentionally exclude grpc-previous-rpc-attempts and
+ // grpc-retry-pushback-ms, which are "reserved", but their API
+ // intentionally works via metadata.
"te":
return true
default:
@@ -146,8 +149,8 @@ func isReservedHeader(hdr string) bool {
}
}
-// isWhitelistedHeader checks whether hdr should be propagated
-// into metadata visible to users.
+// isWhitelistedHeader checks whether hdr should be propagated into metadata
+// visible to users, even though it is classified as "reserved", above.
func isWhitelistedHeader(hdr string) bool {
switch hdr {
case ":authority", "user-agent":
@@ -234,13 +237,22 @@ func decodeMetadataHeader(k, v string) (string, error) {
return v, nil
}
-func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error {
+func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
+ // frame.Truncated is set to true when framer detects that the current header
+ // list size hits MaxHeaderListSize limit.
+ if frame.Truncated {
+ return status.Error(codes.Internal, "peer header list size exceeded limit")
+ }
for _, hf := range frame.Fields {
if err := d.processHeaderField(hf); err != nil {
return err
}
}
+ if d.serverSide {
+ return nil
+ }
+
// If grpc status exists, no need to check further.
if d.rawStatusCode != nil || d.statusGen != nil {
return nil
@@ -249,7 +261,7 @@ func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error
// If grpc status doesn't exist and http status doesn't exist,
// then it's a malformed header.
if d.httpStatus == nil {
- return streamErrorf(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
+ return status.Error(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
}
if *(d.httpStatus) != http.StatusOK {
@@ -257,7 +269,7 @@ func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error
if !ok {
code = codes.Unknown
}
- return streamErrorf(code, http.StatusText(*(d.httpStatus)))
+ return status.Error(code, http.StatusText(*(d.httpStatus)))
}
// gRPC status doesn't exist and http status is OK.
@@ -269,7 +281,6 @@ func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error
code := int(codes.Unknown)
d.rawStatusCode = &code
return nil
-
}
func (d *decodeState) addMetadata(k, v string) {
@@ -284,7 +295,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
case "content-type":
contentSubtype, validContentType := contentSubtype(f.Value)
if !validContentType {
- return streamErrorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value)
+ return status.Errorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value)
}
d.contentSubtype = contentSubtype
// TODO: do we want to propagate the whole content-type in the metadata,
@@ -297,7 +308,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
+ return status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
}
d.rawStatusCode = &code
case "grpc-message":
@@ -305,38 +316,38 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
case "grpc-status-details-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
}
s := &spb.Status{}
if err := proto.Unmarshal(v, s); err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
}
d.statusGen = status.FromProto(s)
case "grpc-timeout":
d.timeoutSet = true
var err error
if d.timeout, err = decodeTimeout(f.Value); err != nil {
- return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
+ return status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
}
case ":path":
d.method = f.Value
case ":status":
code, err := strconv.Atoi(f.Value)
if err != nil {
- return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err)
+ return status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
}
d.httpStatus = &code
case "grpc-tags-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
+ return status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
}
d.statsTags = v
d.addMetadata(f.Name, string(v))
case "grpc-trace-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
- return streamErrorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
+ return status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
}
d.statsTrace = v
d.addMetadata(f.Name, string(v))
@@ -545,6 +556,9 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
if w.err != nil {
return 0, w.err
}
+ if w.batchSize == 0 { // Buffer has been disabled.
+ return w.conn.Write(b)
+ }
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
@@ -577,8 +591,14 @@ type framer struct {
fr *http2.Framer
}
-func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
- r := bufio.NewReaderSize(conn, readBufferSize)
+func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
+ if writeBufferSize < 0 {
+ writeBufferSize = 0
+ }
+ var r io.Reader = conn
+ if readBufferSize > 0 {
+ r = bufio.NewReaderSize(r, readBufferSize)
+ }
w := newBufWriter(conn, writeBufferSize)
f := &framer{
writer: w,
@@ -587,6 +607,7 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
+ f.fr.MaxHeaderListSize = maxHeaderListSize
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}
diff --git a/vendor/google.golang.org/grpc/transport/log.go b/vendor/google.golang.org/grpc/internal/transport/log.go
index ac8e358c5..ac8e358c5 100644
--- a/vendor/google.golang.org/grpc/transport/log.go
+++ b/vendor/google.golang.org/grpc/internal/transport/log.go
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index f51f87888..9775eeb81 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -19,7 +19,7 @@
// Package transport defines and implements message oriented communication
// channel to complete various transactions (e.g., an RPC). It is meant for
// grpc-internal usage and is not intended to be imported directly by users.
-package transport // externally used as import "google.golang.org/grpc/transport"
+package transport
import (
"errors"
@@ -191,6 +191,8 @@ type Stream struct {
header metadata.MD // the received header metadata.
trailer metadata.MD // the key-value map of trailer metadata.
+ noHeaders bool // set if the client never received headers (set only after the stream is done).
+
// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
headerSent uint32
@@ -259,7 +261,7 @@ func (s *Stream) SetSendCompress(str string) {
s.sendCompress = str
}
-// Done returns a chanel which is closed when it receives the final status
+// Done returns a channel which is closed when it receives the final status
// from the server.
func (s *Stream) Done() <-chan struct{} {
return s.done
@@ -282,6 +284,19 @@ func (s *Stream) Header() (metadata.MD, error) {
return nil, err
}
+// TrailersOnly blocks until a header or trailers-only frame is received and
+// then returns true if the stream was trailers-only. If the stream ends
+// before headers are received, returns true, nil. If a context error happens
+// first, returns it as a status error. Client-side only.
+func (s *Stream) TrailersOnly() (bool, error) {
+ err := s.waitOnHeader()
+ if err != nil {
+ return false, err
+ }
+ // if !headerDone, some other connection error occurred.
+ return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
+}
+
// Trailer returns the cached trailer metedata. Note that if it is not called
// after the entire stream is done, it could return an empty MD. Client
// side only.
@@ -292,12 +307,6 @@ func (s *Stream) Trailer() metadata.MD {
return c
}
-// ServerTransport returns the underlying ServerTransport for the stream.
-// The client side stream always returns nil.
-func (s *Stream) ServerTransport() ServerTransport {
- return s.st
-}
-
// ContentSubtype returns the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". This will always be lowercase. See
@@ -319,7 +328,7 @@ func (s *Stream) Method() string {
// Status returns the status received from the server.
// Status can be read safely only after the stream has ended,
-// that is, read or write has returned io.EOF.
+// that is, after Done() is closed.
func (s *Stream) Status() *status.Status {
return s.status
}
@@ -344,8 +353,7 @@ func (s *Stream) SetHeader(md metadata.MD) error {
// combined with any metadata set by previous calls to SetHeader and
// then written to the transport stream.
func (s *Stream) SendHeader(md metadata.MD) error {
- t := s.ServerTransport()
- return t.WriteHeader(s, md)
+ return s.st.WriteHeader(s, md)
}
// SetTrailer sets the trailer metadata which will be sent with the RPC status
@@ -439,6 +447,7 @@ type ServerConfig struct {
WriteBufferSize int
ReadBufferSize int
ChannelzParentID int64
+ MaxHeaderListSize *uint32
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -451,9 +460,6 @@ func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (S
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
- // Authority is the :authority pseudo-header to use. This field has no effect if
- // TransportCredentials is set.
- Authority string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
@@ -476,6 +482,8 @@ type ConnectOptions struct {
ReadBufferSize int
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID int64
+ // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
+ MaxHeaderListSize *uint32
}
// TargetInfo contains the information of the target such as network address and metadata.
@@ -497,11 +505,6 @@ type Options struct {
// Last indicates whether this write is the last piece for
// this stream.
Last bool
-
- // Delay is a hint to the transport implementation for whether
- // the data could be buffered for a batching write. The
- // transport implementation may ignore the hint.
- Delay bool
}
// CallHdr carries the information of a particular RPC.
@@ -519,14 +522,6 @@ type CallHdr struct {
// Creds specifies credentials.PerRPCCredentials for a call.
Creds credentials.PerRPCCredentials
- // Flush indicates whether a new stream command should be sent
- // to the peer without waiting for the first data. This is
- // only a hint.
- // If it's true, the transport may modify the flush decision
- // for performance purposes.
- // If it's false, new stream will never be flushed.
- Flush bool
-
// ContentSubtype specifies the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". The value of ContentSubtype must be all
@@ -534,6 +529,8 @@ type CallHdr struct {
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// for more details.
ContentSubtype string
+
+ PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
}
// ClientTransport is the common interface for all gRPC client-side transport
@@ -622,14 +619,6 @@ type ServerTransport interface {
IncrMsgRecv()
}
-// streamErrorf creates an StreamError with the specified error code and description.
-func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
- return StreamError{
- Code: c,
- Desc: fmt.Sprintf(format, a...),
- }
-}
-
// connectionErrorf creates an ConnectionError with the specified error description.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
return ConnectionError{
@@ -672,7 +661,7 @@ var (
// errStreamDrain indicates that the stream is rejected because the
// connection is draining. This could be caused by goaway or balancer
// removing the address.
- errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
+ errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
// errStreamDone is returned from write at the client side to indiacte application
// layer of an error.
errStreamDone = errors.New("the stream is done")
@@ -681,18 +670,6 @@ var (
statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
-// TODO: See if we can replace StreamError with status package errors.
-
-// StreamError is an error that only affects one stream within a connection.
-type StreamError struct {
- Code codes.Code
- Desc string
-}
-
-func (e StreamError) Error() string {
- return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
-}
-
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index 019e65800..76cc456aa 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -21,17 +21,14 @@ package grpc
import (
"io"
"sync"
- "sync/atomic"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
- "google.golang.org/grpc/transport"
)
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
@@ -45,16 +42,10 @@ type pickerWrapper struct {
// The latest connection happened.
connErrMu sync.Mutex
connErr error
-
- stickinessMDKey atomic.Value
- stickiness *stickyStore
}
func newPickerWrapper() *pickerWrapper {
- bp := &pickerWrapper{
- blockingCh: make(chan struct{}),
- stickiness: newStickyStore(),
- }
+ bp := &pickerWrapper{blockingCh: make(chan struct{})}
return bp
}
@@ -71,27 +62,6 @@ func (bp *pickerWrapper) connectionError() error {
return err
}
-func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
- // No need to check ok because mdKey == "" if ok == false.
- if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey {
- bp.stickinessMDKey.Store(newKey)
- bp.stickiness.reset(newKey)
- }
-}
-
-func (bp *pickerWrapper) getStickinessMDKey() string {
- // No need to check ok because mdKey == "" if ok == false.
- mdKey, _ := bp.stickinessMDKey.Load().(string)
- return mdKey
-}
-
-func (bp *pickerWrapper) clearStickinessState() {
- if oldKey := bp.getStickinessMDKey(); oldKey != "" {
- // There's no need to reset store if mdKey was "".
- bp.stickiness.reset(oldKey)
- }
-}
-
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
bp.mu.Lock()
@@ -131,27 +101,6 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
-
- mdKey := bp.getStickinessMDKey()
- stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
-
- // Potential race here: if stickinessMDKey is updated after the above two
- // lines, and this pick is a sticky pick, the following put could add an
- // entry to sticky store with an outdated sticky key.
- //
- // The solution: keep the current md key in sticky store, and at the
- // beginning of each get/put, check the mdkey against store.curMDKey.
- // - Cons: one more string comparing for each get/put.
- // - Pros: the string matching happens inside get/put, so the overhead for
- // non-sticky RPCs will be minimal.
-
- if isSticky {
- if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
- // Done function returned is always nil.
- return t, nil, nil
- }
- }
-
var (
p balancer.Picker
ch chan struct{}
@@ -207,9 +156,6 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
- if isSticky {
- bp.stickiness.put(mdKey, stickyKey, acw)
- }
if channelz.IsOn() {
return t, doneChannelzWrapper(acw, done), nil
}
@@ -232,105 +178,3 @@ func (bp *pickerWrapper) close() {
bp.done = true
close(bp.blockingCh)
}
-
-const stickinessKeyCountLimit = 1000
-
-type stickyStoreEntry struct {
- acw *acBalancerWrapper
- addr resolver.Address
-}
-
-type stickyStore struct {
- mu sync.Mutex
- // curMDKey is check before every get/put to avoid races. The operation will
- // abort immediately when the given mdKey is different from the curMDKey.
- curMDKey string
- store *linkedMap
-}
-
-func newStickyStore() *stickyStore {
- return &stickyStore{
- store: newLinkedMap(),
- }
-}
-
-// reset clears the map in stickyStore, and set the currentMDKey to newMDKey.
-func (ss *stickyStore) reset(newMDKey string) {
- ss.mu.Lock()
- ss.curMDKey = newMDKey
- ss.store.clear()
- ss.mu.Unlock()
-}
-
-// stickyKey is the key to look up in store. mdKey will be checked against
-// curMDKey to avoid races.
-func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
- ss.mu.Lock()
- defer ss.mu.Unlock()
- if mdKey != ss.curMDKey {
- return
- }
- // TODO(stickiness): limit the total number of entries.
- ss.store.put(stickyKey, &stickyStoreEntry{
- acw: acw,
- addr: acw.getAddrConn().getCurAddr(),
- })
- if ss.store.len() > stickinessKeyCountLimit {
- ss.store.removeOldest()
- }
-}
-
-// stickyKey is the key to look up in store. mdKey will be checked against
-// curMDKey to avoid races.
-func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
- ss.mu.Lock()
- defer ss.mu.Unlock()
- if mdKey != ss.curMDKey {
- return nil, false
- }
- entry, ok := ss.store.get(stickyKey)
- if !ok {
- return nil, false
- }
- ac := entry.acw.getAddrConn()
- if ac.getCurAddr() != entry.addr {
- ss.store.remove(stickyKey)
- return nil, false
- }
- t, ok := ac.getReadyTransport()
- if !ok {
- ss.store.remove(stickyKey)
- return nil, false
- }
- return t, true
-}
-
-// Get one value from metadata in ctx with key stickinessMDKey.
-//
-// It returns "", false if stickinessMDKey is an empty string.
-func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
- if stickinessMDKey == "" {
- return "", false
- }
-
- md, added, ok := metadata.FromOutgoingContextRaw(ctx)
- if !ok {
- return "", false
- }
-
- if vv, ok := md[stickinessMDKey]; ok {
- if len(vv) > 0 {
- return vv[0], true
- }
- }
-
- for _, ss := range added {
- for i := 0; i < len(ss)-1; i += 2 {
- if ss[i] == stickinessMDKey {
- return ss[i+1], true
- }
- }
- }
-
- return "", false
-}
diff --git a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
index 048fde67d..084bdbfe6 100644
--- a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
@@ -33,6 +33,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
)
@@ -51,17 +52,23 @@ const (
)
var (
- errMissingAddr = errors.New("missing address")
+ errMissingAddr = errors.New("dns resolver: missing address")
+
+ // Addresses ending with a colon that is supposed to be the separator
+ // between host and port is not allowed. E.g. "::" is a valid address as
+ // it is an IPv6 address (host only) and "[::]:" is invalid as it ends with
+ // a colon as the host and port separator
+ errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
)
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
- return &dnsBuilder{freq: defaultFreq}
+ return &dnsBuilder{minFreq: defaultFreq}
}
type dnsBuilder struct {
- // frequency of polling the DNS server.
- freq time.Duration
+ // minimum frequency of polling the DNS server.
+ minFreq time.Duration
}
// Build creates and starts a DNS resolver that watches the name resolution of the target.
@@ -92,7 +99,8 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
d := &dnsResolver{
- freq: b.freq,
+ freq: b.minFreq,
+ backoff: backoff.Exponential{MaxDelay: b.minFreq},
host: host,
port: port,
ctx: ctx,
@@ -148,12 +156,14 @@ func (i *ipResolver) watcher() {
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
- freq time.Duration
- host string
- port string
- ctx context.Context
- cancel context.CancelFunc
- cc resolver.ClientConn
+ freq time.Duration
+ backoff backoff.Exponential
+ retryCount int
+ host string
+ port string
+ ctx context.Context
+ cancel context.CancelFunc
+ cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
t *time.Timer
@@ -192,8 +202,15 @@ func (d *dnsResolver) watcher() {
case <-d.rn:
}
result, sc := d.lookup()
- // Next lookup should happen after an interval defined by d.freq.
- d.t.Reset(d.freq)
+ // Next lookup should happen within an interval defined by d.freq. It may be
+ // more often due to exponential retry on empty address list.
+ if len(result) == 0 {
+ d.retryCount++
+ d.t.Reset(d.backoff.Backoff(d.retryCount))
+ } else {
+ d.retryCount = 0
+ d.t.Reset(d.freq)
+ }
d.cc.NewServiceConfig(sc)
d.cc.NewAddress(result)
}
@@ -297,7 +314,6 @@ func formatIP(addr string) (addrIP string, ok bool) {
// target: "ipv4-host:80" returns host: "ipv4-host", port: "80"
// target: "[ipv6-host]" returns host: "ipv6-host", port: "443"
// target: ":80" returns host: "localhost", port: "80"
-// target: ":" returns host: "localhost", port: "443"
func parseTarget(target string) (host, port string, err error) {
if target == "" {
return "", "", errMissingAddr
@@ -307,15 +323,15 @@ func parseTarget(target string) (host, port string, err error) {
return target, defaultPort, nil
}
if host, port, err = net.SplitHostPort(target); err == nil {
+ if port == "" {
+ // If the port field is empty (target ends with colon), e.g. "[::1]:", this is an error.
+ return "", "", errEndsWithColon
+ }
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
// Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed.
host = "localhost"
}
- if port == "" {
- // If the port field is empty(target ends with colon), e.g. "[::1]:", defaultPort is used.
- port = defaultPort
- }
return host, port, nil
}
if host, port, err = net.SplitHostPort(target + ":" + defaultPort); err == nil {
diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go
index 506afac88..145cf477e 100644
--- a/vendor/google.golang.org/grpc/resolver/resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/resolver.go
@@ -49,8 +49,12 @@ func Get(scheme string) Builder {
return nil
}
-// SetDefaultScheme sets the default scheme that will be used.
-// The default default scheme is "passthrough".
+// SetDefaultScheme sets the default scheme that will be used. The default
+// default scheme is "passthrough".
+//
+// NOTE: this function must only be called during initialization time (i.e. in
+// an init() function), and is not thread-safe. The scheme set last overrides
+// previously set values.
func SetDefaultScheme(scheme string) {
defaultScheme = scheme
}
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 033801f34..61342c9cd 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -36,11 +36,11 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
- "google.golang.org/grpc/transport"
)
// Compressor defines the interface gRPC uses to compress a message.
@@ -162,10 +162,15 @@ type callInfo struct {
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
+ disableRetry bool
+ maxRetryRPCBufferSize int
}
func defaultCallInfo() *callInfo {
- return &callInfo{failFast: true}
+ return &callInfo{
+ failFast: true,
+ maxRetryRPCBufferSize: 256 * 1024, // 256KB
+ }
}
// CallOption configures a Call before it starts or extracts information from
@@ -415,6 +420,27 @@ func (o CustomCodecCallOption) before(c *callInfo) error {
}
func (o CustomCodecCallOption) after(c *callInfo) {}
+// MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
+// used for buffering this RPC's requests for retry purposes.
+//
+// This API is EXPERIMENTAL.
+func MaxRetryRPCBufferSize(bytes int) CallOption {
+ return MaxRetryRPCBufferSizeCallOption{bytes}
+}
+
+// MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of
+// memory to be used for caching this RPC for retry purposes.
+// This is an EXPERIMENTAL API.
+type MaxRetryRPCBufferSizeCallOption struct {
+ MaxRetryRPCBufferSize int
+}
+
+func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
+ c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
+ return nil
+}
+func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo) {}
+
// The format of the payload: compressed or not?
type payloadFormat uint8
@@ -444,7 +470,7 @@ type parser struct {
// * io.EOF, when no messages remain
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
-// * of type transport.StreamError
+// * an error from the status package
// No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible
// error.
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 014c72b3f..f5bea7238 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -45,12 +45,12 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
- "google.golang.org/grpc/transport"
)
const (
@@ -135,19 +135,25 @@ type options struct {
writeBufferSize int
readBufferSize int
connectionTimeout time.Duration
+ maxHeaderListSize *uint32
}
var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
+ writeBufferSize: defaultWriteBufSize,
+ readBufferSize: defaultReadBufSize,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
-// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
-// before doing a write on the wire.
+// WriteBufferSize determines how much data can be batched before doing a write on the wire.
+// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
+// The default value for this buffer is 32KB.
+// Zero will disable the write buffer such that each write will be on underlying connection.
+// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
return func(o *options) {
o.writeBufferSize = s
@@ -156,6 +162,9 @@ func WriteBufferSize(s int) ServerOption {
// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for one read syscall.
+// The default value for this buffer is 32KB.
+// Zero will disable read buffer for a connection so data framer can access the underlying
+// conn directly.
func ReadBufferSize(s int) ServerOption {
return func(o *options) {
o.readBufferSize = s
@@ -335,6 +344,14 @@ func ConnectionTimeout(d time.Duration) ServerOption {
}
}
+// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
+// of header list that the server is prepared to accept.
+func MaxHeaderListSize(s uint32) ServerOption {
+ return func(o *options) {
+ o.maxHeaderListSize = &s
+ }
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -481,7 +498,8 @@ type listenSocket struct {
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{
- LocalAddr: l.Listener.Addr(),
+ SocketOptions: channelz.GetSocketOption(l.Listener),
+ LocalAddr: l.Listener.Addr(),
}
}
@@ -656,6 +674,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
WriteBufferSize: s.opts.writeBufferSize,
ReadBufferSize: s.opts.readBufferSize,
ChannelzParentID: s.channelzID,
+ MaxHeaderListSize: s.opts.maxHeaderListSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
@@ -944,10 +963,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
switch st := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
}
@@ -1028,10 +1043,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
}
- opts := &transport.Options{
- Last: true,
- Delay: false,
- }
+ opts := &transport.Options{Last: true}
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
@@ -1046,10 +1058,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
switch st := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
@@ -1169,12 +1177,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if appErr != nil {
appStatus, ok := status.FromError(appErr)
if !ok {
- switch err := appErr.(type) {
- case transport.StreamError:
- appStatus = status.New(err.Code, err.Desc)
- default:
- appStatus = status.New(codes.Unknown, appErr.Error())
- }
+ appStatus = status.New(codes.Unknown, appErr.Error())
appErr = appStatus.Err()
}
if trInfo != nil {
diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go
index 015631d8d..e0d735265 100644
--- a/vendor/google.golang.org/grpc/service_config.go
+++ b/vendor/google.golang.org/grpc/service_config.go
@@ -25,6 +25,7 @@ import (
"strings"
"time"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
)
@@ -56,6 +57,8 @@ type MethodConfig struct {
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
MaxRespSize *int
+ // RetryPolicy configures retry options for the method.
+ retryPolicy *retryPolicy
}
// ServiceConfig is provided by the service provider and contains parameters for how
@@ -68,13 +71,84 @@ type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
LB *string
- // Methods contains a map for the methods in this service.
- // If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
- // If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
- // Otherwise, the method has no MethodConfig to use.
+
+ // Methods contains a map for the methods in this service. If there is an
+ // exact match for a method (i.e. /service/method) in the map, use the
+ // corresponding MethodConfig. If there's no exact match, look for the
+ // default config for the service (/service/) and use the corresponding
+ // MethodConfig if it exists. Otherwise, the method has no MethodConfig to
+ // use.
Methods map[string]MethodConfig
- stickinessMetadataKey *string
+ // If a retryThrottlingPolicy is provided, gRPC will automatically throttle
+ // retry attempts and hedged RPCs when the client’s ratio of failures to
+ // successes exceeds a threshold.
+ //
+ // For each server name, the gRPC client will maintain a token_count which is
+ // initially set to maxTokens, and can take values between 0 and maxTokens.
+ //
+ // Every outgoing RPC (regardless of service or method invoked) will change
+ // token_count as follows:
+ //
+ // - Every failed RPC will decrement the token_count by 1.
+ // - Every successful RPC will increment the token_count by tokenRatio.
+ //
+ // If token_count is less than or equal to maxTokens / 2, then RPCs will not
+ // be retried and hedged RPCs will not be sent.
+ retryThrottling *retryThrottlingPolicy
+}
+
+// retryPolicy defines the go-native version of the retry policy defined by the
+// service config here:
+// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
+type retryPolicy struct {
+ // MaxAttempts is the maximum number of attempts, including the original RPC.
+ //
+ // This field is required and must be two or greater.
+ maxAttempts int
+
+ // Exponential backoff parameters. The initial retry attempt will occur at
+ // random(0, initialBackoffMS). In general, the nth attempt will occur at
+ // random(0,
+ // min(initialBackoffMS*backoffMultiplier**(n-1), maxBackoffMS)).
+ //
+ // These fields are required and must be greater than zero.
+ initialBackoff time.Duration
+ maxBackoff time.Duration
+ backoffMultiplier float64
+
+ // The set of status codes which may be retried.
+ //
+ // Status codes are specified as strings, e.g., "UNAVAILABLE".
+ //
+ // This field is required and must be non-empty.
+ // Note: a set is used to store this for easy lookup.
+ retryableStatusCodes map[codes.Code]bool
+}
+
+type jsonRetryPolicy struct {
+ MaxAttempts int
+ InitialBackoff string
+ MaxBackoff string
+ BackoffMultiplier float64
+ RetryableStatusCodes []codes.Code
+}
+
+// retryThrottlingPolicy defines the go-native version of the retry throttling
+// policy defined by the service config here:
+// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
+type retryThrottlingPolicy struct {
+ // The number of tokens starts at maxTokens. The token_count will always be
+ // between 0 and maxTokens.
+ //
+ // This field is required and must be greater than zero.
+ MaxTokens float64
+ // The amount of tokens to add on each successful RPC. Typically this will
+ // be some number between 0 and 1, e.g., 0.1.
+ //
+ // This field is required and must be greater than zero. Up to 3 decimal
+ // places are supported.
+ TokenRatio float64
}
func parseDuration(s *string) (*time.Duration, error) {
@@ -144,13 +218,14 @@ type jsonMC struct {
Timeout *string
MaxRequestMessageBytes *int64
MaxResponseMessageBytes *int64
+ RetryPolicy *jsonRetryPolicy
}
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
- LoadBalancingPolicy *string
- StickinessMetadataKey *string
- MethodConfig *[]jsonMC
+ LoadBalancingPolicy *string
+ MethodConfig *[]jsonMC
+ RetryThrottling *retryThrottlingPolicy
}
func parseServiceConfig(js string) (ServiceConfig, error) {
@@ -161,10 +236,9 @@ func parseServiceConfig(js string) (ServiceConfig, error) {
return ServiceConfig{}, err
}
sc := ServiceConfig{
- LB: rsc.LoadBalancingPolicy,
- Methods: make(map[string]MethodConfig),
-
- stickinessMetadataKey: rsc.StickinessMetadataKey,
+ LB: rsc.LoadBalancingPolicy,
+ Methods: make(map[string]MethodConfig),
+ retryThrottling: rsc.RetryThrottling,
}
if rsc.MethodConfig == nil {
return sc, nil
@@ -184,6 +258,10 @@ func parseServiceConfig(js string) (ServiceConfig, error) {
WaitForReady: m.WaitForReady,
Timeout: d,
}
+ if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
+ grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
+ return ServiceConfig{}, err
+ }
if m.MaxRequestMessageBytes != nil {
if *m.MaxRequestMessageBytes > int64(maxInt) {
mc.MaxReqSize = newInt(maxInt)
@@ -205,9 +283,56 @@ func parseServiceConfig(js string) (ServiceConfig, error) {
}
}
+ if sc.retryThrottling != nil {
+ if sc.retryThrottling.MaxTokens <= 0 ||
+ sc.retryThrottling.MaxTokens >= 1000 ||
+ sc.retryThrottling.TokenRatio <= 0 {
+ // Illegal throttling config; disable throttling.
+ sc.retryThrottling = nil
+ }
+ }
return sc, nil
}
+func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) {
+ if jrp == nil {
+ return nil, nil
+ }
+ ib, err := parseDuration(&jrp.InitialBackoff)
+ if err != nil {
+ return nil, err
+ }
+ mb, err := parseDuration(&jrp.MaxBackoff)
+ if err != nil {
+ return nil, err
+ }
+
+ if jrp.MaxAttempts <= 1 ||
+ *ib <= 0 ||
+ *mb <= 0 ||
+ jrp.BackoffMultiplier <= 0 ||
+ len(jrp.RetryableStatusCodes) == 0 {
+ grpclog.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
+ return nil, nil
+ }
+
+ rp := &retryPolicy{
+ maxAttempts: jrp.MaxAttempts,
+ initialBackoff: *ib,
+ maxBackoff: *mb,
+ backoffMultiplier: jrp.BackoffMultiplier,
+ retryableStatusCodes: make(map[codes.Code]bool),
+ }
+ if rp.maxAttempts > 5 {
+ // TODO(retry): Make the max maxAttempts configurable.
+ rp.maxAttempts = 5
+ }
+ for _, code := range jrp.RetryableStatusCodes {
+ rp.retryableStatusCodes[code] = true
+ }
+ return rp, nil
+}
+
func min(a, b *int) *int {
if *a < *b {
return a
diff --git a/vendor/google.golang.org/grpc/stickiness_linkedmap.go b/vendor/google.golang.org/grpc/stickiness_linkedmap.go
deleted file mode 100644
index 1c726af16..000000000
--- a/vendor/google.golang.org/grpc/stickiness_linkedmap.go
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpc
-
-import (
- "container/list"
-)
-
-type linkedMapKVPair struct {
- key string
- value *stickyStoreEntry
-}
-
-// linkedMap is an implementation of a map that supports removing the oldest
-// entry.
-//
-// linkedMap is NOT thread safe.
-//
-// It's for use of stickiness only!
-type linkedMap struct {
- m map[string]*list.Element
- l *list.List // Head of the list is the oldest element.
-}
-
-// newLinkedMap returns a new LinkedMap.
-func newLinkedMap() *linkedMap {
- return &linkedMap{
- m: make(map[string]*list.Element),
- l: list.New(),
- }
-}
-
-// put adds entry (key, value) to the map. Existing key will be overridden.
-func (m *linkedMap) put(key string, value *stickyStoreEntry) {
- if oldE, ok := m.m[key]; ok {
- // Remove existing entry.
- m.l.Remove(oldE)
- }
- e := m.l.PushBack(&linkedMapKVPair{key: key, value: value})
- m.m[key] = e
-}
-
-// get returns the value of the given key.
-func (m *linkedMap) get(key string) (*stickyStoreEntry, bool) {
- e, ok := m.m[key]
- if !ok {
- return nil, false
- }
- m.l.MoveToBack(e)
- return e.Value.(*linkedMapKVPair).value, true
-}
-
-// remove removes key from the map, and returns the value. The map is not
-// modified if key is not in the map.
-func (m *linkedMap) remove(key string) (*stickyStoreEntry, bool) {
- e, ok := m.m[key]
- if !ok {
- return nil, false
- }
- delete(m.m, key)
- m.l.Remove(e)
- return e.Value.(*linkedMapKVPair).value, true
-}
-
-// len returns the len of the map.
-func (m *linkedMap) len() int {
- return len(m.m)
-}
-
-// clear removes all elements from the map.
-func (m *linkedMap) clear() {
- m.m = make(map[string]*list.Element)
- m.l = list.New()
-}
-
-// removeOldest removes the oldest key from the map.
-func (m *linkedMap) removeOldest() {
- e := m.l.Front()
- m.l.Remove(e)
- delete(m.m, e.Value.(*linkedMapKVPair).key)
-}
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 152d9eccd..65d45a1d9 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -21,6 +21,8 @@ package grpc
import (
"errors"
"io"
+ "math"
+ "strconv"
"sync"
"time"
@@ -29,11 +31,13 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
+ "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
- "google.golang.org/grpc/transport"
)
// StreamHandler defines the handler called by gRPC server to complete the
@@ -55,31 +59,20 @@ type StreamDesc struct {
// Stream defines the common interface a client or server stream has to satisfy.
//
-// All errors returned from Stream are compatible with the status package.
+// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
- // Context returns the context for this stream.
+ // Deprecated: See ClientStream and ServerStream documentation instead.
Context() context.Context
- // SendMsg blocks until it sends m, the stream is done or the stream
- // breaks.
- // On error, it aborts the stream and returns an RPC status on client
- // side. On server side, it simply returns the error to the caller.
- // SendMsg is called by generated code. Also Users can call SendMsg
- // directly when it is really needed in their use cases.
- // It's safe to have a goroutine calling SendMsg and another goroutine calling
- // recvMsg on the same stream at the same time.
- // But it is not safe to call SendMsg on the same stream in different goroutines.
+ // Deprecated: See ClientStream and ServerStream documentation instead.
SendMsg(m interface{}) error
- // RecvMsg blocks until it receives a message or the stream is
- // done. On client side, it returns io.EOF when the stream is done. On
- // any other error, it aborts the stream and returns an RPC status. On
- // server side, it simply returns the error to the caller.
- // It's safe to have a goroutine calling SendMsg and another goroutine calling
- // recvMsg on the same stream at the same time.
- // But it is not safe to call RecvMsg on the same stream in different goroutines.
+ // Deprecated: See ClientStream and ServerStream documentation instead.
RecvMsg(m interface{}) error
}
-// ClientStream defines the interface a client stream has to satisfy.
+// ClientStream defines the client-side behavior of a streaming RPC.
+//
+// All errors returned from ClientStream methods are compatible with the
+// status package.
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
@@ -91,13 +84,38 @@ type ClientStream interface {
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met.
CloseSend() error
- // Stream.SendMsg() may return a non-nil error when something wrong happens sending
- // the request. The returned error indicates the status of this sending, not the final
- // status of the RPC.
+ // Context returns the context for this stream.
+ //
+ // It should not be called until after Header or RecvMsg has returned. Once
+ // called, subsequent client-side retries are disabled.
+ Context() context.Context
+ // SendMsg is generally called by generated code. On error, SendMsg aborts
+ // the stream. If the error was generated by the client, the status is
+ // returned directly; otherwise, io.EOF is returned and the status of
+ // the stream may be discovered using RecvMsg.
+ //
+ // SendMsg blocks until:
+ // - There is sufficient flow control to schedule m with the transport, or
+ // - The stream is done, or
+ // - The stream breaks.
//
- // Always call Stream.RecvMsg() to drain the stream and get the final
- // status, otherwise there could be leaked resources.
- Stream
+ // SendMsg does not wait until the message is received by the server. An
+ // untimely stream closure may result in lost messages. To ensure delivery,
+ // users should ensure the RPC completed successfully using RecvMsg.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not safe
+ // to call SendMsg on the same stream in different goroutines.
+ SendMsg(m interface{}) error
+ // RecvMsg blocks until it receives a message into m or the stream is
+ // done. It returns io.EOF when the stream completes successfully. On
+ // any other error, the stream is aborted and the error contains the RPC
+ // status.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not
+ // safe to call RecvMsg on the same stream in different goroutines.
+ RecvMsg(m interface{}) error
}
// NewStream creates a new Stream for the client side. This is typically
@@ -128,8 +146,6 @@ func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method st
}
// NewClientStream is a wrapper for ClientConn.NewStream.
-//
-// DEPRECATED: Use ClientConn.NewStream instead.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
return cc.NewStream(ctx, desc, method, opts...)
}
@@ -178,13 +194,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
callHdr := &transport.CallHdr{
- Host: cc.authority,
- Method: method,
- // If it's not client streaming, we should already have the request to be sent,
- // so we don't flush the header.
- // If it's client streaming, the user may never send a request or send it any
- // time soon, so we ask the transport to flush the header.
- Flush: desc.ClientStreams,
+ Host: cc.authority,
+ Method: method,
ContentSubtype: c.contentSubtype,
}
@@ -218,15 +229,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = trace.NewContext(ctx, trInfo.tr)
- defer func() {
- if err != nil {
- // Need to call tr.finish() if error is returned.
- // Because tr will not be returned to caller.
- trInfo.tr.LazyPrintf("RPC: [%v]", err)
- trInfo.tr.SetError()
- trInfo.tr.Finish()
- }
- }()
}
ctx = newContextWithRPCInfo(ctx, c.failFast)
sh := cc.dopts.copts.StatsHandler
@@ -240,80 +242,41 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
FailFast: c.failFast,
}
sh.HandleRPC(ctx, begin)
- defer func() {
- if err != nil {
- // Only handle end stats if err != nil.
- end := &stats.End{
- Client: true,
- Error: err,
- BeginTime: beginTime,
- EndTime: time.Now(),
- }
- sh.HandleRPC(ctx, end)
- }
- }()
}
- var (
- t transport.ClientTransport
- s *transport.Stream
- done func(balancer.DoneInfo)
- )
- for {
- // Check to make sure the context has expired. This will prevent us from
- // looping forever if an error occurs for wait-for-ready RPCs where no data
- // is sent on the wire.
- select {
- case <-ctx.Done():
- return nil, toRPCErr(ctx.Err())
- default:
- }
-
- t, done, err = cc.getTransport(ctx, c.failFast)
- if err != nil {
- return nil, err
- }
+ cs := &clientStream{
+ callHdr: callHdr,
+ ctx: ctx,
+ methodConfig: &mc,
+ opts: opts,
+ callInfo: c,
+ cc: cc,
+ desc: desc,
+ codec: c.codec,
+ cp: cp,
+ comp: comp,
+ cancel: cancel,
+ beginTime: beginTime,
+ firstAttempt: true,
+ }
+ if !cc.dopts.disableRetry {
+ cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
+ }
+
+ cs.callInfo.stream = cs
+ // Only this initial attempt has stats/tracing.
+ // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
+ if err := cs.newAttemptLocked(sh, trInfo); err != nil {
+ cs.finish(err)
+ return nil, err
+ }
- s, err = t.NewStream(ctx, callHdr)
- if err != nil {
- if done != nil {
- done(balancer.DoneInfo{Err: err})
- done = nil
- }
- // In the event of any error from NewStream, we never attempted to write
- // anything to the wire, so we can retry indefinitely for non-fail-fast
- // RPCs.
- if !c.failFast {
- continue
- }
- return nil, toRPCErr(err)
- }
- break
+ op := func(a *csAttempt) error { return a.newStream() }
+ if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
+ cs.finish(err)
+ return nil, err
}
- cs := &clientStream{
- opts: opts,
- c: c,
- cc: cc,
- desc: desc,
- codec: c.codec,
- cp: cp,
- comp: comp,
- cancel: cancel,
- attempt: &csAttempt{
- t: t,
- s: s,
- p: &parser{r: s},
- done: done,
- dc: cc.dopts.dc,
- ctx: ctx,
- trInfo: trInfo,
- statsHandler: sh,
- beginTime: beginTime,
- },
- }
- cs.c.stream = cs
- cs.attempt.cs = cs
if desc != unaryStreamDesc {
// Listen on cc and stream contexts to cleanup when the user closes the
// ClientConn or cancels the stream context. In all other cases, an error
@@ -332,12 +295,45 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
return cs, nil
}
+func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
+ cs.attempt = &csAttempt{
+ cs: cs,
+ dc: cs.cc.dopts.dc,
+ statsHandler: sh,
+ trInfo: trInfo,
+ }
+
+ if err := cs.ctx.Err(); err != nil {
+ return toRPCErr(err)
+ }
+ t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
+ if err != nil {
+ return err
+ }
+ cs.attempt.t = t
+ cs.attempt.done = done
+ return nil
+}
+
+func (a *csAttempt) newStream() error {
+ cs := a.cs
+ cs.callHdr.PreviousAttempts = cs.numRetries
+ s, err := a.t.NewStream(cs.ctx, cs.callHdr)
+ if err != nil {
+ return toRPCErr(err)
+ }
+ cs.attempt.s = s
+ cs.attempt.p = &parser{r: s}
+ return nil
+}
+
// clientStream implements a client side Stream.
type clientStream struct {
- opts []CallOption
- c *callInfo
- cc *ClientConn
- desc *StreamDesc
+ callHdr *transport.CallHdr
+ opts []CallOption
+ callInfo *callInfo
+ cc *ClientConn
+ desc *StreamDesc
codec baseCodec
cp Compressor
@@ -345,13 +341,25 @@ type clientStream struct {
cancel context.CancelFunc // cancels all attempts
- sentLast bool // sent an end stream
+ sentLast bool // sent an end stream
+ beginTime time.Time
+
+ methodConfig *MethodConfig
+
+ ctx context.Context // the application's context, wrapped by stats/tracing
- mu sync.Mutex // guards finished
- finished bool // TODO: replace with atomic cmpxchg or sync.Once?
+ retryThrottler *retryThrottler // The throttler active when the RPC began.
- attempt *csAttempt // the active client stream attempt
+ mu sync.Mutex
+ firstAttempt bool // if true, transparent retry is valid
+ numRetries int // exclusive of transparent retry attempt(s)
+ numRetriesSincePushback int // retries since pushback; to reset backoff
+ finished bool // TODO: replace with atomic cmpxchg or sync.Once?
+ attempt *csAttempt // the active client stream attempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
+ committed bool // active attempt committed for retry?
+ buffer []func(a *csAttempt) error // operations to replay on retry
+ bufferSize int // current size of buffer
}
// csAttempt implements a single transport stream attempt within a
@@ -363,53 +371,298 @@ type csAttempt struct {
p *parser
done func(balancer.DoneInfo)
+ finished bool
dc Decompressor
decomp encoding.Compressor
decompSet bool
- ctx context.Context // the application's context, wrapped by stats/tracing
-
mu sync.Mutex // guards trInfo.tr
// trInfo.tr is set when created (if EnableTracing is true),
// and cleared when the finish method is called.
trInfo traceInfo
statsHandler stats.Handler
- beginTime time.Time
+}
+
+func (cs *clientStream) commitAttemptLocked() {
+ cs.committed = true
+ cs.buffer = nil
+}
+
+func (cs *clientStream) commitAttempt() {
+ cs.mu.Lock()
+ cs.commitAttemptLocked()
+ cs.mu.Unlock()
+}
+
+// shouldRetry returns nil if the RPC should be retried; otherwise it returns
+// the error that should be returned by the operation.
+func (cs *clientStream) shouldRetry(err error) error {
+ if cs.attempt.s == nil && !cs.callInfo.failFast {
+ // In the event of any error from NewStream (attempt.s == nil), we
+ // never attempted to write anything to the wire, so we can retry
+ // indefinitely for non-fail-fast RPCs.
+ return nil
+ }
+ if cs.finished || cs.committed {
+ // RPC is finished or committed; cannot retry.
+ return err
+ }
+ // Wait for the trailers.
+ if cs.attempt.s != nil {
+ <-cs.attempt.s.Done()
+ }
+ if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+ // First attempt, wait-for-ready, stream unprocessed: transparently retry.
+ cs.firstAttempt = false
+ return nil
+ }
+ cs.firstAttempt = false
+ if cs.cc.dopts.disableRetry {
+ return err
+ }
+
+ pushback := 0
+ hasPushback := false
+ if cs.attempt.s != nil {
+ if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil {
+ // Context error; stop now.
+ return toErr
+ } else if !to {
+ return err
+ }
+
+ // TODO(retry): Move down if the spec changes to not check server pushback
+ // before considering this a failure for throttling.
+ sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+ if len(sps) == 1 {
+ var e error
+ if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
+ grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
+ cs.retryThrottler.throttle() // This counts as a failure for throttling.
+ return err
+ }
+ hasPushback = true
+ } else if len(sps) > 1 {
+ grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
+ cs.retryThrottler.throttle() // This counts as a failure for throttling.
+ return err
+ }
+ }
+
+ var code codes.Code
+ if cs.attempt.s != nil {
+ code = cs.attempt.s.Status().Code()
+ } else {
+ code = status.Convert(err).Code()
+ }
+
+ rp := cs.methodConfig.retryPolicy
+ if rp == nil || !rp.retryableStatusCodes[code] {
+ return err
+ }
+
+ // Note: the ordering here is important; we count this as a failure
+ // only if the code matched a retryable code.
+ if cs.retryThrottler.throttle() {
+ return err
+ }
+ if cs.numRetries+1 >= rp.maxAttempts {
+ return err
+ }
+
+ var dur time.Duration
+ if hasPushback {
+ dur = time.Millisecond * time.Duration(pushback)
+ cs.numRetriesSincePushback = 0
+ } else {
+ fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
+ cur := float64(rp.initialBackoff) * fact
+ if max := float64(rp.maxBackoff); cur > max {
+ cur = max
+ }
+ dur = time.Duration(grpcrand.Int63n(int64(cur)))
+ cs.numRetriesSincePushback++
+ }
+
+ // TODO(dfawley): we could eagerly fail here if dur puts us past the
+ // deadline, but unsure if it is worth doing.
+ t := time.NewTimer(dur)
+ select {
+ case <-t.C:
+ cs.numRetries++
+ return nil
+ case <-cs.ctx.Done():
+ t.Stop()
+ return status.FromContextError(cs.ctx.Err()).Err()
+ }
+}
+
+// Returns nil if a retry was performed and succeeded; error otherwise.
+func (cs *clientStream) retryLocked(lastErr error) error {
+ for {
+ cs.attempt.finish(lastErr)
+ if err := cs.shouldRetry(lastErr); err != nil {
+ cs.commitAttemptLocked()
+ return err
+ }
+ if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
+ return err
+ }
+ if lastErr = cs.replayBufferLocked(); lastErr == nil {
+ return nil
+ }
+ }
}
func (cs *clientStream) Context() context.Context {
- // TODO(retry): commit the current attempt (the context has peer-aware data).
- return cs.attempt.context()
+ cs.commitAttempt()
+ // No need to lock before using attempt, since we know it is committed and
+ // cannot change.
+ return cs.attempt.s.Context()
+}
+
+func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
+ cs.mu.Lock()
+ for {
+ if cs.committed {
+ cs.mu.Unlock()
+ return op(cs.attempt)
+ }
+ a := cs.attempt
+ cs.mu.Unlock()
+ err := op(a)
+ cs.mu.Lock()
+ if a != cs.attempt {
+ // We started another attempt already.
+ continue
+ }
+ if err == io.EOF {
+ <-a.s.Done()
+ }
+ if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
+ onSuccess()
+ cs.mu.Unlock()
+ return err
+ }
+ if err := cs.retryLocked(err); err != nil {
+ cs.mu.Unlock()
+ return err
+ }
+ }
}
func (cs *clientStream) Header() (metadata.MD, error) {
- m, err := cs.attempt.header()
+ var m metadata.MD
+ err := cs.withRetry(func(a *csAttempt) error {
+ var err error
+ m, err = a.s.Header()
+ return toRPCErr(err)
+ }, cs.commitAttemptLocked)
if err != nil {
- // TODO(retry): maybe retry on error or commit attempt on success.
- err = toRPCErr(err)
cs.finish(err)
}
return m, err
}
func (cs *clientStream) Trailer() metadata.MD {
- // TODO(retry): on error, maybe retry (trailers-only).
- return cs.attempt.trailer()
+ // On RPC failure, we never need to retry, because usage requires that
+ // RecvMsg() returned a non-nil error before calling this function is valid.
+ // We would have retried earlier if necessary.
+ //
+ // Commit the attempt anyway, just in case users are not following those
+ // directions -- it will prevent races and should not meaningfully impact
+ // performance.
+ cs.commitAttempt()
+ if cs.attempt.s == nil {
+ return nil
+ }
+ return cs.attempt.s.Trailer()
+}
+
+func (cs *clientStream) replayBufferLocked() error {
+ a := cs.attempt
+ for _, f := range cs.buffer {
+ if err := f(a); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
+ // Note: we still will buffer if retry is disabled (for transparent retries).
+ if cs.committed {
+ return
+ }
+ cs.bufferSize += sz
+ if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
+ cs.commitAttemptLocked()
+ return
+ }
+ cs.buffer = append(cs.buffer, op)
}
func (cs *clientStream) SendMsg(m interface{}) (err error) {
- // TODO(retry): buffer message for replaying if not committed.
- return cs.attempt.sendMsg(m)
+ defer func() {
+ if err != nil && err != io.EOF {
+ // Call finish on the client stream for errors generated by this SendMsg
+ // call, as these indicate problems created by this client. (Transport
+ // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
+ // error will be returned from RecvMsg eventually in that case, or be
+ // retried.)
+ cs.finish(err)
+ }
+ }()
+ if cs.sentLast {
+ return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
+ }
+ if !cs.desc.ClientStreams {
+ cs.sentLast = true
+ }
+ data, err := encode(cs.codec, m)
+ if err != nil {
+ return err
+ }
+ compData, err := compress(data, cs.cp, cs.comp)
+ if err != nil {
+ return err
+ }
+ hdr, payload := msgHeader(data, compData)
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payload) > *cs.callInfo.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
+ }
+ op := func(a *csAttempt) error {
+ err := a.sendMsg(m, hdr, payload, data)
+ // nil out the message and uncomp when replaying; they are only needed for
+ // stats which is disabled for subsequent attempts.
+ m, data = nil, nil
+ return err
+ }
+ return cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
}
-func (cs *clientStream) RecvMsg(m interface{}) (err error) {
- // TODO(retry): maybe retry on error or commit attempt on success.
- return cs.attempt.recvMsg(m)
+func (cs *clientStream) RecvMsg(m interface{}) error {
+ err := cs.withRetry(func(a *csAttempt) error {
+ return a.recvMsg(m)
+ }, cs.commitAttemptLocked)
+ if err != nil || !cs.desc.ServerStreams {
+ // err != nil or non-server-streaming indicates end of stream.
+ cs.finish(err)
+ }
+ return err
}
func (cs *clientStream) CloseSend() error {
- cs.attempt.closeSend()
+ if cs.sentLast {
+ // TODO: return an error and finish the stream instead, due to API misuse?
+ return nil
+ }
+ cs.sentLast = true
+ op := func(a *csAttempt) error { return a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) }
+ cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
+ // We never returned an error here for reasons.
return nil
}
@@ -424,7 +677,11 @@ func (cs *clientStream) finish(err error) {
return
}
cs.finished = true
+ cs.commitAttemptLocked()
cs.mu.Unlock()
+ if err == nil {
+ cs.retryThrottler.successfulRPC()
+ }
if channelz.IsOn() {
if err != nil {
cs.cc.incrCallsFailed()
@@ -432,46 +689,20 @@ func (cs *clientStream) finish(err error) {
cs.cc.incrCallsSucceeded()
}
}
- // TODO(retry): commit current attempt if necessary.
- cs.attempt.finish(err)
- for _, o := range cs.opts {
- o.after(cs.c)
+ if cs.attempt != nil {
+ cs.attempt.finish(err)
+ }
+ // after functions all rely upon having a stream.
+ if cs.attempt.s != nil {
+ for _, o := range cs.opts {
+ o.after(cs.callInfo)
+ }
}
cs.cancel()
}
-func (a *csAttempt) context() context.Context {
- return a.s.Context()
-}
-
-func (a *csAttempt) header() (metadata.MD, error) {
- return a.s.Header()
-}
-
-func (a *csAttempt) trailer() metadata.MD {
- return a.s.Trailer()
-}
-
-func (a *csAttempt) sendMsg(m interface{}) (err error) {
- // TODO Investigate how to signal the stats handling party.
- // generate error stats if err != nil && err != io.EOF?
+func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
cs := a.cs
- defer func() {
- // For non-client-streaming RPCs, we return nil instead of EOF on success
- // because the generated code requires it. finish is not called; RecvMsg()
- // will call it with the stream's status independently.
- if err == io.EOF && !cs.desc.ClientStreams {
- err = nil
- }
- if err != nil && err != io.EOF {
- // Call finish on the client stream for errors generated by this SendMsg
- // call, as these indicate problems created by this client. (Transport
- // errors are converted to an io.EOF error below; the real error will be
- // returned from RecvMsg eventually in that case, or be retried.)
- cs.finish(err)
- }
- }()
- // TODO: Check cs.sentLast and error if we already ended the stream.
if EnableTracing {
a.mu.Lock()
if a.trInfo.tr != nil {
@@ -479,44 +710,26 @@ func (a *csAttempt) sendMsg(m interface{}) (err error) {
}
a.mu.Unlock()
}
- data, err := encode(cs.codec, m)
- if err != nil {
- return err
- }
- compData, err := compress(data, cs.cp, cs.comp)
- if err != nil {
- return err
- }
- hdr, payload := msgHeader(data, compData)
- // TODO(dfawley): should we be checking len(data) instead?
- if len(payload) > *cs.c.maxSendMessageSize {
- return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.c.maxSendMessageSize)
+ if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
+ if !cs.desc.ClientStreams {
+ // For non-client-streaming RPCs, we return nil instead of EOF on error
+ // because the generated code requires it. finish is not called; RecvMsg()
+ // will call it with the stream's status independently.
+ return nil
+ }
+ return io.EOF
}
-
- if !cs.desc.ClientStreams {
- cs.sentLast = true
+ if a.statsHandler != nil {
+ a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
}
- err = a.t.Write(a.s, hdr, payload, &transport.Options{Last: !cs.desc.ClientStreams})
- if err == nil {
- if a.statsHandler != nil {
- a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payload, time.Now()))
- }
- if channelz.IsOn() {
- a.t.IncrMsgSent()
- }
- return nil
+ if channelz.IsOn() {
+ a.t.IncrMsgSent()
}
- return io.EOF
+ return nil
}
func (a *csAttempt) recvMsg(m interface{}) (err error) {
cs := a.cs
- defer func() {
- if err != nil || !cs.desc.ServerStreams {
- // err != nil or non-server-streaming indicates end of stream.
- cs.finish(err)
- }
- }()
var inPayload *stats.InPayload
if a.statsHandler != nil {
inPayload = &stats.InPayload{
@@ -539,7 +752,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
// Only initialize this state once per stream.
a.decompSet = true
}
- err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp)
+ err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, inPayload, a.decomp)
if err != nil {
if err == io.EOF {
if statusErr := a.s.Status().Err(); statusErr != nil {
@@ -557,7 +770,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
a.mu.Unlock()
}
if inPayload != nil {
- a.statsHandler.HandleRPC(a.ctx, inPayload)
+ a.statsHandler.HandleRPC(cs.ctx, inPayload)
}
if channelz.IsOn() {
a.t.IncrMsgRecv()
@@ -569,7 +782,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
- err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp)
+ err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
@@ -579,37 +792,40 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
return toRPCErr(err)
}
-func (a *csAttempt) closeSend() {
- cs := a.cs
- if cs.sentLast {
- return
- }
- cs.sentLast = true
- cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true})
- // We ignore errors from Write. Any error it would return would also be
- // returned by a subsequent RecvMsg call, and the user is supposed to always
- // finish the stream by calling RecvMsg until it returns err != nil.
-}
-
func (a *csAttempt) finish(err error) {
a.mu.Lock()
- a.t.CloseStream(a.s, err)
+ if a.finished {
+ a.mu.Unlock()
+ return
+ }
+ a.finished = true
+ if err == io.EOF {
+ // Ending a stream with EOF indicates a success.
+ err = nil
+ }
+ if a.s != nil {
+ a.t.CloseStream(a.s, err)
+ }
if a.done != nil {
+ br := false
+ if a.s != nil {
+ br = a.s.BytesReceived()
+ }
a.done(balancer.DoneInfo{
Err: err,
- BytesSent: true,
- BytesReceived: a.s.BytesReceived(),
+ BytesSent: a.s != nil,
+ BytesReceived: br,
})
}
if a.statsHandler != nil {
end := &stats.End{
Client: true,
- BeginTime: a.beginTime,
+ BeginTime: a.cs.beginTime,
EndTime: time.Now(),
Error: err,
}
- a.statsHandler.HandleRPC(a.ctx, end)
+ a.statsHandler.HandleRPC(a.cs.ctx, end)
}
if a.trInfo.tr != nil {
if err == nil {
@@ -624,7 +840,10 @@ func (a *csAttempt) finish(err error) {
a.mu.Unlock()
}
-// ServerStream defines the interface a server stream has to satisfy.
+// ServerStream defines the server-side behavior of a streaming RPC.
+//
+// All errors returned from ServerStream methods are compatible with the
+// status package.
type ServerStream interface {
// SetHeader sets the header metadata. It may be called multiple times.
// When call multiple times, all the provided metadata will be merged.
@@ -640,7 +859,32 @@ type ServerStream interface {
// SetTrailer sets the trailer metadata which will be sent with the RPC status.
// When called more than once, all the provided metadata will be merged.
SetTrailer(metadata.MD)
- Stream
+ // Context returns the context for this stream.
+ Context() context.Context
+ // SendMsg sends a message. On error, SendMsg aborts the stream and the
+ // error is returned directly.
+ //
+ // SendMsg blocks until:
+ // - There is sufficient flow control to schedule m with the transport, or
+ // - The stream is done, or
+ // - The stream breaks.
+ //
+ // SendMsg does not wait until the message is received by the client. An
+ // untimely stream closure may result in lost messages.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not safe
+ // to call SendMsg on the same stream in different goroutines.
+ SendMsg(m interface{}) error
+ // RecvMsg blocks until it receives a message into m or the stream is
+ // done. It returns io.EOF when the client has performed a CloseSend. On
+ // any non-EOF error, the stream is aborted and the error contains the
+ // RPC status.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not
+ // safe to call RecvMsg on the same stream in different goroutines.
+ RecvMsg(m interface{}) error
}
// serverStream implements a server side Stream.
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 7f124fbd5..d668a424d 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.13.0"
+const Version = "1.14.0"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index 079bc2896..44a310e55 100755
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -15,11 +15,6 @@ die() {
PATH="$GOPATH/bin:$GOROOT/bin:$PATH"
-# Check proto in manual runs or cron runs.
-if [[ "$TRAVIS" != "true" || "$TRAVIS_EVENT_TYPE" = "cron" ]]; then
- check_proto="true"
-fi
-
if [ "$1" = "-install" ]; then
go get -d \
google.golang.org/grpc/...
@@ -29,7 +24,7 @@ if [ "$1" = "-install" ]; then
honnef.co/go/tools/cmd/staticcheck \
github.com/client9/misspell/cmd/misspell \
github.com/golang/protobuf/protoc-gen-go
- if [[ "$check_proto" = "true" ]]; then
+ if [[ -z "$VET_SKIP_PROTO" ]]; then
if [[ "$TRAVIS" = "true" ]]; then
PROTOBUF_VERSION=3.3.0
PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
@@ -75,7 +70,7 @@ go tool vet -all . 2>&1 | grep -vE '(clientconn|transport\/transport_test).go:.*
set -o pipefail
git reset --hard HEAD
-if [[ "$check_proto" = "true" ]]; then
+if [[ -z "$VET_SKIP_PROTO" ]]; then
PATH="/home/travis/bin:$PATH" make proto && \
git status --porcelain 2>&1 | (! read) || \
(git status; git --no-pager diff; exit 1)
@@ -83,7 +78,7 @@ fi
# TODO(menghanl): fix errors in transport_test.
staticcheck -ignore '
-google.golang.org/grpc/transport/transport_test.go:SA2002
+google.golang.org/grpc/internal/transport/transport_test.go:SA2002
google.golang.org/grpc/benchmark/benchmain/main.go:SA1019
google.golang.org/grpc/stats/stats_test.go:SA1019
google.golang.org/grpc/test/end2end_test.go:SA1019