summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/grpclb.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/grpclb.go')
-rw-r--r--vendor/google.golang.org/grpc/grpclb.go341
1 files changed, 0 insertions, 341 deletions
diff --git a/vendor/google.golang.org/grpc/grpclb.go b/vendor/google.golang.org/grpc/grpclb.go
deleted file mode 100644
index bc2b44525..000000000
--- a/vendor/google.golang.org/grpc/grpclb.go
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpc
-
-import (
- "strconv"
- "strings"
- "sync"
- "time"
-
- "golang.org/x/net/context"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
- lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/resolver"
-)
-
-const (
- lbTokeyKey = "lb-token"
- defaultFallbackTimeout = 10 * time.Second
- grpclbName = "grpclb"
-)
-
-func convertDuration(d *lbpb.Duration) time.Duration {
- if d == nil {
- return 0
- }
- return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
-}
-
-// Client API for LoadBalancer service.
-// Mostly copied from generated pb.go file.
-// To avoid circular dependency.
-type loadBalancerClient struct {
- cc *ClientConn
-}
-
-func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
- desc := &StreamDesc{
- StreamName: "BalanceLoad",
- ServerStreams: true,
- ClientStreams: true,
- }
- stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
- if err != nil {
- return nil, err
- }
- x := &balanceLoadClientStream{stream}
- return x, nil
-}
-
-type balanceLoadClientStream struct {
- ClientStream
-}
-
-func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
- return x.ClientStream.SendMsg(m)
-}
-
-func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
- m := new(lbpb.LoadBalanceResponse)
- if err := x.ClientStream.RecvMsg(m); err != nil {
- return nil, err
- }
- return m, nil
-}
-
-func init() {
- balancer.Register(newLBBuilder())
-}
-
-// newLBBuilder creates a builder for grpclb.
-func newLBBuilder() balancer.Builder {
- return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
-}
-
-// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given
-// fallbackTimeout. If no response is received from the remote balancer within
-// fallbackTimeout, the backend addresses from the resolved address list will be
-// used.
-//
-// Only call this function when a non-default fallback timeout is needed.
-func NewLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
- return &lbBuilder{
- fallbackTimeout: fallbackTimeout,
- }
-}
-
-type lbBuilder struct {
- fallbackTimeout time.Duration
-}
-
-func (b *lbBuilder) Name() string {
- return grpclbName
-}
-
-func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
- // This generates a manual resolver builder with a random scheme. This
- // scheme will be used to dial to remote LB, so we can send filtered address
- // updates to remote LB ClientConn using this manual resolver.
- scheme := "grpclb_internal_" + strconv.FormatInt(time.Now().UnixNano(), 36)
- r := &lbManualResolver{scheme: scheme, ccb: cc}
-
- var target string
- targetSplitted := strings.Split(cc.Target(), ":///")
- if len(targetSplitted) < 2 {
- target = cc.Target()
- } else {
- target = targetSplitted[1]
- }
-
- lb := &lbBalancer{
- cc: newLBCacheClientConn(cc),
- target: target,
- opt: opt,
- fallbackTimeout: b.fallbackTimeout,
- doneCh: make(chan struct{}),
-
- manualResolver: r,
- csEvltr: &connectivityStateEvaluator{},
- subConns: make(map[resolver.Address]balancer.SubConn),
- scStates: make(map[balancer.SubConn]connectivity.State),
- picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
- clientStats: &rpcStats{},
- }
-
- return lb
-}
-
-type lbBalancer struct {
- cc *lbCacheClientConn
- target string
- opt balancer.BuildOptions
- fallbackTimeout time.Duration
- doneCh chan struct{}
-
- // manualResolver is used in the remote LB ClientConn inside grpclb. When
- // resolved address updates are received by grpclb, filtered updates will be
- // send to remote LB ClientConn through this resolver.
- manualResolver *lbManualResolver
- // The ClientConn to talk to the remote balancer.
- ccRemoteLB *ClientConn
-
- // Support client side load reporting. Each picker gets a reference to this,
- // and will update its content.
- clientStats *rpcStats
-
- mu sync.Mutex // guards everything following.
- // The full server list including drops, used to check if the newly received
- // serverList contains anything new. Each generate picker will also have
- // reference to this list to do the first layer pick.
- fullServerList []*lbpb.Server
- // All backends addresses, with metadata set to nil. This list contains all
- // backend addresses in the same order and with the same duplicates as in
- // serverlist. When generating picker, a SubConn slice with the same order
- // but with only READY SCs will be gerenated.
- backendAddrs []resolver.Address
- // Roundrobin functionalities.
- csEvltr *connectivityStateEvaluator
- state connectivity.State
- subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
- scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
- picker balancer.Picker
- // Support fallback to resolved backend addresses if there's no response
- // from remote balancer within fallbackTimeout.
- fallbackTimerExpired bool
- serverListReceived bool
- // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
- // when resolved address updates are received, and read in the goroutine
- // handling fallback.
- resolvedBackendAddrs []resolver.Address
-}
-
-// regeneratePicker takes a snapshot of the balancer, and generates a picker from
-// it. The picker
-// - always returns ErrTransientFailure if the balancer is in TransientFailure,
-// - does two layer roundrobin pick otherwise.
-// Caller must hold lb.mu.
-func (lb *lbBalancer) regeneratePicker() {
- if lb.state == connectivity.TransientFailure {
- lb.picker = &errPicker{err: balancer.ErrTransientFailure}
- return
- }
- var readySCs []balancer.SubConn
- for _, a := range lb.backendAddrs {
- if sc, ok := lb.subConns[a]; ok {
- if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
- readySCs = append(readySCs, sc)
- }
- }
- }
-
- if len(lb.fullServerList) <= 0 {
- if len(readySCs) <= 0 {
- lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
- return
- }
- lb.picker = &rrPicker{subConns: readySCs}
- return
- }
- lb.picker = &lbPicker{
- serverList: lb.fullServerList,
- subConns: readySCs,
- stats: lb.clientStats,
- }
-}
-
-func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
- grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
- lb.mu.Lock()
- defer lb.mu.Unlock()
-
- oldS, ok := lb.scStates[sc]
- if !ok {
- grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
- return
- }
- lb.scStates[sc] = s
- switch s {
- case connectivity.Idle:
- sc.Connect()
- case connectivity.Shutdown:
- // When an address was removed by resolver, b called RemoveSubConn but
- // kept the sc's state in scStates. Remove state for this sc here.
- delete(lb.scStates, sc)
- }
-
- oldAggrState := lb.state
- lb.state = lb.csEvltr.recordTransition(oldS, s)
-
- // Regenerate picker when one of the following happens:
- // - this sc became ready from not-ready
- // - this sc became not-ready from ready
- // - the aggregated state of balancer became TransientFailure from non-TransientFailure
- // - the aggregated state of balancer became non-TransientFailure from TransientFailure
- if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
- (lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
- lb.regeneratePicker()
- }
-
- lb.cc.UpdateBalancerState(lb.state, lb.picker)
-}
-
-// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
-// resolved backends (backends received from resolver, not from remote balancer)
-// if no connection to remote balancers was successful.
-func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
- timer := time.NewTimer(fallbackTimeout)
- defer timer.Stop()
- select {
- case <-timer.C:
- case <-lb.doneCh:
- return
- }
- lb.mu.Lock()
- if lb.serverListReceived {
- lb.mu.Unlock()
- return
- }
- lb.fallbackTimerExpired = true
- lb.refreshSubConns(lb.resolvedBackendAddrs)
- lb.mu.Unlock()
-}
-
-// HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB
-// clientConn. The remoteLB clientConn will handle creating/removing remoteLB
-// connections.
-func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
- grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs)
- if len(addrs) <= 0 {
- return
- }
-
- var remoteBalancerAddrs, backendAddrs []resolver.Address
- for _, a := range addrs {
- if a.Type == resolver.GRPCLB {
- remoteBalancerAddrs = append(remoteBalancerAddrs, a)
- } else {
- backendAddrs = append(backendAddrs, a)
- }
- }
-
- if lb.ccRemoteLB == nil {
- if len(remoteBalancerAddrs) <= 0 {
- grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
- return
- }
- // First time receiving resolved addresses, create a cc to remote
- // balancers.
- lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName)
- // Start the fallback goroutine.
- go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
- }
-
- // cc to remote balancers uses lb.manualResolver. Send the updated remote
- // balancer addresses to it through manualResolver.
- lb.manualResolver.NewAddress(remoteBalancerAddrs)
-
- lb.mu.Lock()
- lb.resolvedBackendAddrs = backendAddrs
- // If serverListReceived is true, connection to remote balancer was
- // successful and there's no need to do fallback anymore.
- // If fallbackTimerExpired is false, fallback hasn't happened yet.
- if !lb.serverListReceived && lb.fallbackTimerExpired {
- // This means we received a new list of resolved backends, and we are
- // still in fallback mode. Need to update the list of backends we are
- // using to the new list of backends.
- lb.refreshSubConns(lb.resolvedBackendAddrs)
- }
- lb.mu.Unlock()
-}
-
-func (lb *lbBalancer) Close() {
- select {
- case <-lb.doneCh:
- return
- default:
- }
- close(lb.doneCh)
- if lb.ccRemoteLB != nil {
- lb.ccRemoteLB.Close()
- }
- lb.cc.close()
-}