summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/internal/transport/http2_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/http2_client.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go63
1 files changed, 20 insertions, 43 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 88d1c1612..904e790c4 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -111,19 +111,7 @@ type http2Client struct {
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
- czmu sync.RWMutex
- kpCount int64
- // The number of streams that have started, including already finished ones.
- streamsStarted int64
- // The number of streams that have ended successfully by receiving EoS bit set
- // frame from server.
- streamsSucceeded int64
- streamsFailed int64
- lastStreamCreated time.Time
- msgSent int64
- msgRecv int64
- lastMsgSent time.Time
- lastMsgRecv time.Time
+ czData *channelzData
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -234,6 +222,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
+ czData: new(channelzData),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@@ -550,10 +539,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
t.activeStreams[id] = s
if channelz.IsOn() {
- t.czmu.Lock()
- t.streamsStarted++
- t.lastStreamCreated = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.streamsStarted, 1)
+ atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
var sendPing bool
// If the number of active streams change from 0 to 1, then check if keepalive
@@ -707,13 +694,11 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
}
t.mu.Unlock()
if channelz.IsOn() {
- t.czmu.Lock()
if eosReceived {
- t.streamsSucceeded++
+ atomic.AddInt64(&t.czData.streamsSucceeded, 1)
} else {
- t.streamsFailed++
+ atomic.AddInt64(&t.czData.streamsFailed, 1)
}
- t.czmu.Unlock()
}
},
rst: rst,
@@ -1263,9 +1248,7 @@ func (t *http2Client) keepalive() {
} else {
t.mu.Unlock()
if channelz.IsOn() {
- t.czmu.Lock()
- t.kpCount++
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.kpCount, 1)
}
// Send ping.
t.controlBuf.put(p)
@@ -1305,17 +1288,16 @@ func (t *http2Client) GoAway() <-chan struct{} {
}
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
- t.czmu.RLock()
s := channelz.SocketInternalMetric{
- StreamsStarted: t.streamsStarted,
- StreamsSucceeded: t.streamsSucceeded,
- StreamsFailed: t.streamsFailed,
- MessagesSent: t.msgSent,
- MessagesReceived: t.msgRecv,
- KeepAlivesSent: t.kpCount,
- LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
- LastMessageSentTimestamp: t.lastMsgSent,
- LastMessageReceivedTimestamp: t.lastMsgRecv,
+ StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
+ StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
+ StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
+ MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
+ MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
+ KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
+ LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
+ LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
+ LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.localAddr,
@@ -1325,23 +1307,18 @@ func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
- t.czmu.RUnlock()
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Client) IncrMsgSent() {
- t.czmu.Lock()
- t.msgSent++
- t.lastMsgSent = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.msgSent, 1)
+ atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
- t.czmu.Lock()
- t.msgRecv++
- t.lastMsgRecv = time.Now()
- t.czmu.Unlock()
+ atomic.AddInt64(&t.czData.msgRecv, 1)
+ atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {