/* * * Copyright 2016 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package grpc import ( "strconv" "strings" "sync" "time" "golang.org/x/net/context" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" ) const ( lbTokeyKey = "lb-token" defaultFallbackTimeout = 10 * time.Second grpclbName = "grpclb" ) func convertDuration(d *lbpb.Duration) time.Duration { if d == nil { return 0 } return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond } // Client API for LoadBalancer service. // Mostly copied from generated pb.go file. // To avoid circular dependency. type loadBalancerClient struct { cc *ClientConn } func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) { desc := &StreamDesc{ StreamName: "BalanceLoad", ServerStreams: true, ClientStreams: true, } stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...) if err != nil { return nil, err } x := &balanceLoadClientStream{stream} return x, nil } type balanceLoadClientStream struct { ClientStream } func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error { return x.ClientStream.SendMsg(m) } func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { m := new(lbpb.LoadBalanceResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } func init() { balancer.Register(newLBBuilder()) } // newLBBuilder creates a builder for grpclb. func newLBBuilder() balancer.Builder { return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout) } // NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given // fallbackTimeout. If no response is received from the remote balancer within // fallbackTimeout, the backend addresses from the resolved address list will be // used. // // Only call this function when a non-default fallback timeout is needed. func NewLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder { return &lbBuilder{ fallbackTimeout: fallbackTimeout, } } type lbBuilder struct { fallbackTimeout time.Duration } func (b *lbBuilder) Name() string { return grpclbName } func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { // This generates a manual resolver builder with a random scheme. This // scheme will be used to dial to remote LB, so we can send filtered address // updates to remote LB ClientConn using this manual resolver. scheme := "grpclb_internal_" + strconv.FormatInt(time.Now().UnixNano(), 36) r := &lbManualResolver{scheme: scheme, ccb: cc} var target string targetSplitted := strings.Split(cc.Target(), ":///") if len(targetSplitted) < 2 { target = cc.Target() } else { target = targetSplitted[1] } lb := &lbBalancer{ cc: newLBCacheClientConn(cc), target: target, opt: opt, fallbackTimeout: b.fallbackTimeout, doneCh: make(chan struct{}), manualResolver: r, csEvltr: &connectivityStateEvaluator{}, subConns: make(map[resolver.Address]balancer.SubConn), scStates: make(map[balancer.SubConn]connectivity.State), picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, clientStats: &rpcStats{}, } return lb } type lbBalancer struct { cc *lbCacheClientConn target string opt balancer.BuildOptions fallbackTimeout time.Duration doneCh chan struct{} // manualResolver is used in the remote LB ClientConn inside grpclb. When // resolved address updates are received by grpclb, filtered updates will be // send to remote LB ClientConn through this resolver. manualResolver *lbManualResolver // The ClientConn to talk to the remote balancer. ccRemoteLB *ClientConn // Support client side load reporting. Each picker gets a reference to this, // and will update its content. clientStats *rpcStats mu sync.Mutex // guards everything following. // The full server list including drops, used to check if the newly received // serverList contains anything new. Each generate picker will also have // reference to this list to do the first layer pick. fullServerList []*lbpb.Server // All backends addresses, with metadata set to nil. This list contains all // backend addresses in the same order and with the same duplicates as in // serverlist. When generating picker, a SubConn slice with the same order // but with only READY SCs will be gerenated. backendAddrs []resolver.Address // Roundrobin functionalities. csEvltr *connectivityStateEvaluator state connectivity.State subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. picker balancer.Picker // Support fallback to resolved backend addresses if there's no response // from remote balancer within fallbackTimeout. fallbackTimerExpired bool serverListReceived bool // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set // when resolved address updates are received, and read in the goroutine // handling fallback. resolvedBackendAddrs []resolver.Address } // regeneratePicker takes a snapshot of the balancer, and generates a picker from // it. The picker // - always returns ErrTransientFailure if the balancer is in TransientFailure, // - does two layer roundrobin pick otherwise. // Caller must hold lb.mu. func (lb *lbBalancer) regeneratePicker() { if lb.state == connectivity.TransientFailure { lb.picker = &errPicker{err: balancer.ErrTransientFailure} return } var readySCs []balancer.SubConn for _, a := range lb.backendAddrs { if sc, ok := lb.subConns[a]; ok { if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready { readySCs = append(readySCs, sc) } } } if len(lb.fullServerList) <= 0 { if len(readySCs) <= 0 { lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} return } lb.picker = &rrPicker{subConns: readySCs} return } lb.picker = &lbPicker{ serverList: lb.fullServerList, subConns: readySCs, stats: lb.clientStats, } } func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) lb.mu.Lock() defer lb.mu.Unlock() oldS, ok := lb.scStates[sc] if !ok { grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) return } lb.scStates[sc] = s switch s { case connectivity.Idle: sc.Connect() case connectivity.Shutdown: // When an address was removed by resolver, b called RemoveSubConn but // kept the sc's state in scStates. Remove state for this sc here. delete(lb.scStates, sc) } oldAggrState := lb.state lb.state = lb.csEvltr.recordTransition(oldS, s) // Regenerate picker when one of the following happens: // - this sc became ready from not-ready // - this sc became not-ready from ready // - the aggregated state of balancer became TransientFailure from non-TransientFailure // - the aggregated state of balancer became non-TransientFailure from TransientFailure if (oldS == connectivity.Ready) != (s == connectivity.Ready) || (lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { lb.regeneratePicker() } lb.cc.UpdateBalancerState(lb.state, lb.picker) } // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use // resolved backends (backends received from resolver, not from remote balancer) // if no connection to remote balancers was successful. func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { timer := time.NewTimer(fallbackTimeout) defer timer.Stop() select { case <-timer.C: case <-lb.doneCh: return } lb.mu.Lock() if lb.serverListReceived { lb.mu.Unlock() return } lb.fallbackTimerExpired = true lb.refreshSubConns(lb.resolvedBackendAddrs) lb.mu.Unlock() } // HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB // clientConn. The remoteLB clientConn will handle creating/removing remoteLB // connections. func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs) if len(addrs) <= 0 { return } var remoteBalancerAddrs, backendAddrs []resolver.Address for _, a := range addrs { if a.Type == resolver.GRPCLB { remoteBalancerAddrs = append(remoteBalancerAddrs, a) } else { backendAddrs = append(backendAddrs, a) } } if lb.ccRemoteLB == nil { if len(remoteBalancerAddrs) <= 0 { grpclog.Errorf("grpclb: no remote balancer address is available, should never happen") return } // First time receiving resolved addresses, create a cc to remote // balancers. lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName) // Start the fallback goroutine. go lb.fallbackToBackendsAfter(lb.fallbackTimeout) } // cc to remote balancers uses lb.manualResolver. Send the updated remote // balancer addresses to it through manualResolver. lb.manualResolver.NewAddress(remoteBalancerAddrs) lb.mu.Lock() lb.resolvedBackendAddrs = backendAddrs // If serverListReceived is true, connection to remote balancer was // successful and there's no need to do fallback anymore. // If fallbackTimerExpired is false, fallback hasn't happened yet. if !lb.serverListReceived && lb.fallbackTimerExpired { // This means we received a new list of resolved backends, and we are // still in fallback mode. Need to update the list of backends we are // using to the new list of backends. lb.refreshSubConns(lb.resolvedBackendAddrs) } lb.mu.Unlock() } func (lb *lbBalancer) Close() { select { case <-lb.doneCh: return default: } close(lb.doneCh) if lb.ccRemoteLB != nil { lb.ccRemoteLB.Close() } lb.cc.close() }