summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go66
1 files changed, 40 insertions, 26 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 82921a15a..152d9eccd 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -27,9 +27,9 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/channelz"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
+ "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
@@ -101,7 +101,21 @@ type ClientStream interface {
}
// NewStream creates a new Stream for the client side. This is typically
-// called by generated code.
+// called by generated code. ctx is used for the lifetime of the stream.
+//
+// To ensure resources are not leaked due to the stream returned, one of the following
+// actions must be performed:
+//
+// 1. Call Close on the ClientConn.
+// 2. Cancel the context provided.
+// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
+// client-streaming RPC, for instance, might use the helper function
+// CloseAndRecv (note that CloseSend does not Recv, therefore is not
+// guaranteed to release all resources).
+// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
+//
+// If none of the above happen, a goroutine and a context will be leaked, and grpc
+// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
@@ -113,8 +127,7 @@ func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method st
return newClientStream(ctx, desc, cc, method, opts...)
}
-// NewClientStream creates a new Stream for the client side. This is typically
-// called by generated code.
+// NewClientStream is a wrapper for ClientConn.NewStream.
//
// DEPRECATED: Use ClientConn.NewStream instead.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
@@ -466,27 +479,27 @@ func (a *csAttempt) sendMsg(m interface{}) (err error) {
}
a.mu.Unlock()
}
- var outPayload *stats.OutPayload
- if a.statsHandler != nil {
- outPayload = &stats.OutPayload{
- Client: true,
- }
+ data, err := encode(cs.codec, m)
+ if err != nil {
+ return err
}
- hdr, data, err := encode(cs.codec, m, cs.cp, outPayload, cs.comp)
+ compData, err := compress(data, cs.cp, cs.comp)
if err != nil {
return err
}
- if len(data) > *cs.c.maxSendMessageSize {
- return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize)
+ hdr, payload := msgHeader(data, compData)
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payload) > *cs.c.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.c.maxSendMessageSize)
}
+
if !cs.desc.ClientStreams {
cs.sentLast = true
}
- err = a.t.Write(a.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams})
+ err = a.t.Write(a.s, hdr, payload, &transport.Options{Last: !cs.desc.ClientStreams})
if err == nil {
- if outPayload != nil {
- outPayload.SentTime = time.Now()
- a.statsHandler.HandleRPC(a.ctx, outPayload)
+ if a.statsHandler != nil {
+ a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payload, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -696,23 +709,24 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.t.IncrMsgSent()
}
}()
- var outPayload *stats.OutPayload
- if ss.statsHandler != nil {
- outPayload = &stats.OutPayload{}
+ data, err := encode(ss.codec, m)
+ if err != nil {
+ return err
}
- hdr, data, err := encode(ss.codec, m, ss.cp, outPayload, ss.comp)
+ compData, err := compress(data, ss.cp, ss.comp)
if err != nil {
return err
}
- if len(data) > ss.maxSendMessageSize {
- return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize)
+ hdr, payload := msgHeader(data, compData)
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payload) > ss.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
}
- if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil {
+ if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
- if outPayload != nil {
- outPayload.SentTime = time.Now()
- ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
+ if ss.statsHandler != nil {
+ ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
}
return nil
}