summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/rpc_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/rpc_util.go')
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go124
1 files changed, 69 insertions, 55 deletions
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 69ef3c0b5..033801f34 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -419,8 +419,8 @@ func (o CustomCodecCallOption) after(c *callInfo) {}
type payloadFormat uint8
const (
- compressionNone payloadFormat = iota // no compression
- compressionMade
+ compressionNone payloadFormat = 0 // no compression
+ compressionMade payloadFormat = 1 // compressed
)
// parser reads complete gRPC messages from the underlying reader.
@@ -477,65 +477,82 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
return pf, msg, nil
}
-// encode serializes msg and returns a buffer of message header and a buffer of msg.
-// If msg is nil, it generates the message header and an empty msg buffer.
-// TODO(ddyihai): eliminate extra Compressor parameter.
-func encode(c baseCodec, msg interface{}, cp Compressor, outPayload *stats.OutPayload, compressor encoding.Compressor) ([]byte, []byte, error) {
- var (
- b []byte
- cbuf *bytes.Buffer
- )
- const (
- payloadLen = 1
- sizeLen = 4
- )
- if msg != nil {
- var err error
- b, err = c.Marshal(msg)
- if err != nil {
- return nil, nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
+// encode serializes msg and returns a buffer containing the message, or an
+// error if it is too large to be transmitted by grpc. If msg is nil, it
+// generates an empty message.
+func encode(c baseCodec, msg interface{}) ([]byte, error) {
+ if msg == nil { // NOTE: typed nils will not be caught by this check
+ return nil, nil
+ }
+ b, err := c.Marshal(msg)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
+ }
+ if uint(len(b)) > math.MaxUint32 {
+ return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
+ }
+ return b, nil
+}
+
+// compress returns the input bytes compressed by compressor or cp. If both
+// compressors are nil, returns nil.
+//
+// TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
+func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) {
+ if compressor == nil && cp == nil {
+ return nil, nil
+ }
+ wrapErr := func(err error) error {
+ return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
+ }
+ cbuf := &bytes.Buffer{}
+ if compressor != nil {
+ z, _ := compressor.Compress(cbuf)
+ if _, err := z.Write(in); err != nil {
+ return nil, wrapErr(err)
}
- if outPayload != nil {
- outPayload.Payload = msg
- // TODO truncate large payload.
- outPayload.Data = b
- outPayload.Length = len(b)
+ if err := z.Close(); err != nil {
+ return nil, wrapErr(err)
}
- if compressor != nil || cp != nil {
- cbuf = new(bytes.Buffer)
- // Has compressor, check Compressor is set by UseCompressor first.
- if compressor != nil {
- z, _ := compressor.Compress(cbuf)
- if _, err := z.Write(b); err != nil {
- return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
- }
- z.Close()
- } else {
- // If Compressor is not set by UseCompressor, use default Compressor
- if err := cp.Do(cbuf, b); err != nil {
- return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
- }
- }
- b = cbuf.Bytes()
+ } else {
+ if err := cp.Do(cbuf, in); err != nil {
+ return nil, wrapErr(err)
}
}
- if uint(len(b)) > math.MaxUint32 {
- return nil, nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
- }
+ return cbuf.Bytes(), nil
+}
- bufHeader := make([]byte, payloadLen+sizeLen)
- if compressor != nil || cp != nil {
- bufHeader[0] = byte(compressionMade)
+const (
+ payloadLen = 1
+ sizeLen = 4
+ headerLen = payloadLen + sizeLen
+)
+
+// msgHeader returns a 5-byte header for the message being transmitted and the
+// payload, which is compData if non-nil or data otherwise.
+func msgHeader(data, compData []byte) (hdr []byte, payload []byte) {
+ hdr = make([]byte, headerLen)
+ if compData != nil {
+ hdr[0] = byte(compressionMade)
+ data = compData
} else {
- bufHeader[0] = byte(compressionNone)
+ hdr[0] = byte(compressionNone)
}
- // Write length of b into buf
- binary.BigEndian.PutUint32(bufHeader[payloadLen:], uint32(len(b)))
- if outPayload != nil {
- outPayload.WireLength = payloadLen + sizeLen + len(b)
+ // Write length of payload into buf
+ binary.BigEndian.PutUint32(hdr[payloadLen:], uint32(len(data)))
+ return hdr, data
+}
+
+func outPayload(client bool, msg interface{}, data, payload []byte, t time.Time) *stats.OutPayload {
+ return &stats.OutPayload{
+ Client: client,
+ Payload: msg,
+ Data: data,
+ Length: len(data),
+ WireLength: len(payload) + headerLen,
+ SentTime: t,
}
- return bufHeader, b, nil
}
func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status {
@@ -721,7 +738,4 @@ const (
SupportPackageIsVersion5 = true
)
-// Version is the current grpc version.
-const Version = "1.12.0"
-
const grpcUA = "grpc-go/" + Version