summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/cluster.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--vendor/github.com/go-redis/redis/cluster.go76
1 files changed, 59 insertions, 17 deletions
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index c81fc1d57..accdb3d27 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -226,7 +226,7 @@ func (c *clusterNodes) NextGeneration() uint32 {
}
// GC removes unused nodes.
-func (c *clusterNodes) GC(generation uint32) error {
+func (c *clusterNodes) GC(generation uint32) {
var collected []*clusterNode
c.mu.Lock()
for i := 0; i < len(c.addrs); {
@@ -243,14 +243,11 @@ func (c *clusterNodes) GC(generation uint32) error {
}
c.mu.Unlock()
- var firstErr error
- for _, node := range collected {
- if err := node.Client.Close(); err != nil && firstErr == nil {
- firstErr = err
+ time.AfterFunc(time.Minute, func() {
+ for _, node := range collected {
+ _ = node.Client.Close()
}
- }
-
- return firstErr
+ })
}
func (c *clusterNodes) All() ([]*clusterNode, error) {
@@ -533,16 +530,22 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
return info
}
+func cmdSlot(cmd Cmder, pos int) int {
+ if pos == 0 {
+ return hashtag.RandomSlot()
+ }
+ firstKey := cmd.stringArg(pos)
+ return hashtag.Slot(firstKey)
+}
+
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
cmdInfo := c.cmdInfo(cmd.Name())
- firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
- return hashtag.Slot(firstKey)
+ return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
cmdInfo := c.cmdInfo(cmd.Name())
- firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
- slot := hashtag.Slot(firstKey)
+ slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
if c.opt.RouteByLatency {
@@ -590,6 +593,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
break
}
+ if internal.IsRetryableError(err, true) {
+ continue
+ }
+
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
c.lazyReloadState()
@@ -600,6 +607,13 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
+ if err == pool.ErrClosed {
+ node, err = state.slotMasterNode(slot)
+ if err != nil {
+ return err
+ }
+ }
+
return err
}
@@ -635,10 +649,10 @@ func (c *ClusterClient) Process(cmd Cmder) error {
if ask {
pipe := node.Client.Pipeline()
- pipe.Process(NewCmd("ASKING"))
- pipe.Process(cmd)
+ _ = pipe.Process(NewCmd("ASKING"))
+ _ = pipe.Process(cmd)
_, err = pipe.Exec()
- pipe.Close()
+ _ = pipe.Close()
ask = false
} else {
err = node.Client.Process(cmd)
@@ -679,6 +693,14 @@ func (c *ClusterClient) Process(cmd Cmder) error {
continue
}
+ if err == pool.ErrClosed {
+ _, node, err = c.cmdSlotAndNode(state, cmd)
+ if err != nil {
+ cmd.setErr(err)
+ return err
+ }
+ }
+
break
}
@@ -915,7 +937,11 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
for node, cmds := range cmdsMap {
cn, _, err := node.Client.getConn()
if err != nil {
- setCmdsErr(cmds, err)
+ if err == pool.ErrClosed {
+ c.remapCmds(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
continue
}
@@ -955,6 +981,18 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
return cmdsMap, nil
}
+func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
+ remappedCmds, err := c.mapCmdsByNode(cmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return
+ }
+
+ for node, cmds := range remappedCmds {
+ failedCmds[node] = cmds
+ }
+}
+
func (c *ClusterClient) pipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
@@ -1061,7 +1099,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
for node, cmds := range cmdsMap {
cn, _, err := node.Client.getConn()
if err != nil {
- setCmdsErr(cmds, err)
+ if err == pool.ErrClosed {
+ c.remapCmds(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
continue
}