summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/grpclb_remote_balancer.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/grpclb_remote_balancer.go')
-rw-r--r--vendor/google.golang.org/grpc/grpclb_remote_balancer.go266
1 files changed, 0 insertions, 266 deletions
diff --git a/vendor/google.golang.org/grpc/grpclb_remote_balancer.go b/vendor/google.golang.org/grpc/grpclb_remote_balancer.go
deleted file mode 100644
index b8dd4f18c..000000000
--- a/vendor/google.golang.org/grpc/grpclb_remote_balancer.go
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpc
-
-import (
- "fmt"
- "net"
- "reflect"
- "time"
-
- "golang.org/x/net/context"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/channelz"
-
- "google.golang.org/grpc/connectivity"
- lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/resolver"
-)
-
-// processServerList updates balaner's internal state, create/remove SubConns
-// and regenerates picker using the received serverList.
-func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
- grpclog.Infof("lbBalancer: processing server list: %+v", l)
- lb.mu.Lock()
- defer lb.mu.Unlock()
-
- // Set serverListReceived to true so fallback will not take effect if it has
- // not hit timeout.
- lb.serverListReceived = true
-
- // If the new server list == old server list, do nothing.
- if reflect.DeepEqual(lb.fullServerList, l.Servers) {
- grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
- return
- }
- lb.fullServerList = l.Servers
-
- var backendAddrs []resolver.Address
- for _, s := range l.Servers {
- if s.DropForLoadBalancing || s.DropForRateLimiting {
- continue
- }
-
- md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
- ip := net.IP(s.IpAddress)
- ipStr := ip.String()
- if ip.To4() == nil {
- // Add square brackets to ipv6 addresses, otherwise net.Dial() and
- // net.SplitHostPort() will return too many colons error.
- ipStr = fmt.Sprintf("[%s]", ipStr)
- }
- addr := resolver.Address{
- Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
- Metadata: &md,
- }
-
- backendAddrs = append(backendAddrs, addr)
- }
-
- // Call refreshSubConns to create/remove SubConns.
- lb.refreshSubConns(backendAddrs)
- // Regenerate and update picker no matter if there's update on backends (if
- // any SubConn will be newed/removed). Because since the full serverList was
- // different, there might be updates in drops or pick weights(different
- // number of duplicates). We need to update picker with the fulllist.
- //
- // Now with cache, even if SubConn was newed/removed, there might be no
- // state changes.
- lb.regeneratePicker()
- lb.cc.UpdateBalancerState(lb.state, lb.picker)
-}
-
-// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
-// indicating whether the backendAddrs are different from the cached
-// backendAddrs (whether any SubConn was newed/removed).
-// Caller must hold lb.mu.
-func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
- lb.backendAddrs = nil
- var backendsUpdated bool
- // addrsSet is the set converted from backendAddrs, it's used to quick
- // lookup for an address.
- addrsSet := make(map[resolver.Address]struct{})
- // Create new SubConns.
- for _, addr := range backendAddrs {
- addrWithoutMD := addr
- addrWithoutMD.Metadata = nil
- addrsSet[addrWithoutMD] = struct{}{}
- lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
-
- if _, ok := lb.subConns[addrWithoutMD]; !ok {
- backendsUpdated = true
-
- // Use addrWithMD to create the SubConn.
- sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
- if err != nil {
- grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err)
- continue
- }
- lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
- if _, ok := lb.scStates[sc]; !ok {
- // Only set state of new sc to IDLE. The state could already be
- // READY for cached SubConns.
- lb.scStates[sc] = connectivity.Idle
- }
- sc.Connect()
- }
- }
-
- for a, sc := range lb.subConns {
- // a was removed by resolver.
- if _, ok := addrsSet[a]; !ok {
- backendsUpdated = true
-
- lb.cc.RemoveSubConn(sc)
- delete(lb.subConns, a)
- // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
- // The entry will be deleted in HandleSubConnStateChange.
- }
- }
-
- return backendsUpdated
-}
-
-func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
- for {
- reply, err := s.Recv()
- if err != nil {
- return fmt.Errorf("grpclb: failed to recv server list: %v", err)
- }
- if serverList := reply.GetServerList(); serverList != nil {
- lb.processServerList(serverList)
- }
- }
-}
-
-func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- case <-s.Context().Done():
- return
- }
- stats := lb.clientStats.toClientStats()
- t := time.Now()
- stats.Timestamp = &lbpb.Timestamp{
- Seconds: t.Unix(),
- Nanos: int32(t.Nanosecond()),
- }
- if err := s.Send(&lbpb.LoadBalanceRequest{
- LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
- ClientStats: stats,
- },
- }); err != nil {
- return
- }
- }
-}
-
-func (lb *lbBalancer) callRemoteBalancer() error {
- lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := lbClient.BalanceLoad(ctx, FailFast(false))
- if err != nil {
- return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
- }
-
- // grpclb handshake on the stream.
- initReq := &lbpb.LoadBalanceRequest{
- LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
- InitialRequest: &lbpb.InitialLoadBalanceRequest{
- Name: lb.target,
- },
- },
- }
- if err := stream.Send(initReq); err != nil {
- return fmt.Errorf("grpclb: failed to send init request: %v", err)
- }
- reply, err := stream.Recv()
- if err != nil {
- return fmt.Errorf("grpclb: failed to recv init response: %v", err)
- }
- initResp := reply.GetInitialResponse()
- if initResp == nil {
- return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
- }
- if initResp.LoadBalancerDelegate != "" {
- return fmt.Errorf("grpclb: Delegation is not supported")
- }
-
- go func() {
- if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
- lb.sendLoadReport(stream, d)
- }
- }()
- return lb.readServerList(stream)
-}
-
-func (lb *lbBalancer) watchRemoteBalancer() {
- for {
- err := lb.callRemoteBalancer()
- select {
- case <-lb.doneCh:
- return
- default:
- if err != nil {
- grpclog.Error(err)
- }
- }
-
- }
-}
-
-func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
- var dopts []DialOption
- if creds := lb.opt.DialCreds; creds != nil {
- if err := creds.OverrideServerName(remoteLBName); err == nil {
- dopts = append(dopts, WithTransportCredentials(creds))
- } else {
- grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
- dopts = append(dopts, WithInsecure())
- }
- } else {
- dopts = append(dopts, WithInsecure())
- }
- if lb.opt.Dialer != nil {
- // WithDialer takes a different type of function, so we instead use a
- // special DialOption here.
- dopts = append(dopts, withContextDialer(lb.opt.Dialer))
- }
- // Explicitly set pickfirst as the balancer.
- dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
- dopts = append(dopts, withResolverBuilder(lb.manualResolver))
- if channelz.IsOn() {
- dopts = append(dopts, WithChannelzParentID(lb.opt.ChannelzParentID))
- }
-
- // DialContext using manualResolver.Scheme, which is a random scheme generated
- // when init grpclb. The target name is not important.
- cc, err := DialContext(context.Background(), "grpclb:///grpclb.server", dopts...)
- if err != nil {
- grpclog.Fatalf("failed to dial: %v", err)
- }
- lb.ccRemoteLB = cc
- go lb.watchRemoteBalancer()
-}