From 1e5c432e1029601a664454388ae366ef69618d62 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Mon, 25 Jun 2018 12:33:13 -0700 Subject: MM-10702 Moving plugins to use hashicorp go-plugin. (#8978) * Moving plugins to use hashicorp go-plugin. * Tweaks from feedback. --- vendor/google.golang.org/grpc/picker_wrapper.go | 331 ++++++++++++++++++++++++ 1 file changed, 331 insertions(+) create mode 100644 vendor/google.golang.org/grpc/picker_wrapper.go (limited to 'vendor/google.golang.org/grpc/picker_wrapper.go') diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go new file mode 100644 index 000000000..0a984e6c8 --- /dev/null +++ b/vendor/google.golang.org/grpc/picker_wrapper.go @@ -0,0 +1,331 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "io" + "sync" + "sync/atomic" + + "golang.org/x/net/context" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/channelz" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" + "google.golang.org/grpc/transport" +) + +// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick +// actions and unblock when there's a picker update. +type pickerWrapper struct { + mu sync.Mutex + done bool + blockingCh chan struct{} + picker balancer.Picker + + // The latest connection happened. + connErrMu sync.Mutex + connErr error + + stickinessMDKey atomic.Value + stickiness *stickyStore +} + +func newPickerWrapper() *pickerWrapper { + bp := &pickerWrapper{ + blockingCh: make(chan struct{}), + stickiness: newStickyStore(), + } + return bp +} + +func (bp *pickerWrapper) updateConnectionError(err error) { + bp.connErrMu.Lock() + bp.connErr = err + bp.connErrMu.Unlock() +} + +func (bp *pickerWrapper) connectionError() error { + bp.connErrMu.Lock() + err := bp.connErr + bp.connErrMu.Unlock() + return err +} + +func (bp *pickerWrapper) updateStickinessMDKey(newKey string) { + // No need to check ok because mdKey == "" if ok == false. + if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey { + bp.stickinessMDKey.Store(newKey) + bp.stickiness.reset(newKey) + } +} + +func (bp *pickerWrapper) getStickinessMDKey() string { + // No need to check ok because mdKey == "" if ok == false. + mdKey, _ := bp.stickinessMDKey.Load().(string) + return mdKey +} + +func (bp *pickerWrapper) clearStickinessState() { + if oldKey := bp.getStickinessMDKey(); oldKey != "" { + // There's no need to reset store if mdKey was "". + bp.stickiness.reset(oldKey) + } +} + +// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. +func (bp *pickerWrapper) updatePicker(p balancer.Picker) { + bp.mu.Lock() + if bp.done { + bp.mu.Unlock() + return + } + bp.picker = p + // bp.blockingCh should never be nil. + close(bp.blockingCh) + bp.blockingCh = make(chan struct{}) + bp.mu.Unlock() +} + +func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) { + acw.mu.Lock() + ac := acw.ac + acw.mu.Unlock() + ac.incrCallsStarted() + return func(b balancer.DoneInfo) { + if b.Err != nil && b.Err != io.EOF { + ac.incrCallsFailed() + } else { + ac.incrCallsSucceeded() + } + if done != nil { + done(b) + } + } +} + +// pick returns the transport that will be used for the RPC. +// It may block in the following cases: +// - there's no picker +// - the current picker returns ErrNoSubConnAvailable +// - the current picker returns other errors and failfast is false. +// - the subConn returned by the current picker is not READY +// When one of these situations happens, pick blocks until the picker gets updated. +func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) { + + mdKey := bp.getStickinessMDKey() + stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey) + + // Potential race here: if stickinessMDKey is updated after the above two + // lines, and this pick is a sticky pick, the following put could add an + // entry to sticky store with an outdated sticky key. + // + // The solution: keep the current md key in sticky store, and at the + // beginning of each get/put, check the mdkey against store.curMDKey. + // - Cons: one more string comparing for each get/put. + // - Pros: the string matching happens inside get/put, so the overhead for + // non-sticky RPCs will be minimal. + + if isSticky { + if t, ok := bp.stickiness.get(mdKey, stickyKey); ok { + // Done function returned is always nil. + return t, nil, nil + } + } + + var ( + p balancer.Picker + ch chan struct{} + ) + + for { + bp.mu.Lock() + if bp.done { + bp.mu.Unlock() + return nil, nil, ErrClientConnClosing + } + + if bp.picker == nil { + ch = bp.blockingCh + } + if ch == bp.blockingCh { + // This could happen when either: + // - bp.picker is nil (the previous if condition), or + // - has called pick on the current picker. + bp.mu.Unlock() + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-ch: + } + continue + } + + ch = bp.blockingCh + p = bp.picker + bp.mu.Unlock() + + subConn, done, err := p.Pick(ctx, opts) + + if err != nil { + switch err { + case balancer.ErrNoSubConnAvailable: + continue + case balancer.ErrTransientFailure: + if !failfast { + continue + } + return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError()) + default: + // err is some other error. + return nil, nil, toRPCErr(err) + } + } + + acw, ok := subConn.(*acBalancerWrapper) + if !ok { + grpclog.Infof("subconn returned from pick is not *acBalancerWrapper") + continue + } + if t, ok := acw.getAddrConn().getReadyTransport(); ok { + if isSticky { + bp.stickiness.put(mdKey, stickyKey, acw) + } + if channelz.IsOn() { + return t, doneChannelzWrapper(acw, done), nil + } + return t, done, nil + } + grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick") + // If ok == false, ac.state is not READY. + // A valid picker always returns READY subConn. This means the state of ac + // just changed, and picker will be updated shortly. + // continue back to the beginning of the for loop to repick. + } +} + +func (bp *pickerWrapper) close() { + bp.mu.Lock() + defer bp.mu.Unlock() + if bp.done { + return + } + bp.done = true + close(bp.blockingCh) +} + +type stickyStoreEntry struct { + acw *acBalancerWrapper + addr resolver.Address +} + +type stickyStore struct { + mu sync.Mutex + // curMDKey is check before every get/put to avoid races. The operation will + // abort immediately when the given mdKey is different from the curMDKey. + curMDKey string + store map[string]*stickyStoreEntry +} + +func newStickyStore() *stickyStore { + return &stickyStore{ + store: make(map[string]*stickyStoreEntry), + } +} + +// reset clears the map in stickyStore, and set the currentMDKey to newMDKey. +func (ss *stickyStore) reset(newMDKey string) { + ss.mu.Lock() + ss.curMDKey = newMDKey + ss.store = make(map[string]*stickyStoreEntry) + ss.mu.Unlock() +} + +// stickyKey is the key to look up in store. mdKey will be checked against +// curMDKey to avoid races. +func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) { + ss.mu.Lock() + defer ss.mu.Unlock() + if mdKey != ss.curMDKey { + return + } + // TODO(stickiness): limit the total number of entries. + ss.store[stickyKey] = &stickyStoreEntry{ + acw: acw, + addr: acw.getAddrConn().getCurAddr(), + } +} + +// stickyKey is the key to look up in store. mdKey will be checked against +// curMDKey to avoid races. +func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) { + ss.mu.Lock() + defer ss.mu.Unlock() + if mdKey != ss.curMDKey { + return nil, false + } + entry, ok := ss.store[stickyKey] + if !ok { + return nil, false + } + ac := entry.acw.getAddrConn() + if ac.getCurAddr() != entry.addr { + delete(ss.store, stickyKey) + return nil, false + } + t, ok := ac.getReadyTransport() + if !ok { + delete(ss.store, stickyKey) + return nil, false + } + return t, true +} + +// Get one value from metadata in ctx with key stickinessMDKey. +// +// It returns "", false if stickinessMDKey is an empty string. +func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) { + if stickinessMDKey == "" { + return "", false + } + + md, added, ok := metadata.FromOutgoingContextRaw(ctx) + if !ok { + return "", false + } + + if vv, ok := md[stickinessMDKey]; ok { + if len(vv) > 0 { + return vv[0], true + } + } + + for _, ss := range added { + for i := 0; i < len(ss)-1; i += 2 { + if ss[i] == stickinessMDKey { + return ss[i+1], true + } + } + } + + return "", false +} -- cgit v1.2.3-1-g7c22