summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go157
1 files changed, 80 insertions, 77 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 9b035e8f5..318ac4073 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -65,8 +65,6 @@ var (
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
- // errConnUnavailable indicates that the connection is unavailable.
- errConnUnavailable = errors.New("grpc: the connection is unavailable")
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors.New("grpc: balancer is closed")
// We use an accessor so that minConnectTimeout can be
@@ -89,8 +87,6 @@ var (
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
- // errNetworkIO indicates that the connection is down due to some network I/O error.
- errNetworkIO = errors.New("grpc: failed with network I/O error")
)
const (
@@ -101,12 +97,6 @@ const (
defaultReadBufSize = 32 * 1024
)
-// RegisterChannelz turns on channelz service.
-// This is an EXPERIMENTAL API.
-func RegisterChannelz() {
- channelz.TurnOn()
-}
-
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
@@ -135,6 +125,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
+ czData: new(channelzData),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())
@@ -145,9 +136,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
- cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target)
+ cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
} else {
- cc.channelzID = channelz.RegisterChannel(cc, 0, target)
+ cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
}
}
@@ -293,6 +284,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
s := cc.GetState()
if s == connectivity.Ready {
break
+ } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
+ if err = cc.blockingpicker.connectionError(); err != nil {
+ terr, ok := err.(interface{ Temporary() bool })
+ if ok && !terr.Temporary() {
+ return nil, err
+ }
+ }
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
@@ -374,12 +372,8 @@ type ClientConn struct {
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
- channelzID int64 // channelz unique identification number
- czmu sync.RWMutex
- callsStarted int64
- callsSucceeded int64
- callsFailed int64
- lastCallStartedTime time.Time
+ channelzID int64 // channelz unique identification number
+ czData *channelzData
}
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
@@ -532,9 +526,11 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{
- cc: cc,
- addrs: addrs,
- dopts: cc.dopts,
+ cc: cc,
+ addrs: addrs,
+ dopts: cc.dopts,
+ czData: new(channelzData),
+ resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -564,19 +560,14 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
ac.tearDown(err)
}
-// ChannelzMetric returns ChannelInternalMetric of current ClientConn.
-// This is an EXPERIMENTAL API.
-func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
- state := cc.GetState()
- cc.czmu.RLock()
- defer cc.czmu.RUnlock()
+func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{
- State: state,
+ State: cc.GetState(),
Target: cc.target,
- CallsStarted: cc.callsStarted,
- CallsSucceeded: cc.callsSucceeded,
- CallsFailed: cc.callsFailed,
- LastCallStartedTimestamp: cc.lastCallStartedTime,
+ CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
+ CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
+ CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
+ LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
}
}
@@ -587,23 +578,16 @@ func (cc *ClientConn) Target() string {
}
func (cc *ClientConn) incrCallsStarted() {
- cc.czmu.Lock()
- cc.callsStarted++
- // TODO(yuxuanli): will make this a time.Time pointer improve performance?
- cc.lastCallStartedTime = time.Now()
- cc.czmu.Unlock()
+ atomic.AddInt64(&cc.czData.callsStarted, 1)
+ atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
}
func (cc *ClientConn) incrCallsSucceeded() {
- cc.czmu.Lock()
- cc.callsSucceeded++
- cc.czmu.Unlock()
+ atomic.AddInt64(&cc.czData.callsSucceeded, 1)
}
func (cc *ClientConn) incrCallsFailed() {
- cc.czmu.Lock()
- cc.callsFailed++
- cc.czmu.Unlock()
+ atomic.AddInt64(&cc.czData.callsFailed, 1)
}
// connect starts to creating transport and also starts the transport monitor
@@ -754,6 +738,24 @@ func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
go r.resolveNow(o)
}
+// ResetConnectBackoff wakes up all subchannels in transient failure and causes
+// them to attempt another connection immediately. It also resets the backoff
+// times used for subsequent attempts regardless of the current state.
+//
+// In general, this function should not be used. Typical service or network
+// outages result in a reasonable client reconnection strategy by default.
+// However, if a previously unavailable network becomes available, this may be
+// used to trigger an immediate reconnect.
+//
+// This API is EXPERIMENTAL.
+func (cc *ClientConn) ResetConnectBackoff() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ for ac := range cc.conns {
+ ac.resetConnectBackoff()
+ }
+}
+
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
defer cc.cancel()
@@ -822,12 +824,10 @@ type addrConn struct {
// negotiations must complete.
connectDeadline time.Time
- channelzID int64 // channelz unique identification number
- czmu sync.RWMutex
- callsStarted int64
- callsSucceeded int64
- callsFailed int64
- lastCallStartedTime time.Time
+ resetBackoff chan struct{}
+
+ channelzID int64 // channelz unique identification number
+ czData *channelzData
}
// adjustParams updates parameters used to create transports upon
@@ -852,14 +852,6 @@ func (ac *addrConn) printf(format string, a ...interface{}) {
}
}
-// errorf records an error in ac's event log, unless ac has been closed.
-// REQUIRES ac.mu is held.
-func (ac *addrConn) errorf(format string, a ...interface{}) {
- if ac.events != nil {
- ac.events.Errorf(format, a...)
- }
-}
-
// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
// The created transport must receive initial settings frame from the server.
@@ -890,6 +882,7 @@ func (ac *addrConn) resetTransport() error {
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
var backoffDeadline, connectDeadline time.Time
+ var resetBackoff chan struct{}
for connectRetryNum := 0; ; connectRetryNum++ {
ac.mu.Lock()
if ac.backoffDeadline.IsZero() {
@@ -897,6 +890,7 @@ func (ac *addrConn) resetTransport() error {
// or this is the first time this addrConn is trying to establish a
// connection.
backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
+ resetBackoff = ac.resetBackoff
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout()
if backoffFor > dialDuration {
@@ -930,7 +924,7 @@ func (ac *addrConn) resetTransport() error {
copy(addrsIter, ac.addrs)
copts := ac.dopts.copts
ac.mu.Unlock()
- connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
+ connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts, resetBackoff)
if err != nil {
return err
}
@@ -942,7 +936,7 @@ func (ac *addrConn) resetTransport() error {
// createTransport creates a connection to one of the backends in addrs.
// It returns true if a connection was established.
-func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
+func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions, resetBackoff chan struct{}) (bool, error) {
for i := ridx; i < len(addrs); i++ {
addr := addrs[i]
target := transport.TargetInfo{
@@ -1042,6 +1036,8 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
select {
case <-timer.C:
+ case <-resetBackoff:
+ timer.Stop()
case <-ac.ctx.Done():
timer.Stop()
return false, ac.ctx.Err()
@@ -1049,6 +1045,14 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
return false, nil
}
+func (ac *addrConn) resetConnectBackoff() {
+ ac.mu.Lock()
+ close(ac.resetBackoff)
+ ac.resetBackoff = make(chan struct{})
+ ac.connectRetryNum = 0
+ ac.mu.Unlock()
+}
+
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (ac *addrConn) transportMonitor() {
@@ -1197,36 +1201,27 @@ func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
ac.mu.Lock()
addr := ac.curAddr.Addr
ac.mu.Unlock()
- state := ac.getState()
- ac.czmu.RLock()
- defer ac.czmu.RUnlock()
return &channelz.ChannelInternalMetric{
- State: state,
+ State: ac.getState(),
Target: addr,
- CallsStarted: ac.callsStarted,
- CallsSucceeded: ac.callsSucceeded,
- CallsFailed: ac.callsFailed,
- LastCallStartedTimestamp: ac.lastCallStartedTime,
+ CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
+ CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
+ CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
+ LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
}
}
func (ac *addrConn) incrCallsStarted() {
- ac.czmu.Lock()
- ac.callsStarted++
- ac.lastCallStartedTime = time.Now()
- ac.czmu.Unlock()
+ atomic.AddInt64(&ac.czData.callsStarted, 1)
+ atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
}
func (ac *addrConn) incrCallsSucceeded() {
- ac.czmu.Lock()
- ac.callsSucceeded++
- ac.czmu.Unlock()
+ atomic.AddInt64(&ac.czData.callsSucceeded, 1)
}
func (ac *addrConn) incrCallsFailed() {
- ac.czmu.Lock()
- ac.callsFailed++
- ac.czmu.Unlock()
+ atomic.AddInt64(&ac.czData.callsFailed, 1)
}
type retryThrottler struct {
@@ -1266,6 +1261,14 @@ func (rt *retryThrottler) successfulRPC() {
}
}
+type channelzChannel struct {
+ cc *ClientConn
+}
+
+func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
+ return c.cc.channelzMetric()
+}
+
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//