summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/internal/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go4
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go7
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go63
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go75
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/log.go6
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go25
6 files changed, 75 insertions, 105 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index ce135c4d1..204ba1588 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -104,7 +104,6 @@ type headerFrame struct {
type cleanupStream struct {
streamID uint32
- idPtr *uint32
rst bool
rstCode http2.ErrCode
onWrite func()
@@ -138,9 +137,6 @@ type outgoingSettings struct {
ss []http2.Setting
}
-type settingsAck struct {
-}
-
type incomingGoAway struct {
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index bc8564345..c6fb4b9c1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -237,9 +237,9 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
if ht.stats != nil {
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
- ht.Close()
close(ht.writes)
}
+ ht.Close()
return err
}
@@ -326,11 +326,11 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
go func() {
select {
case <-requestOver:
- return
case <-ht.closedCh:
case <-clientGone:
}
cancel()
+ ht.Close()
}()
req := ht.req
@@ -442,5 +442,8 @@ func mapRecvMsgError(err error) error {
return status.Error(code, se.Error())
}
}
+ if strings.Contains(err.Error(), "body closed by handler") {
+ return status.Error(codes.Canceled, err.Error())
+ }
return connectionErrorf(true, err, err.Error())
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 88d1c1612..904e790c4 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -111,19 +111,7 @@ type http2Client struct {
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
- czmu sync.RWMutex
- kpCount int64
- // The number of streams that have started, including already finished ones.
- streamsStarted int64
- // The number of streams that have ended successfully by receiving EoS bit set
- // frame from server.
- streamsSucceeded int64
- streamsFailed int64
- lastStreamCreated time.Time
- msgSent int64
- msgRecv int64
- lastMsgSent time.Time
- lastMsgRecv time.Time
+ czData *channelzData
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -234,6 +222,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
+ czData: new(channelzData),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@@ -550,10 +539,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
t.activeStreams[id] = s
if channelz.IsOn() {
- t.czmu.Lock()
- t.streamsStarted++
- t.lastStreamCreated = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.streamsStarted, 1)
+ atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
var sendPing bool
// If the number of active streams change from 0 to 1, then check if keepalive
@@ -707,13 +694,11 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
}
t.mu.Unlock()
if channelz.IsOn() {
- t.czmu.Lock()
if eosReceived {
- t.streamsSucceeded++
+ atomic.AddInt64(&t.czData.streamsSucceeded, 1)
} else {
- t.streamsFailed++
+ atomic.AddInt64(&t.czData.streamsFailed, 1)
}
- t.czmu.Unlock()
}
},
rst: rst,
@@ -1263,9 +1248,7 @@ func (t *http2Client) keepalive() {
} else {
t.mu.Unlock()
if channelz.IsOn() {
- t.czmu.Lock()
- t.kpCount++
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.kpCount, 1)
}
// Send ping.
t.controlBuf.put(p)
@@ -1305,17 +1288,16 @@ func (t *http2Client) GoAway() <-chan struct{} {
}
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
- t.czmu.RLock()
s := channelz.SocketInternalMetric{
- StreamsStarted: t.streamsStarted,
- StreamsSucceeded: t.streamsSucceeded,
- StreamsFailed: t.streamsFailed,
- MessagesSent: t.msgSent,
- MessagesReceived: t.msgRecv,
- KeepAlivesSent: t.kpCount,
- LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
- LastMessageSentTimestamp: t.lastMsgSent,
- LastMessageReceivedTimestamp: t.lastMsgRecv,
+ StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
+ StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
+ StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
+ MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
+ MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
+ KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
+ LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
+ LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
+ LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.localAddr,
@@ -1325,23 +1307,18 @@ func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
- t.czmu.RUnlock()
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Client) IncrMsgSent() {
- t.czmu.Lock()
- t.msgSent++
- t.lastMsgSent = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.msgSent, 1)
+ atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
- t.czmu.Lock()
- t.msgRecv++
- t.lastMsgRecv = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.msgRecv, 1)
+ atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index a8a09270b..efb7f53ff 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -118,19 +118,7 @@ type http2Server struct {
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
- czmu sync.RWMutex
- kpCount int64
- // The number of streams that have started, including already finished ones.
- streamsStarted int64
- // The number of streams that have ended successfully by sending frame with
- // EoS bit set.
- streamsSucceeded int64
- streamsFailed int64
- lastStreamCreated time.Time
- msgSent int64
- msgRecv int64
- lastMsgSent time.Time
- lastMsgRecv time.Time
+ czData *channelzData
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -231,6 +219,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
+ czData: new(channelzData),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if dynamicWindow {
@@ -295,7 +284,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
streamID := frame.Header().StreamID
state := decodeState{serverSide: true}
if err := state.decodeHeader(frame); err != nil {
@@ -307,7 +296,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
onWrite: func() {},
})
}
- return
+ return false
}
buf := newRecvBuffer()
@@ -361,13 +350,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
- return
+ return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
- return
+ return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
@@ -377,7 +366,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
- return
+ return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
@@ -392,10 +381,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
t.mu.Unlock()
if channelz.IsOn() {
- t.czmu.Lock()
- t.streamsStarted++
- t.lastStreamCreated = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.streamsStarted, 1)
+ atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
@@ -430,7 +417,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
wq: s.wq,
})
handle(s)
- return
+ return false
}
// HandleStreams receives incoming streams using the given handler. This is
@@ -977,9 +964,7 @@ func (t *http2Server) keepalive() {
}
pingSent = true
if channelz.IsOn() {
- t.czmu.Lock()
- t.kpCount++
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
@@ -1044,13 +1029,11 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hd
}
t.mu.Unlock()
if channelz.IsOn() {
- t.czmu.Lock()
if eosReceived {
- t.streamsSucceeded++
+ atomic.AddInt64(&t.czData.streamsSucceeded, 1)
} else {
- t.streamsFailed++
+ atomic.AddInt64(&t.czData.streamsFailed, 1)
}
- t.czmu.Unlock()
}
},
}
@@ -1138,17 +1121,16 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
}
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
- t.czmu.RLock()
s := channelz.SocketInternalMetric{
- StreamsStarted: t.streamsStarted,
- StreamsSucceeded: t.streamsSucceeded,
- StreamsFailed: t.streamsFailed,
- MessagesSent: t.msgSent,
- MessagesReceived: t.msgRecv,
- KeepAlivesSent: t.kpCount,
- LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
- LastMessageSentTimestamp: t.lastMsgSent,
- LastMessageReceivedTimestamp: t.lastMsgRecv,
+ StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
+ StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
+ StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
+ MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
+ MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
+ KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
+ LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
+ LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
+ LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.localAddr,
@@ -1158,23 +1140,18 @@ func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
- t.czmu.RUnlock()
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Server) IncrMsgSent() {
- t.czmu.Lock()
- t.msgSent++
- t.lastMsgSent = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.msgSent, 1)
+ atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
func (t *http2Server) IncrMsgRecv() {
- t.czmu.Lock()
- t.msgRecv++
- t.lastMsgRecv = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.msgRecv, 1)
+ atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Server) getOutFlowWindow() int64 {
diff --git a/vendor/google.golang.org/grpc/internal/transport/log.go b/vendor/google.golang.org/grpc/internal/transport/log.go
index ac8e358c5..879df80c4 100644
--- a/vendor/google.golang.org/grpc/internal/transport/log.go
+++ b/vendor/google.golang.org/grpc/internal/transport/log.go
@@ -42,9 +42,3 @@ func errorf(format string, args ...interface{}) {
grpclog.Errorf(format, args...)
}
}
-
-func fatalf(format string, args ...interface{}) {
- if grpclog.V(logLevel) {
- grpclog.Fatalf(format, args...)
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 9775eeb81..fdf8ad684 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -176,7 +176,6 @@ type Stream struct {
buf *recvBuffer
trReader io.Reader
fc *inFlow
- recvQuota uint32
wq *writeQuota
// Callback to state application's intentions to read data. This
@@ -683,3 +682,27 @@ const (
// "too_many_pings".
GoAwayTooManyPings GoAwayReason = 2
)
+
+// channelzData is used to store channelz related data for http2Client and http2Server.
+// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
+// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
+// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
+type channelzData struct {
+ kpCount int64
+ // The number of streams that have started, including already finished ones.
+ streamsStarted int64
+ // Client side: The number of streams that have ended successfully by receiving
+ // EoS bit set frame from server.
+ // Server side: The number of streams that have ended successfully by sending
+ // frame with EoS bit set.
+ streamsSucceeded int64
+ streamsFailed int64
+ // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
+ // instead of time.Time since it's more costly to atomically update time.Time variable than int64
+ // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
+ lastStreamCreatedTime int64
+ msgSent int64
+ msgRecv int64
+ lastMsgSentTime int64
+ lastMsgRecvTime int64
+}