Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--  vendor/github.com/go-redis/redis/cluster.go  366
1 file changed, 219 insertions(+), 147 deletions(-)
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index 647a25be3..c81fc1d57 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -14,8 +14,8 @@ import (
"github.com/go-redis/redis/internal/proto"
)
-var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
-var errNilClusterState = internal.RedisError("redis: cannot load cluster slots")
+var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
+var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots")
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
@@ -64,6 +64,19 @@ func (opt *ClusterOptions) init() {
opt.ReadOnly = true
}
+ switch opt.ReadTimeout {
+ case -1:
+ opt.ReadTimeout = 0
+ case 0:
+ opt.ReadTimeout = 3 * time.Second
+ }
+ switch opt.WriteTimeout {
+ case -1:
+ opt.WriteTimeout = 0
+ case 0:
+ opt.WriteTimeout = opt.ReadTimeout
+ }
+
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
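The timeout handling added above follows the sentinel convention already used by the backoff fields: -1 means "explicitly disabled" and is normalized to 0, while 0 means "unset" and picks a default (3 seconds for reads; writes default to the read timeout). A minimal sketch of that convention, using a hypothetical normalizeTimeout helper that is not part of go-redis:

package main

import "time"

// normalizeTimeout is a hypothetical helper illustrating the convention used
// by ClusterOptions.init(): -1 disables the timeout (normalized to 0), 0 falls
// back to the supplied default, and any other value is kept as-is.
func normalizeTimeout(v, def time.Duration) time.Duration {
	switch v {
	case -1:
		return 0
	case 0:
		return def
	default:
		return v
	}
}

With def = 3*time.Second this reproduces the ReadTimeout branch above.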
@@ -192,6 +205,21 @@ func (c *clusterNodes) Close() error {
return firstErr
}
+func (c *clusterNodes) Addrs() ([]string, error) {
+ c.mu.RLock()
+ closed := c.closed
+ addrs := c.addrs
+ c.mu.RUnlock()
+
+ if closed {
+ return nil, pool.ErrClosed
+ }
+ if len(addrs) == 0 {
+ return nil, errClusterNoNodes
+ }
+ return addrs, nil
+}
+
func (c *clusterNodes) NextGeneration() uint32 {
c.generation++
return c.generation
@@ -272,16 +300,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
}
func (c *clusterNodes) Random() (*clusterNode, error) {
- c.mu.RLock()
- closed := c.closed
- addrs := c.addrs
- c.mu.RUnlock()
-
- if closed {
- return nil, pool.ErrClosed
- }
- if len(addrs) == 0 {
- return nil, errClusterNoNodes
+ addrs, err := c.Addrs()
+ if err != nil {
+ return nil, err
}
var nodeErr error
@@ -468,13 +489,23 @@ func (c *ClusterClient) Options() *ClusterOptions {
return c.opt
}
-func (c *ClusterClient) state() *clusterState {
+func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
+ return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
+}
+
+func (c *ClusterClient) state() (*clusterState, error) {
v := c._state.Load()
if v != nil {
- return v.(*clusterState)
+ return v.(*clusterState), nil
}
+
+ _, err := c.nodes.Addrs()
+ if err != nil {
+ return nil, err
+ }
+
c.lazyReloadState()
- return nil
+ return nil, errNilClusterState
}
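retryBackoff now derives the retry delay from the cluster options (MinRetryBackoff/MaxRetryBackoff) via internal.RetryBackoff instead of reusing the per-node client's backoff. The diff does not show that function; the following is a plausible sketch of such a backoff, exponential growth capped at the maximum with full jitter, labeled as an assumption:

package main

import (
	"math/rand"
	"time"
)

// retryBackoff is an assumed implementation of exponential backoff with full
// jitter; internal.RetryBackoff may differ in detail.
func retryBackoff(attempt int, min, max time.Duration) time.Duration {
	if attempt < 0 {
		attempt = 0
	}
	d := min << uint(attempt) // double the window on each attempt
	if d < min || d > max {   // overflow or cap reached
		d = max
	}
	if d <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(d))) // pick a random delay in [0, d)
}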
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
@@ -495,17 +526,22 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
if err != nil {
return nil
}
- return c.cmdsInfo[name]
+ info := c.cmdsInfo[name]
+ if info == nil {
+ internal.Logf("info for cmd=%s not found", name)
+ }
+ return info
}
-func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
- if state == nil {
- node, err := c.nodes.Random()
- return 0, node, err
- }
+func (c *ClusterClient) cmdSlot(cmd Cmder) int {
+ cmdInfo := c.cmdInfo(cmd.Name())
+ firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
+ return hashtag.Slot(firstKey)
+}
+func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
cmdInfo := c.cmdInfo(cmd.Name())
- firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
+ firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
slot := hashtag.Slot(firstKey)
if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
@@ -523,19 +559,51 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
}
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
- state := c.state()
+ if len(keys) == 0 {
+ return fmt.Errorf("redis: keys don't hash to the same slot")
+ }
- var node *clusterNode
- var err error
- if state != nil && len(keys) > 0 {
- node, err = state.slotMasterNode(hashtag.Slot(keys[0]))
- } else {
- node, err = c.nodes.Random()
+ state, err := c.state()
+ if err != nil {
+ return err
+ }
+
+ slot := hashtag.Slot(keys[0])
+ for _, key := range keys[1:] {
+ if hashtag.Slot(key) != slot {
+ return fmt.Errorf("redis: Watch requires all keys to be in the same slot")
+ }
}
+
+ node, err := state.slotMasterNode(slot)
if err != nil {
return err
}
- return node.Client.Watch(fn, keys...)
+
+ for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
+ err = node.Client.Watch(fn, keys...)
+ if err == nil {
+ break
+ }
+
+ moved, ask, addr := internal.IsMovedError(err)
+ if moved || ask {
+ c.lazyReloadState()
+ node, err = c.nodes.GetOrCreate(addr)
+ if err != nil {
+ return err
+ }
+ continue
+ }
+
+ return err
+ }
+
+ return err
}
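Watch now refuses key sets that span slots, so callers have to rely on Redis Cluster hash tags: only the substring between { and } is hashed, so keys that share a tag always map to the same slot. A hedged usage sketch against the public API (addresses and key names are illustrative):

package main

import "github.com/go-redis/redis"

func watchExample() error {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:7000"}, // placeholder seed address
	})

	// Both keys carry the {user:1} hash tag, so hashtag.Slot maps them to the
	// same slot and the new slot check in Watch accepts them together.
	return client.Watch(func(tx *redis.Tx) error {
		balance, err := tx.Get("{user:1}:balance").Int64()
		if err != nil && err != redis.Nil {
			return err
		}
		_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
			pipe.Set("{user:1}:balance", balance+1, 0)
			pipe.LPush("{user:1}:history", "increment")
			return nil
		})
		return err
	}, "{user:1}:balance", "{user:1}:history")
}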
// Close closes the cluster client, releasing any open resources.
@@ -547,7 +615,13 @@ func (c *ClusterClient) Close() error {
}
func (c *ClusterClient) Process(cmd Cmder) error {
- slot, node, err := c.cmdSlotAndNode(c.state(), cmd)
+ state, err := c.state()
+ if err != nil {
+ cmd.setErr(err)
+ return err
+ }
+
+ _, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
cmd.setErr(err)
return err
@@ -556,7 +630,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
- time.Sleep(node.Client.retryBackoff(attempt))
+ time.Sleep(c.retryBackoff(attempt))
}
if ask {
@@ -572,7 +646,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// If there is no error - we are done.
if err == nil {
- return nil
+ break
}
// If slave is loading - read from master.
@@ -582,12 +656,11 @@ func (c *ClusterClient) Process(cmd Cmder) error {
continue
}
- // On network errors try random node.
- if internal.IsRetryableError(err) || internal.IsClusterDownError(err) {
- node, err = c.nodes.Random()
- if err != nil {
- cmd.setErr(err)
- return err
+ if internal.IsRetryableError(err, true) {
+ var nodeErr error
+ node, nodeErr = c.nodes.Random()
+ if nodeErr != nil {
+ break
}
continue
}
@@ -596,20 +669,13 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
- state := c.state()
- if state != nil && slot >= 0 {
- master, _ := state.slotMasterNode(slot)
- if moved && (master == nil || master.Client.getAddr() != addr) {
- c.lazyReloadState()
- }
- }
+ c.lazyReloadState()
- node, err = c.nodes.GetOrCreate(addr)
- if err != nil {
- cmd.setErr(err)
- return err
+ var nodeErr error
+ node, nodeErr = c.nodes.GetOrCreate(addr)
+ if nodeErr != nil {
+ break
}
-
continue
}
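The redirect path in Process is now unconditional: any MOVED or ASK reply schedules a state reload and retries against the node named in the error. Redis encodes that target in the error text, e.g. "MOVED 3999 127.0.0.1:6381". A sketch of how such a reply can be parsed; this is a stand-in for internal.IsMovedError, whose implementation is not part of this diff:

package main

import "strings"

// parseMovedError is a hypothetical equivalent of internal.IsMovedError.
// Cluster redirections look like "MOVED 3999 127.0.0.1:6381" (slot moved
// permanently) or "ASK 3999 127.0.0.1:6381" (slot is migrating); the last
// field is the address the command should be retried against.
func parseMovedError(err error) (moved, ask bool, addr string) {
	if err == nil {
		return false, false, ""
	}
	msg := err.Error()
	switch {
	case strings.HasPrefix(msg, "MOVED "):
		moved = true
	case strings.HasPrefix(msg, "ASK "):
		ask = true
	default:
		return false, false, ""
	}
	if i := strings.LastIndex(msg, " "); i != -1 {
		addr = msg[i+1:]
	}
	return moved, ask, addr
}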
@@ -622,9 +688,9 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
- state := c.state()
- if state == nil {
- return errNilClusterState
+ state, err := c.state()
+ if err != nil {
+ return err
}
var wg sync.WaitGroup
@@ -655,9 +721,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
- state := c.state()
- if state == nil {
- return errNilClusterState
+ state, err := c.state()
+ if err != nil {
+ return err
}
var wg sync.WaitGroup
@@ -688,9 +754,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
- state := c.state()
- if state == nil {
- return errNilClusterState
+ state, err := c.state()
+ if err != nil {
+ return err
}
var wg sync.WaitGroup
@@ -728,27 +794,31 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
- state := c.state()
+ state, _ := c.state()
if state == nil {
return &acc
}
for _, node := range state.masters {
s := node.Client.connPool.Stats()
- acc.Requests += s.Requests
acc.Hits += s.Hits
+ acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
+
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
+ acc.StaleConns += s.StaleConns
}
for _, node := range state.slaves {
s := node.Client.connPool.Stats()
- acc.Requests += s.Requests
acc.Hits += s.Hits
+ acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
+
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
+ acc.StaleConns += s.StaleConns
}
return &acc
@@ -762,10 +832,8 @@ func (c *ClusterClient) lazyReloadState() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
- var state *clusterState
for {
- var err error
- state, err = c.reloadState()
+ state, err := c.reloadState()
if err == pool.ErrClosed {
return
}
@@ -776,11 +844,10 @@ func (c *ClusterClient) lazyReloadState() {
}
c._state.Store(state)
+ time.Sleep(5 * time.Second)
+ c.nodes.GC(state.generation)
break
}
-
- time.Sleep(3 * time.Second)
- c.nodes.GC(state.generation)
}()
}
@@ -810,21 +877,12 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
break
}
- var n int
for _, node := range nodes {
- nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
+ _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil {
internal.Logf("ReapStaleConns failed: %s", err)
- } else {
- n += nn
}
}
-
- s := c.PoolStats()
- internal.Logf(
- "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
- n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
- )
}
}
@@ -837,16 +895,21 @@ func (c *ClusterClient) Pipeline() Pipeliner {
}
func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
+ setCmdsErr(cmds, err)
return err
}
- for i := 0; i <= c.opt.MaxRedirects; i++ {
+ for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
@@ -856,8 +919,12 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
continue
}
- err = c.pipelineProcessCmds(cn, cmds, failedCmds)
- node.Client.releaseConn(cn, err)
+ err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
+ if err == nil || internal.IsRedisError(err) {
+ _ = node.Client.connPool.Put(cn)
+ } else {
+ _ = node.Client.connPool.Remove(cn)
+ }
}
if len(failedCmds) == 0 {
@@ -866,21 +933,20 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cmdsMap = failedCmds
}
- var firstErr error
- for _, cmd := range cmds {
- if err := cmd.Err(); err != nil {
- firstErr = err
- break
- }
- }
- return firstErr
+ return firstCmdsErr(cmds)
}
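The connection handling above replaces releaseConn with an explicit policy: a clean run or a Redis-level error (MOVED, WRONGTYPE, ...) leaves the connection in a known state, so it goes back to the pool with Put; a network or protocol error discards it with Remove, because unread replies may still sit on the socket. Restated as a standalone sketch; releaseClusterConn and its parameter types are hypothetical, modeled on the calls in the diff:

package main

// conn and connPool model just enough of the internal pool API for this
// sketch; in go-redis, cn is a *pool.Conn and p is node.Client.connPool.
type conn struct{}

type connPool interface {
	Put(*conn) error
	Remove(*conn) error
}

// releaseClusterConn restates the Put/Remove decision used by pipelineExec and
// txPipelineExec. isRedisError stands in for internal.IsRedisError: true when
// the error came back from the Redis server rather than the transport.
func releaseClusterConn(p connPool, cn *conn, err error, isRedisError func(error) bool) {
	if err == nil || isRedisError(err) {
		_ = p.Put(cn) // connection is still in sync; reuse it
		return
	}
	_ = p.Remove(cn) // transport-level failure; drop the connection
}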
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
- state := c.state()
+ state, err := c.state()
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return nil, err
+ }
+
cmdsMap := make(map[*clusterNode][]Cmder)
for _, cmd := range cmds {
- _, node, err := c.cmdSlotAndNode(state, cmd)
+ slot := c.cmdSlot(cmd)
+ node, err := state.slotMasterNode(slot)
if err != nil {
return nil, err
}
@@ -890,11 +956,12 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
}
func (c *ClusterClient) pipelineProcessCmds(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err)
+ failedCmds[node] = cmds
return err
}
@@ -907,46 +974,53 @@ func (c *ClusterClient) pipelineProcessCmds(
func (c *ClusterClient) pipelineReadCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
- var firstErr error
for _, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
continue
}
- if firstErr == nil {
- firstErr = err
+ if c.checkMovedErr(cmd, err, failedCmds) {
+ continue
}
- err = c.checkMovedErr(cmd, failedCmds)
- if err != nil && firstErr == nil {
- firstErr = err
+ if internal.IsRedisError(err) {
+ continue
}
+
+ return err
}
- return firstErr
+ return nil
}
-func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
- moved, ask, addr := internal.IsMovedError(cmd.Err())
+func (c *ClusterClient) checkMovedErr(
+ cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
+) bool {
+ moved, ask, addr := internal.IsMovedError(err)
+
if moved {
c.lazyReloadState()
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
- return err
+ return false
}
failedCmds[node] = append(failedCmds[node], cmd)
+ return true
}
+
if ask {
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
- return err
+ return false
}
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
+ return true
}
- return nil
+
+ return false
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
@@ -959,29 +1033,29 @@ func (c *ClusterClient) TxPipeline() Pipeliner {
}
func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.TxPipeline().pipelined(fn)
+ return c.TxPipeline().Pipelined(fn)
}
func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
- cmdsMap, err := c.mapCmdsBySlot(cmds)
+ state, err := c.state()
if err != nil {
return err
}
- state := c.state()
- if state == nil {
- return errNilClusterState
- }
-
+ cmdsMap := c.mapCmdsBySlot(cmds)
for slot, cmds := range cmdsMap {
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
}
-
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
- for i := 0; i <= c.opt.MaxRedirects; i++ {
+
+ for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
@@ -992,7 +1066,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
- node.Client.releaseConn(cn, err)
+ if err == nil || internal.IsRedisError(err) {
+ _ = node.Client.connPool.Put(cn)
+ } else {
+ _ = node.Client.connPool.Remove(cn)
+ }
}
if len(failedCmds) == 0 {
@@ -1002,27 +1080,16 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
}
}
- var firstErr error
- for _, cmd := range cmds {
- if err := cmd.Err(); err != nil {
- firstErr = err
- break
- }
- }
- return firstErr
+ return firstCmdsErr(cmds)
}
-func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) {
- state := c.state()
+func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
cmdsMap := make(map[int][]Cmder)
for _, cmd := range cmds {
- slot, _, err := c.cmdSlotAndNode(state, cmd)
- if err != nil {
- return nil, err
- }
+ slot := c.cmdSlot(cmd)
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
- return cmdsMap, nil
+ return cmdsMap
}
func (c *ClusterClient) txPipelineProcessCmds(
@@ -1039,22 +1106,20 @@ func (c *ClusterClient) txPipelineProcessCmds(
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
+ setCmdsErr(cmds, err)
return err
}
- _, err := pipelineReadCmds(cn, cmds)
- return err
+ return pipelineReadCmds(cn, cmds)
}
func (c *ClusterClient) txPipelineReadQueued(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
- var firstErr error
-
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
- firstErr = err
+ if err := statusCmd.readReply(cn); err != nil {
+ return err
}
for _, cmd := range cmds {
@@ -1063,15 +1128,11 @@ func (c *ClusterClient) txPipelineReadQueued(
continue
}
- cmd.setErr(err)
- if firstErr == nil {
- firstErr = err
+ if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {
+ continue
}
- err = c.checkMovedErr(cmd, failedCmds)
- if err != nil && firstErr == nil {
- firstErr = err
- }
+ return err
}
// Parse number of replies.
@@ -1085,7 +1146,13 @@ func (c *ClusterClient) txPipelineReadQueued(
switch line[0] {
case proto.ErrorReply:
- return proto.ParseErrorReply(line)
+ err := proto.ParseErrorReply(line)
+ for _, cmd := range cmds {
+ if !c.checkMovedErr(cmd, err, failedCmds) {
+ break
+ }
+ }
+ return err
case proto.ArrayReply:
// ok
default:
@@ -1093,7 +1160,7 @@ func (c *ClusterClient) txPipelineReadQueued(
return err
}
- return firstErr
+ return nil
}
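txPipelineReadQueued expects the standard MULTI/EXEC shape on the wire: one +OK for MULTI, one +QUEUED status per queued command, then the EXEC reply, which is either an array holding one reply per command or an error (a MOVED/ASK error re-queues the whole batch, which is why every command is passed through checkMovedErr above). At the public API level this machinery sits behind TxPipelined; a hedged usage sketch with made-up key names that share one hash tag:

package main

import "github.com/go-redis/redis"

func txPipelineExample(client *redis.ClusterClient) error {
	// Both keys share the {order:42} hash tag and therefore one slot, so this
	// batch is wrapped in a single MULTI/EXEC against one master. Keys from
	// other slots would be grouped into separate MULTI/EXEC blocks, one per
	// slot, as mapCmdsBySlot above suggests.
	_, err := client.TxPipelined(func(pipe redis.Pipeliner) error {
		pipe.Incr("{order:42}:version")
		pipe.Set("{order:42}:status", "paid", 0)
		return nil
	})
	return err
}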
func (c *ClusterClient) pubSub(channels []string) *PubSub {
@@ -1112,7 +1179,12 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub {
slot = -1
}
- masterNode, err := c.state().slotMasterNode(slot)
+ state, err := c.state()
+ if err != nil {
+ return nil, err
+ }
+
+ masterNode, err := state.slotMasterNode(slot)
if err != nil {
return nil, err
}