summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/picker_wrapper.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/picker_wrapper.go')
-rw-r--r--vendor/google.golang.org/grpc/picker_wrapper.go331
1 files changed, 331 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
new file mode 100644
index 000000000..0a984e6c8
--- /dev/null
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -0,0 +1,331 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "io"
+ "sync"
+ "sync/atomic"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/channelz"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/status"
+ "google.golang.org/grpc/transport"
+)
+
+// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
+// actions and unblock when there's a picker update.
+type pickerWrapper struct {
+ mu sync.Mutex
+ done bool
+ blockingCh chan struct{}
+ picker balancer.Picker
+
+ // The latest connection happened.
+ connErrMu sync.Mutex
+ connErr error
+
+ stickinessMDKey atomic.Value
+ stickiness *stickyStore
+}
+
+func newPickerWrapper() *pickerWrapper {
+ bp := &pickerWrapper{
+ blockingCh: make(chan struct{}),
+ stickiness: newStickyStore(),
+ }
+ return bp
+}
+
+func (bp *pickerWrapper) updateConnectionError(err error) {
+ bp.connErrMu.Lock()
+ bp.connErr = err
+ bp.connErrMu.Unlock()
+}
+
+func (bp *pickerWrapper) connectionError() error {
+ bp.connErrMu.Lock()
+ err := bp.connErr
+ bp.connErrMu.Unlock()
+ return err
+}
+
+func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
+ // No need to check ok because mdKey == "" if ok == false.
+ if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey {
+ bp.stickinessMDKey.Store(newKey)
+ bp.stickiness.reset(newKey)
+ }
+}
+
+func (bp *pickerWrapper) getStickinessMDKey() string {
+ // No need to check ok because mdKey == "" if ok == false.
+ mdKey, _ := bp.stickinessMDKey.Load().(string)
+ return mdKey
+}
+
+func (bp *pickerWrapper) clearStickinessState() {
+ if oldKey := bp.getStickinessMDKey(); oldKey != "" {
+ // There's no need to reset store if mdKey was "".
+ bp.stickiness.reset(oldKey)
+ }
+}
+
+// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
+func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
+ bp.mu.Lock()
+ if bp.done {
+ bp.mu.Unlock()
+ return
+ }
+ bp.picker = p
+ // bp.blockingCh should never be nil.
+ close(bp.blockingCh)
+ bp.blockingCh = make(chan struct{})
+ bp.mu.Unlock()
+}
+
+func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
+ acw.mu.Lock()
+ ac := acw.ac
+ acw.mu.Unlock()
+ ac.incrCallsStarted()
+ return func(b balancer.DoneInfo) {
+ if b.Err != nil && b.Err != io.EOF {
+ ac.incrCallsFailed()
+ } else {
+ ac.incrCallsSucceeded()
+ }
+ if done != nil {
+ done(b)
+ }
+ }
+}
+
+// pick returns the transport that will be used for the RPC.
+// It may block in the following cases:
+// - there's no picker
+// - the current picker returns ErrNoSubConnAvailable
+// - the current picker returns other errors and failfast is false.
+// - the subConn returned by the current picker is not READY
+// When one of these situations happens, pick blocks until the picker gets updated.
+func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+
+ mdKey := bp.getStickinessMDKey()
+ stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
+
+ // Potential race here: if stickinessMDKey is updated after the above two
+ // lines, and this pick is a sticky pick, the following put could add an
+ // entry to sticky store with an outdated sticky key.
+ //
+ // The solution: keep the current md key in sticky store, and at the
+ // beginning of each get/put, check the mdkey against store.curMDKey.
+ // - Cons: one more string comparing for each get/put.
+ // - Pros: the string matching happens inside get/put, so the overhead for
+ // non-sticky RPCs will be minimal.
+
+ if isSticky {
+ if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
+ // Done function returned is always nil.
+ return t, nil, nil
+ }
+ }
+
+ var (
+ p balancer.Picker
+ ch chan struct{}
+ )
+
+ for {
+ bp.mu.Lock()
+ if bp.done {
+ bp.mu.Unlock()
+ return nil, nil, ErrClientConnClosing
+ }
+
+ if bp.picker == nil {
+ ch = bp.blockingCh
+ }
+ if ch == bp.blockingCh {
+ // This could happen when either:
+ // - bp.picker is nil (the previous if condition), or
+ // - has called pick on the current picker.
+ bp.mu.Unlock()
+ select {
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
+ case <-ch:
+ }
+ continue
+ }
+
+ ch = bp.blockingCh
+ p = bp.picker
+ bp.mu.Unlock()
+
+ subConn, done, err := p.Pick(ctx, opts)
+
+ if err != nil {
+ switch err {
+ case balancer.ErrNoSubConnAvailable:
+ continue
+ case balancer.ErrTransientFailure:
+ if !failfast {
+ continue
+ }
+ return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
+ default:
+ // err is some other error.
+ return nil, nil, toRPCErr(err)
+ }
+ }
+
+ acw, ok := subConn.(*acBalancerWrapper)
+ if !ok {
+ grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
+ continue
+ }
+ if t, ok := acw.getAddrConn().getReadyTransport(); ok {
+ if isSticky {
+ bp.stickiness.put(mdKey, stickyKey, acw)
+ }
+ if channelz.IsOn() {
+ return t, doneChannelzWrapper(acw, done), nil
+ }
+ return t, done, nil
+ }
+ grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
+ // If ok == false, ac.state is not READY.
+ // A valid picker always returns READY subConn. This means the state of ac
+ // just changed, and picker will be updated shortly.
+ // continue back to the beginning of the for loop to repick.
+ }
+}
+
+func (bp *pickerWrapper) close() {
+ bp.mu.Lock()
+ defer bp.mu.Unlock()
+ if bp.done {
+ return
+ }
+ bp.done = true
+ close(bp.blockingCh)
+}
+
+type stickyStoreEntry struct {
+ acw *acBalancerWrapper
+ addr resolver.Address
+}
+
+type stickyStore struct {
+ mu sync.Mutex
+ // curMDKey is check before every get/put to avoid races. The operation will
+ // abort immediately when the given mdKey is different from the curMDKey.
+ curMDKey string
+ store map[string]*stickyStoreEntry
+}
+
+func newStickyStore() *stickyStore {
+ return &stickyStore{
+ store: make(map[string]*stickyStoreEntry),
+ }
+}
+
+// reset clears the map in stickyStore, and set the currentMDKey to newMDKey.
+func (ss *stickyStore) reset(newMDKey string) {
+ ss.mu.Lock()
+ ss.curMDKey = newMDKey
+ ss.store = make(map[string]*stickyStoreEntry)
+ ss.mu.Unlock()
+}
+
+// stickyKey is the key to look up in store. mdKey will be checked against
+// curMDKey to avoid races.
+func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
+ ss.mu.Lock()
+ defer ss.mu.Unlock()
+ if mdKey != ss.curMDKey {
+ return
+ }
+ // TODO(stickiness): limit the total number of entries.
+ ss.store[stickyKey] = &stickyStoreEntry{
+ acw: acw,
+ addr: acw.getAddrConn().getCurAddr(),
+ }
+}
+
+// stickyKey is the key to look up in store. mdKey will be checked against
+// curMDKey to avoid races.
+func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
+ ss.mu.Lock()
+ defer ss.mu.Unlock()
+ if mdKey != ss.curMDKey {
+ return nil, false
+ }
+ entry, ok := ss.store[stickyKey]
+ if !ok {
+ return nil, false
+ }
+ ac := entry.acw.getAddrConn()
+ if ac.getCurAddr() != entry.addr {
+ delete(ss.store, stickyKey)
+ return nil, false
+ }
+ t, ok := ac.getReadyTransport()
+ if !ok {
+ delete(ss.store, stickyKey)
+ return nil, false
+ }
+ return t, true
+}
+
+// Get one value from metadata in ctx with key stickinessMDKey.
+//
+// It returns "", false if stickinessMDKey is an empty string.
+func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
+ if stickinessMDKey == "" {
+ return "", false
+ }
+
+ md, added, ok := metadata.FromOutgoingContextRaw(ctx)
+ if !ok {
+ return "", false
+ }
+
+ if vv, ok := md[stickinessMDKey]; ok {
+ if len(vv) > 0 {
+ return vv[0], true
+ }
+ }
+
+ for _, ss := range added {
+ for i := 0; i < len(ss)-1; i += 2 {
+ if ss[i] == stickinessMDKey {
+ return ss[i+1], true
+ }
+ }
+ }
+
+ return "", false
+}