From bac3376278bfd8125879ca86e8eb26df85858d4c Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Sun, 22 Jul 2018 20:14:05 -0700 Subject: Updating dependencies (#9139) --- vendor/google.golang.org/grpc/stream.go | 66 ++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 26 deletions(-) (limited to 'vendor/google.golang.org/grpc/stream.go') diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 82921a15a..152d9eccd 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -27,9 +27,9 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/channelz" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" @@ -101,7 +101,21 @@ type ClientStream interface { } // NewStream creates a new Stream for the client side. This is typically -// called by generated code. +// called by generated code. ctx is used for the lifetime of the stream. +// +// To ensure resources are not leaked due to the stream returned, one of the following +// actions must be performed: +// +// 1. Call Close on the ClientConn. +// 2. Cancel the context provided. +// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated +// client-streaming RPC, for instance, might use the helper function +// CloseAndRecv (note that CloseSend does not Recv, therefore is not +// guaranteed to release all resources). +// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. +// +// If none of the above happen, a goroutine and a context will be leaked, and grpc +// will not call the optionally-configured stats handler with a stats.End message. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options @@ -113,8 +127,7 @@ func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method st return newClientStream(ctx, desc, cc, method, opts...) } -// NewClientStream creates a new Stream for the client side. This is typically -// called by generated code. +// NewClientStream is a wrapper for ClientConn.NewStream. // // DEPRECATED: Use ClientConn.NewStream instead. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { @@ -466,27 +479,27 @@ func (a *csAttempt) sendMsg(m interface{}) (err error) { } a.mu.Unlock() } - var outPayload *stats.OutPayload - if a.statsHandler != nil { - outPayload = &stats.OutPayload{ - Client: true, - } + data, err := encode(cs.codec, m) + if err != nil { + return err } - hdr, data, err := encode(cs.codec, m, cs.cp, outPayload, cs.comp) + compData, err := compress(data, cs.cp, cs.comp) if err != nil { return err } - if len(data) > *cs.c.maxSendMessageSize { - return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize) + hdr, payload := msgHeader(data, compData) + // TODO(dfawley): should we be checking len(data) instead? + if len(payload) > *cs.c.maxSendMessageSize { + return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.c.maxSendMessageSize) } + if !cs.desc.ClientStreams { cs.sentLast = true } - err = a.t.Write(a.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams}) + err = a.t.Write(a.s, hdr, payload, &transport.Options{Last: !cs.desc.ClientStreams}) if err == nil { - if outPayload != nil { - outPayload.SentTime = time.Now() - a.statsHandler.HandleRPC(a.ctx, outPayload) + if a.statsHandler != nil { + a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payload, time.Now())) } if channelz.IsOn() { a.t.IncrMsgSent() @@ -696,23 +709,24 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { ss.t.IncrMsgSent() } }() - var outPayload *stats.OutPayload - if ss.statsHandler != nil { - outPayload = &stats.OutPayload{} + data, err := encode(ss.codec, m) + if err != nil { + return err } - hdr, data, err := encode(ss.codec, m, ss.cp, outPayload, ss.comp) + compData, err := compress(data, ss.cp, ss.comp) if err != nil { return err } - if len(data) > ss.maxSendMessageSize { - return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize) + hdr, payload := msgHeader(data, compData) + // TODO(dfawley): should we be checking len(data) instead? + if len(payload) > ss.maxSendMessageSize { + return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) } - if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil { + if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { return toRPCErr(err) } - if outPayload != nil { - outPayload.SentTime = time.Now() - ss.statsHandler.HandleRPC(ss.s.Context(), outPayload) + if ss.statsHandler != nil { + ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) } return nil } -- cgit v1.2.3-1-g7c22