path: root/vendor/github.com/go-redis/redis/cluster.go
author    Christopher Speller <crspeller@gmail.com>  2018-04-16 05:37:14 -0700
committer Joram Wilander <jwawilander@gmail.com>     2018-04-16 08:37:14 -0400
commit    6e2cb00008cbf09e556b00f87603797fcaa47e09
tree      3c0eb55ff4226a3f024aad373140d1fb860a6404 /vendor/github.com/go-redis/redis/cluster.go
parent    bf24f51c4e1cc6286885460672f7f449e8c6f5ef
Dependency upgrades and moving to dep. (#8630)
Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--  vendor/github.com/go-redis/redis/cluster.go | 547
1 file changed, 337 insertions, 210 deletions
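This diff upgrades the vendored go-redis cluster client. Among the visible API changes: ClusterOptions gains a RouteRandomly flag, the MaxRedirects default drops from 16 to 8, and ClusterClient gains Context/WithContext methods. Below is a minimal, hypothetical usage sketch of the new option from application code; it is not part of this commit, and the addresses are placeholders.

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	// Hypothetical example addresses; replace with real cluster endpoints.
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:         []string{"127.0.0.1:7000", "127.0.0.1:7001"},
		ReadOnly:      true, // allow read-only commands on slave nodes
		RouteRandomly: true, // new in this version: route reads to a random eligible node
		MaxRedirects:  8,    // matches the new default shown in this diff
	})
	defer client.Close()

	if err := client.Ping().Err(); err != nil {
		fmt.Println("ping failed:", err)
	}
}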
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index a2c18b387..4a2951157 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -1,7 +1,10 @@
package redis
import (
+ "context"
+ "errors"
"fmt"
+ "math"
"math/rand"
"net"
"sync"
@@ -12,10 +15,10 @@ import (
"github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
+ "github.com/go-redis/redis/internal/singleflight"
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
-var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots")
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
@@ -25,13 +28,15 @@ type ClusterOptions struct {
// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
- // Default is 16.
+ // Default is 8.
MaxRedirects int
// Enables read-only commands on slave nodes.
ReadOnly bool
// Allows routing read-only commands to the closest master or slave node.
RouteByLatency bool
+ // Allows routing read-only commands to the random master or slave node.
+ RouteRandomly bool
// Following options are copied from Options struct.
@@ -57,7 +62,7 @@ func (opt *ClusterOptions) init() {
if opt.MaxRedirects == -1 {
opt.MaxRedirects = 0
} else if opt.MaxRedirects == 0 {
- opt.MaxRedirects = 16
+ opt.MaxRedirects = 8
}
if opt.RouteByLatency {
@@ -118,11 +123,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
//------------------------------------------------------------------------------
type clusterNode struct {
- Client *Client
- Latency time.Duration
+ Client *Client
- loading time.Time
- generation uint32
+ latency uint32 // atomic
+ generation uint32 // atomic
+ loading uint32 // atomic
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -132,36 +137,69 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
Client: NewClient(opt),
}
+ node.latency = math.MaxUint32
if clOpt.RouteByLatency {
- node.updateLatency()
+ go node.updateLatency()
}
return &node
}
+func (n *clusterNode) Close() error {
+ return n.Client.Close()
+}
+
+func (n *clusterNode) Test() error {
+ return n.Client.ClusterInfo().Err()
+}
+
func (n *clusterNode) updateLatency() {
const probes = 10
+
+ var latency uint32
for i := 0; i < probes; i++ {
start := time.Now()
n.Client.Ping()
- n.Latency += time.Since(start)
+ probe := uint32(time.Since(start) / time.Microsecond)
+ latency = (latency + probe) / 2
}
- n.Latency = n.Latency / probes
+ atomic.StoreUint32(&n.latency, latency)
+}
+
+func (n *clusterNode) Latency() time.Duration {
+ latency := atomic.LoadUint32(&n.latency)
+ return time.Duration(latency) * time.Microsecond
+}
+
+func (n *clusterNode) MarkAsLoading() {
+ atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
}
func (n *clusterNode) Loading() bool {
- return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
+ const minute = int64(time.Minute / time.Second)
+
+ loading := atomic.LoadUint32(&n.loading)
+ if loading == 0 {
+ return false
+ }
+ if time.Now().Unix()-int64(loading) < minute {
+ return true
+ }
+ atomic.StoreUint32(&n.loading, 0)
+ return false
}
func (n *clusterNode) Generation() uint32 {
- return n.generation
+ return atomic.LoadUint32(&n.generation)
}
func (n *clusterNode) SetGeneration(gen uint32) {
- if gen < n.generation {
- panic("gen < n.generation")
+ for {
+ v := atomic.LoadUint32(&n.generation)
+ if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
+ break
+ }
}
- n.generation = gen
}
//------------------------------------------------------------------------------
@@ -169,18 +207,23 @@ func (n *clusterNode) SetGeneration(gen uint32) {
type clusterNodes struct {
opt *ClusterOptions
- mu sync.RWMutex
- addrs []string
- nodes map[string]*clusterNode
- closed bool
+ mu sync.RWMutex
+ allAddrs []string
+ allNodes map[string]*clusterNode
+ clusterAddrs []string
+ closed bool
+
+ nodeCreateGroup singleflight.Group
generation uint32
}
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
return &clusterNodes{
- opt: opt,
- nodes: make(map[string]*clusterNode),
+ opt: opt,
+
+ allAddrs: opt.Addrs,
+ allNodes: make(map[string]*clusterNode),
}
}
@@ -194,21 +237,29 @@ func (c *clusterNodes) Close() error {
c.closed = true
var firstErr error
- for _, node := range c.nodes {
+ for _, node := range c.allNodes {
if err := node.Client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
- c.addrs = nil
- c.nodes = nil
+
+ c.allNodes = nil
+ c.clusterAddrs = nil
return firstErr
}
func (c *clusterNodes) Addrs() ([]string, error) {
+ var addrs []string
c.mu.RLock()
closed := c.closed
- addrs := c.addrs
+ if !closed {
+ if len(c.clusterAddrs) > 0 {
+ addrs = c.clusterAddrs
+ } else {
+ addrs = c.allAddrs
+ }
+ }
c.mu.RUnlock()
if closed {
@@ -229,55 +280,45 @@ func (c *clusterNodes) NextGeneration() uint32 {
func (c *clusterNodes) GC(generation uint32) {
var collected []*clusterNode
c.mu.Lock()
- for i := 0; i < len(c.addrs); {
- addr := c.addrs[i]
- node := c.nodes[addr]
+ for addr, node := range c.allNodes {
if node.Generation() >= generation {
- i++
continue
}
- c.addrs = append(c.addrs[:i], c.addrs[i+1:]...)
- delete(c.nodes, addr)
+ c.clusterAddrs = remove(c.clusterAddrs, addr)
+ delete(c.allNodes, addr)
collected = append(collected, node)
}
c.mu.Unlock()
- time.AfterFunc(time.Minute, func() {
- for _, node := range collected {
- _ = node.Client.Close()
- }
- })
-}
-
-func (c *clusterNodes) All() ([]*clusterNode, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
-
- if c.closed {
- return nil, pool.ErrClosed
+ for _, node := range collected {
+ _ = node.Client.Close()
}
-
- nodes := make([]*clusterNode, 0, len(c.nodes))
- for _, node := range c.nodes {
- nodes = append(nodes, node)
- }
- return nodes, nil
}
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
var node *clusterNode
- var ok bool
+ var err error
c.mu.RLock()
- if !c.closed {
- node, ok = c.nodes[addr]
+ if c.closed {
+ err = pool.ErrClosed
+ } else {
+ node = c.allNodes[addr]
}
c.mu.RUnlock()
- if ok {
+ if err != nil {
+ return nil, err
+ }
+ if node != nil {
return node, nil
}
+ v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
+ node := newClusterNode(c.opt, addr)
+ return node, node.Test()
+ })
+
c.mu.Lock()
defer c.mu.Unlock()
@@ -285,15 +326,35 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return nil, pool.ErrClosed
}
- node, ok = c.nodes[addr]
+ node, ok := c.allNodes[addr]
if ok {
- return node, nil
+ _ = v.(*clusterNode).Close()
+ return node, err
}
+ node = v.(*clusterNode)
- c.addrs = append(c.addrs, addr)
- node = newClusterNode(c.opt, addr)
- c.nodes[addr] = node
- return node, nil
+ c.allAddrs = appendIfNotExists(c.allAddrs, addr)
+ if err == nil {
+ c.clusterAddrs = append(c.clusterAddrs, addr)
+ }
+ c.allNodes[addr] = node
+
+ return node, err
+}
+
+func (c *clusterNodes) All() ([]*clusterNode, error) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ if c.closed {
+ return nil, pool.ErrClosed
+ }
+
+ cp := make([]*clusterNode, 0, len(c.allNodes))
+ for _, node := range c.allNodes {
+ cp = append(cp, node)
+ }
+ return cp, nil
}
func (c *clusterNodes) Random() (*clusterNode, error) {
@@ -302,20 +363,8 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
return nil, err
}
- var nodeErr error
- for i := 0; i <= c.opt.MaxRedirects; i++ {
- n := rand.Intn(len(addrs))
- node, err := c.GetOrCreate(addrs[n])
- if err != nil {
- return nil, err
- }
-
- nodeErr = node.Client.ClusterInfo().Err()
- if nodeErr == nil {
- return node, nil
- }
- }
- return nil, nodeErr
+ n := rand.Intn(len(addrs))
+ return c.GetOrCreate(addrs[n])
}
//------------------------------------------------------------------------------
@@ -367,6 +416,10 @@ func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*
}
}
+ time.AfterFunc(time.Minute, func() {
+ nodes.GC(c.generation)
+ })
+
return &c, nil
}
@@ -416,13 +469,19 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
if n.Loading() {
continue
}
- if node == nil || node.Latency-n.Latency > threshold {
+ if node == nil || node.Latency()-n.Latency() > threshold {
node = n
}
}
return node, nil
}
+func (c *clusterState) slotRandomNode(slot int) *clusterNode {
+ nodes := c.slotNodes(slot)
+ n := rand.Intn(len(nodes))
+ return nodes[n]
+}
+
func (c *clusterState) slotNodes(slot int) []*clusterNode {
if slot >= 0 && slot < len(c.slots) {
return c.slots[slot]
@@ -432,25 +491,83 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
//------------------------------------------------------------------------------
+type clusterStateHolder struct {
+ load func() (*clusterState, error)
+
+ state atomic.Value
+
+ lastErrMu sync.RWMutex
+ lastErr error
+
+ reloading uint32 // atomic
+}
+
+func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder {
+ return &clusterStateHolder{
+ load: fn,
+ }
+}
+
+func (c *clusterStateHolder) Load() (*clusterState, error) {
+ state, err := c.load()
+ if err != nil {
+ c.lastErrMu.Lock()
+ c.lastErr = err
+ c.lastErrMu.Unlock()
+ return nil, err
+ }
+ c.state.Store(state)
+ return state, nil
+}
+
+func (c *clusterStateHolder) LazyReload() {
+ if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
+ return
+ }
+ go func() {
+ defer atomic.StoreUint32(&c.reloading, 0)
+
+ _, err := c.Load()
+ if err == nil {
+ time.Sleep(time.Second)
+ }
+ }()
+}
+
+func (c *clusterStateHolder) Get() (*clusterState, error) {
+ v := c.state.Load()
+ if v != nil {
+ return v.(*clusterState), nil
+ }
+
+ c.lastErrMu.RLock()
+ err := c.lastErr
+ c.lastErrMu.RUnlock()
+ if err != nil {
+ return nil, err
+ }
+
+ return nil, errors.New("redis: cluster has no state")
+}
+
+//------------------------------------------------------------------------------
+
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
cmdable
- opt *ClusterOptions
- nodes *clusterNodes
- _state atomic.Value
+ ctx context.Context
- cmdsInfoOnce internal.Once
- cmdsInfo map[string]*CommandInfo
+ opt *ClusterOptions
+ nodes *clusterNodes
+ state *clusterStateHolder
+ cmdsInfoCache *cmdsInfoCache
process func(Cmder) error
processPipeline func([]Cmder) error
processTxPipeline func([]Cmder) error
-
- // Reports whether slots reloading is in progress.
- reloading uint32
}
// NewClusterClient returns a Redis Cluster client as described in
@@ -459,9 +576,11 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init()
c := &ClusterClient{
- opt: opt,
- nodes: newClusterNodes(opt),
+ opt: opt,
+ nodes: newClusterNodes(opt),
+ cmdsInfoCache: newCmdsInfoCache(),
}
+ c.state = newClusterStateHolder(c.loadState)
c.process = c.defaultProcess
c.processPipeline = c.defaultProcessPipeline
@@ -469,25 +588,33 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.cmdable.setProcessor(c.Process)
- // Add initial nodes.
- for _, addr := range opt.Addrs {
- _, _ = c.nodes.GetOrCreate(addr)
+ _, _ = c.state.Load()
+ if opt.IdleCheckFrequency > 0 {
+ go c.reaper(opt.IdleCheckFrequency)
}
- // Preload cluster slots.
- for i := 0; i < 10; i++ {
- state, err := c.reloadState()
- if err == nil {
- c._state.Store(state)
- break
- }
+ return c
+}
+
+func (c *ClusterClient) Context() context.Context {
+ if c.ctx != nil {
+ return c.ctx
}
+ return context.Background()
+}
- if opt.IdleCheckFrequency > 0 {
- go c.reaper(opt.IdleCheckFrequency)
+func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
+ if ctx == nil {
+ panic("nil context")
}
+ c2 := c.copy()
+ c2.ctx = ctx
+ return c2
+}
- return c
+func (c *ClusterClient) copy() *ClusterClient {
+ cp := *c
+ return &cp
}
// Options returns read-only Options that were used to create the client.
@@ -499,40 +626,18 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
-func (c *ClusterClient) state() (*clusterState, error) {
- v := c._state.Load()
- if v != nil {
- return v.(*clusterState), nil
- }
-
- _, err := c.nodes.Addrs()
- if err != nil {
- return nil, err
- }
-
- c.lazyReloadState()
- return nil, errNilClusterState
-}
-
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
- err := c.cmdsInfoOnce.Do(func() error {
+ cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) {
node, err := c.nodes.Random()
if err != nil {
- return err
- }
-
- cmdsInfo, err := node.Client.Command().Result()
- if err != nil {
- return err
+ return nil, err
}
-
- c.cmdsInfo = cmdsInfo
- return nil
+ return node.Client.Command().Result()
})
if err != nil {
return nil
}
- info := c.cmdsInfo[name]
+ info := cmdsInfo[name]
if info == nil {
internal.Logf("info for cmd=%s not found", name)
}
@@ -552,7 +657,12 @@ func (c *ClusterClient) cmdSlot(cmd Cmder) int {
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
-func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
+func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
+ state, err := c.state.Get()
+ if err != nil {
+ return 0, nil, err
+ }
+
cmdInfo := c.cmdInfo(cmd.Name())
slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
@@ -562,6 +672,11 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
return slot, node, err
}
+ if c.opt.RouteRandomly {
+ node := state.slotRandomNode(slot)
+ return slot, node, nil
+ }
+
node, err := state.slotSlaveNode(slot)
return slot, node, err
}
@@ -570,16 +685,24 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
return slot, node, err
}
+func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
+ state, err := c.state.Get()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes := state.slotNodes(slot)
+ if len(nodes) > 0 {
+ return nodes[0], nil
+ }
+ return c.nodes.Random()
+}
+
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if len(keys) == 0 {
return fmt.Errorf("redis: keys don't hash to the same slot")
}
- state, err := c.state()
- if err != nil {
- return err
- }
-
slot := hashtag.Slot(keys[0])
for _, key := range keys[1:] {
if hashtag.Slot(key) != slot {
@@ -587,7 +710,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
}
}
- node, err := state.slotMasterNode(slot)
+ node, err := c.slotMasterNode(slot)
if err != nil {
return err
}
@@ -608,7 +731,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
- c.lazyReloadState()
+ c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
@@ -617,10 +740,11 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
}
if err == pool.ErrClosed {
- node, err = state.slotMasterNode(slot)
+ node, err = c.slotMasterNode(slot)
if err != nil {
return err
}
+ continue
}
return err
@@ -644,31 +768,27 @@ func (c *ClusterClient) WrapProcess(
}
func (c *ClusterClient) Process(cmd Cmder) error {
- if c.process != nil {
- return c.process(cmd)
- }
- return c.defaultProcess(cmd)
+ return c.process(cmd)
}
func (c *ClusterClient) defaultProcess(cmd Cmder) error {
- state, err := c.state()
- if err != nil {
- cmd.setErr(err)
- return err
- }
-
- _, node, err := c.cmdSlotAndNode(state, cmd)
- if err != nil {
- cmd.setErr(err)
- return err
- }
-
+ var node *clusterNode
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
+ if node == nil {
+ var err error
+ _, node, err = c.cmdSlotAndNode(cmd)
+ if err != nil {
+ cmd.setErr(err)
+ break
+ }
+ }
+
+ var err error
if ask {
pipe := node.Client.Pipeline()
_ = pipe.Process(NewCmd("ASKING"))
@@ -687,15 +807,13 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// If slave is loading - read from master.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
- // TODO: race
- node.loading = time.Now()
+ node.MarkAsLoading()
continue
}
if internal.IsRetryableError(err, true) {
- var nodeErr error
- node, nodeErr = c.nodes.Random()
- if nodeErr != nil {
+ node, err = c.nodes.Random()
+ if err != nil {
break
}
continue
@@ -705,22 +823,18 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
- c.lazyReloadState()
+ c.state.LazyReload()
- var nodeErr error
- node, nodeErr = c.nodes.GetOrCreate(addr)
- if nodeErr != nil {
+ node, err = c.nodes.GetOrCreate(addr)
+ if err != nil {
break
}
continue
}
if err == pool.ErrClosed {
- _, node, err = c.cmdSlotAndNode(state, cmd)
- if err != nil {
- cmd.setErr(err)
- return err
- }
+ node = nil
+ continue
}
break
@@ -732,7 +846,7 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
- state, err := c.state()
+ state, err := c.state.Get()
if err != nil {
return err
}
@@ -765,7 +879,7 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
- state, err := c.state()
+ state, err := c.state.Get()
if err != nil {
return err
}
@@ -798,7 +912,7 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
- state, err := c.state()
+ state, err := c.state.Get()
if err != nil {
return err
}
@@ -838,7 +952,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
- state, _ := c.state()
+ state, _ := c.state.Get()
if state == nil {
return &acc
}
@@ -868,46 +982,34 @@ func (c *ClusterClient) PoolStats() *PoolStats {
return &acc
}
-func (c *ClusterClient) lazyReloadState() {
- if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
- return
+func (c *ClusterClient) loadState() (*clusterState, error) {
+ addrs, err := c.nodes.Addrs()
+ if err != nil {
+ return nil, err
}
- go func() {
- defer atomic.StoreUint32(&c.reloading, 0)
-
- for {
- state, err := c.reloadState()
- if err == pool.ErrClosed {
- return
+ var firstErr error
+ for _, addr := range addrs {
+ node, err := c.nodes.GetOrCreate(addr)
+ if err != nil {
+ if firstErr == nil {
+ firstErr = err
}
+ continue
+ }
- if err != nil {
- time.Sleep(time.Millisecond)
- continue
+ slots, err := node.Client.ClusterSlots().Result()
+ if err != nil {
+ if firstErr == nil {
+ firstErr = err
}
-
- c._state.Store(state)
- time.Sleep(5 * time.Second)
- c.nodes.GC(state.generation)
- break
+ continue
}
- }()
-}
-// Not thread-safe.
-func (c *ClusterClient) reloadState() (*clusterState, error) {
- node, err := c.nodes.Random()
- if err != nil {
- return nil, err
- }
-
- slots, err := node.Client.ClusterSlots().Result()
- if err != nil {
- return nil, err
+ return newClusterState(c.nodes, slots, node.Client.opt.Addr)
}
- return newClusterState(c.nodes, slots, node.Client.opt.Addr)
+ return nil, firstErr
}
// reaper closes idle connections to the cluster.
@@ -991,7 +1093,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
- state, err := c.state()
+ state, err := c.state.Get()
if err != nil {
setCmdsErr(cmds, err)
return nil, err
@@ -1024,15 +1126,17 @@ func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cm
func (c *ClusterClient) pipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := writeCmd(cn, cmds...); err != nil {
+ _ = cn.SetWriteTimeout(c.opt.WriteTimeout)
+
+ err := writeCmd(cn, cmds...)
+ if err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
}
// Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
+ _ = cn.SetReadTimeout(c.opt.ReadTimeout)
return c.pipelineReadCmds(cn, cmds, failedCmds)
}
@@ -1065,7 +1169,7 @@ func (c *ClusterClient) checkMovedErr(
moved, ask, addr := internal.IsMovedError(err)
if moved {
- c.lazyReloadState()
+ c.state.LazyReload()
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
@@ -1103,7 +1207,7 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
}
func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
- state, err := c.state()
+ state, err := c.state.Get()
if err != nil {
return err
}
@@ -1249,12 +1353,7 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub {
slot = -1
}
- state, err := c.state()
- if err != nil {
- return nil, err
- }
-
- masterNode, err := state.slotMasterNode(slot)
+ masterNode, err := c.slotMasterNode(slot)
if err != nil {
return nil, err
}
@@ -1310,3 +1409,31 @@ func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
}
return append(nodes, node)
}
+
+func appendIfNotExists(ss []string, es ...string) []string {
+loop:
+ for _, e := range es {
+ for _, s := range ss {
+ if s == e {
+ continue loop
+ }
+ }
+ ss = append(ss, e)
+ }
+ return ss
+}
+
+func remove(ss []string, es ...string) []string {
+ if len(es) == 0 {
+ return ss[:0]
+ }
+ for _, e := range es {
+ for i, s := range ss {
+ if s == e {
+ ss = append(ss[:i], ss[i+1:]...)
+ break
+ }
+ }
+ }
+ return ss
+}
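
For completeness, a small hypothetical sketch of the Context/WithContext methods added to ClusterClient in this diff (not part of the commit; the address is a placeholder). WithContext returns a shallow copy of the client carrying the given context, and Context falls back to context.Background when none was set.

package main

import (
	"context"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:7000"}, // placeholder address
	})
	defer client.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// c2 shares the underlying connection pool with client but carries ctx.
	c2 := client.WithContext(ctx)
	_ = c2.Context() // returns ctx; client.Context() still returns context.Background()
}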