Diffstat (limited to 'vendor/github.com/go-redis/redis')
-rw-r--r--  vendor/github.com/go-redis/redis/.travis.yml              |   1
-rw-r--r--  vendor/github.com/go-redis/redis/README.md                |   3
-rw-r--r--  vendor/github.com/go-redis/redis/cluster.go               | 366
-rw-r--r--  vendor/github.com/go-redis/redis/cluster_commands.go      |  22
-rw-r--r--  vendor/github.com/go-redis/redis/cluster_test.go          |  79
-rw-r--r--  vendor/github.com/go-redis/redis/command.go               |  86
-rw-r--r--  vendor/github.com/go-redis/redis/commands.go              |   5
-rw-r--r--  vendor/github.com/go-redis/redis/commands_test.go         |  55
-rw-r--r--  vendor/github.com/go-redis/redis/export_test.go           |  14
-rw-r--r--  vendor/github.com/go-redis/redis/internal/error.go        |  27
-rw-r--r--  vendor/github.com/go-redis/redis/internal/pool/pool.go    |  25
-rw-r--r--  vendor/github.com/go-redis/redis/internal/proto/reader.go |   2
-rw-r--r--  vendor/github.com/go-redis/redis/internal/proto/scan.go   |   2
-rw-r--r--  vendor/github.com/go-redis/redis/main_test.go             |   4
-rw-r--r--  vendor/github.com/go-redis/redis/options.go               |  10
-rw-r--r--  vendor/github.com/go-redis/redis/pipeline.go              |  10
-rw-r--r--  vendor/github.com/go-redis/redis/pool_test.go             |  10
-rw-r--r--  vendor/github.com/go-redis/redis/pubsub.go                |   5
-rw-r--r--  vendor/github.com/go-redis/redis/pubsub_test.go           |   6
-rw-r--r--  vendor/github.com/go-redis/redis/redis.go                 |  92
-rw-r--r--  vendor/github.com/go-redis/redis/ring.go                  | 121
-rw-r--r--  vendor/github.com/go-redis/redis/sentinel.go              |   6
-rw-r--r--  vendor/github.com/go-redis/redis/tx.go                    |  23
-rw-r--r--  vendor/github.com/go-redis/redis/universal.go             |   7
24 files changed, 588 insertions, 393 deletions
diff --git a/vendor/github.com/go-redis/redis/.travis.yml b/vendor/github.com/go-redis/redis/.travis.yml
index f4666c593..f49927ee8 100644
--- a/vendor/github.com/go-redis/redis/.travis.yml
+++ b/vendor/github.com/go-redis/redis/.travis.yml
@@ -8,6 +8,7 @@ go:
- 1.4.x
- 1.7.x
- 1.8.x
+ - 1.9.x
- tip
matrix:
diff --git a/vendor/github.com/go-redis/redis/README.md b/vendor/github.com/go-redis/redis/README.md
index fd036496d..0a2a67124 100644
--- a/vendor/github.com/go-redis/redis/README.md
+++ b/vendor/github.com/go-redis/redis/README.md
@@ -6,6 +6,7 @@
Supports:
- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC.
+- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support.
- [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub).
- [Transactions](https://godoc.org/github.com/go-redis/redis#Multi).
- [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline).
@@ -16,7 +17,7 @@ Supports:
- [Ring](https://godoc.org/github.com/go-redis/redis#NewRing).
- [Instrumentation](https://godoc.org/github.com/go-redis/redis#ex-package--Instrumentation).
- [Cache friendly](https://github.com/go-redis/cache).
-- [Rate limiting](https://github.com/go-redis/rate).
+- [Rate limiting](https://github.com/go-redis/redis_rate).
- [Distributed Locks](https://github.com/bsm/redis-lock).
API docs: https://godoc.org/github.com/go-redis/redis.
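The README hunk above links to the Pipeline/TxPipeline helpers touched later in this diff. A minimal usage sketch, not part of the diff itself (the address and key names are placeholders), looks like this in the v6 API:

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	// Placeholder address; the client handles connection pooling internally.
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	// Pipelined queues the commands and sends them in a single round trip.
	cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error {
		pipe.Set("counter", 0, 0)
		pipe.Incr("counter")
		return nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(cmds), "commands executed")
}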
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index 647a25be3..c81fc1d57 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -14,8 +14,8 @@ import (
"github.com/go-redis/redis/internal/proto"
)
-var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
-var errNilClusterState = internal.RedisError("redis: cannot load cluster slots")
+var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
+var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots")
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
@@ -64,6 +64,19 @@ func (opt *ClusterOptions) init() {
opt.ReadOnly = true
}
+ switch opt.ReadTimeout {
+ case -1:
+ opt.ReadTimeout = 0
+ case 0:
+ opt.ReadTimeout = 3 * time.Second
+ }
+ switch opt.WriteTimeout {
+ case -1:
+ opt.WriteTimeout = 0
+ case 0:
+ opt.WriteTimeout = opt.ReadTimeout
+ }
+
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
@@ -192,6 +205,21 @@ func (c *clusterNodes) Close() error {
return firstErr
}
+func (c *clusterNodes) Addrs() ([]string, error) {
+ c.mu.RLock()
+ closed := c.closed
+ addrs := c.addrs
+ c.mu.RUnlock()
+
+ if closed {
+ return nil, pool.ErrClosed
+ }
+ if len(addrs) == 0 {
+ return nil, errClusterNoNodes
+ }
+ return addrs, nil
+}
+
func (c *clusterNodes) NextGeneration() uint32 {
c.generation++
return c.generation
@@ -272,16 +300,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
}
func (c *clusterNodes) Random() (*clusterNode, error) {
- c.mu.RLock()
- closed := c.closed
- addrs := c.addrs
- c.mu.RUnlock()
-
- if closed {
- return nil, pool.ErrClosed
- }
- if len(addrs) == 0 {
- return nil, errClusterNoNodes
+ addrs, err := c.Addrs()
+ if err != nil {
+ return nil, err
}
var nodeErr error
@@ -468,13 +489,23 @@ func (c *ClusterClient) Options() *ClusterOptions {
return c.opt
}
-func (c *ClusterClient) state() *clusterState {
+func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
+ return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
+}
+
+func (c *ClusterClient) state() (*clusterState, error) {
v := c._state.Load()
if v != nil {
- return v.(*clusterState)
+ return v.(*clusterState), nil
}
+
+ _, err := c.nodes.Addrs()
+ if err != nil {
+ return nil, err
+ }
+
c.lazyReloadState()
- return nil
+ return nil, errNilClusterState
}
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
@@ -495,17 +526,22 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
if err != nil {
return nil
}
- return c.cmdsInfo[name]
+ info := c.cmdsInfo[name]
+ if info == nil {
+ internal.Logf("info for cmd=%s not found", name)
+ }
+ return info
}
-func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
- if state == nil {
- node, err := c.nodes.Random()
- return 0, node, err
- }
+func (c *ClusterClient) cmdSlot(cmd Cmder) int {
+ cmdInfo := c.cmdInfo(cmd.Name())
+ firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
+ return hashtag.Slot(firstKey)
+}
+func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
cmdInfo := c.cmdInfo(cmd.Name())
- firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
+ firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
slot := hashtag.Slot(firstKey)
if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
@@ -523,19 +559,51 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
}
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
- state := c.state()
+ if len(keys) == 0 {
+ return fmt.Errorf("redis: keys don't hash to the same slot")
+ }
- var node *clusterNode
- var err error
- if state != nil && len(keys) > 0 {
- node, err = state.slotMasterNode(hashtag.Slot(keys[0]))
- } else {
- node, err = c.nodes.Random()
+ state, err := c.state()
+ if err != nil {
+ return err
+ }
+
+ slot := hashtag.Slot(keys[0])
+ for _, key := range keys[1:] {
+ if hashtag.Slot(key) != slot {
+ return fmt.Errorf("redis: Watch requires all keys to be in the same slot")
+ }
}
+
+ node, err := state.slotMasterNode(slot)
if err != nil {
return err
}
- return node.Client.Watch(fn, keys...)
+
+ for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
+ err = node.Client.Watch(fn, keys...)
+ if err == nil {
+ break
+ }
+
+ moved, ask, addr := internal.IsMovedError(err)
+ if moved || ask {
+ c.lazyReloadState()
+ node, err = c.nodes.GetOrCreate(addr)
+ if err != nil {
+ return err
+ }
+ continue
+ }
+
+ return err
+ }
+
+ return err
}
// Close closes the cluster client, releasing any open resources.
@@ -547,7 +615,13 @@ func (c *ClusterClient) Close() error {
}
func (c *ClusterClient) Process(cmd Cmder) error {
- slot, node, err := c.cmdSlotAndNode(c.state(), cmd)
+ state, err := c.state()
+ if err != nil {
+ cmd.setErr(err)
+ return err
+ }
+
+ _, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
cmd.setErr(err)
return err
@@ -556,7 +630,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
- time.Sleep(node.Client.retryBackoff(attempt))
+ time.Sleep(c.retryBackoff(attempt))
}
if ask {
@@ -572,7 +646,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// If there is no error - we are done.
if err == nil {
- return nil
+ break
}
// If slave is loading - read from master.
@@ -582,12 +656,11 @@ func (c *ClusterClient) Process(cmd Cmder) error {
continue
}
- // On network errors try random node.
- if internal.IsRetryableError(err) || internal.IsClusterDownError(err) {
- node, err = c.nodes.Random()
- if err != nil {
- cmd.setErr(err)
- return err
+ if internal.IsRetryableError(err, true) {
+ var nodeErr error
+ node, nodeErr = c.nodes.Random()
+ if nodeErr != nil {
+ break
}
continue
}
@@ -596,20 +669,13 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
- state := c.state()
- if state != nil && slot >= 0 {
- master, _ := state.slotMasterNode(slot)
- if moved && (master == nil || master.Client.getAddr() != addr) {
- c.lazyReloadState()
- }
- }
+ c.lazyReloadState()
- node, err = c.nodes.GetOrCreate(addr)
- if err != nil {
- cmd.setErr(err)
- return err
+ var nodeErr error
+ node, nodeErr = c.nodes.GetOrCreate(addr)
+ if nodeErr != nil {
+ break
}
-
continue
}
@@ -622,9 +688,9 @@ func (c *ClusterClient) Process(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
- state := c.state()
- if state == nil {
- return errNilClusterState
+ state, err := c.state()
+ if err != nil {
+ return err
}
var wg sync.WaitGroup
@@ -655,9 +721,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
- state := c.state()
- if state == nil {
- return errNilClusterState
+ state, err := c.state()
+ if err != nil {
+ return err
}
var wg sync.WaitGroup
@@ -688,9 +754,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
- state := c.state()
- if state == nil {
- return errNilClusterState
+ state, err := c.state()
+ if err != nil {
+ return err
}
var wg sync.WaitGroup
@@ -728,27 +794,31 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
- state := c.state()
+ state, _ := c.state()
if state == nil {
return &acc
}
for _, node := range state.masters {
s := node.Client.connPool.Stats()
- acc.Requests += s.Requests
acc.Hits += s.Hits
+ acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
+
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
+ acc.StaleConns += s.StaleConns
}
for _, node := range state.slaves {
s := node.Client.connPool.Stats()
- acc.Requests += s.Requests
acc.Hits += s.Hits
+ acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
+
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
+ acc.StaleConns += s.StaleConns
}
return &acc
@@ -762,10 +832,8 @@ func (c *ClusterClient) lazyReloadState() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
- var state *clusterState
for {
- var err error
- state, err = c.reloadState()
+ state, err := c.reloadState()
if err == pool.ErrClosed {
return
}
@@ -776,11 +844,10 @@ func (c *ClusterClient) lazyReloadState() {
}
c._state.Store(state)
+ time.Sleep(5 * time.Second)
+ c.nodes.GC(state.generation)
break
}
-
- time.Sleep(3 * time.Second)
- c.nodes.GC(state.generation)
}()
}
@@ -810,21 +877,12 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
break
}
- var n int
for _, node := range nodes {
- nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
+ _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil {
internal.Logf("ReapStaleConns failed: %s", err)
- } else {
- n += nn
}
}
-
- s := c.PoolStats()
- internal.Logf(
- "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
- n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
- )
}
}
@@ -837,16 +895,21 @@ func (c *ClusterClient) Pipeline() Pipeliner {
}
func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
+ setCmdsErr(cmds, err)
return err
}
- for i := 0; i <= c.opt.MaxRedirects; i++ {
+ for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
@@ -856,8 +919,12 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
continue
}
- err = c.pipelineProcessCmds(cn, cmds, failedCmds)
- node.Client.releaseConn(cn, err)
+ err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
+ if err == nil || internal.IsRedisError(err) {
+ _ = node.Client.connPool.Put(cn)
+ } else {
+ _ = node.Client.connPool.Remove(cn)
+ }
}
if len(failedCmds) == 0 {
@@ -866,21 +933,20 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cmdsMap = failedCmds
}
- var firstErr error
- for _, cmd := range cmds {
- if err := cmd.Err(); err != nil {
- firstErr = err
- break
- }
- }
- return firstErr
+ return firstCmdsErr(cmds)
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
- state := c.state()
+ state, err := c.state()
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return nil, err
+ }
+
cmdsMap := make(map[*clusterNode][]Cmder)
for _, cmd := range cmds {
- _, node, err := c.cmdSlotAndNode(state, cmd)
+ slot := c.cmdSlot(cmd)
+ node, err := state.slotMasterNode(slot)
if err != nil {
return nil, err
}
@@ -890,11 +956,12 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
}
func (c *ClusterClient) pipelineProcessCmds(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err)
+ failedCmds[node] = cmds
return err
}
@@ -907,46 +974,53 @@ func (c *ClusterClient) pipelineProcessCmds(
func (c *ClusterClient) pipelineReadCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
- var firstErr error
for _, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
continue
}
- if firstErr == nil {
- firstErr = err
+ if c.checkMovedErr(cmd, err, failedCmds) {
+ continue
}
- err = c.checkMovedErr(cmd, failedCmds)
- if err != nil && firstErr == nil {
- firstErr = err
+ if internal.IsRedisError(err) {
+ continue
}
+
+ return err
}
- return firstErr
+ return nil
}
-func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
- moved, ask, addr := internal.IsMovedError(cmd.Err())
+func (c *ClusterClient) checkMovedErr(
+ cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
+) bool {
+ moved, ask, addr := internal.IsMovedError(err)
+
if moved {
c.lazyReloadState()
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
- return err
+ return false
}
failedCmds[node] = append(failedCmds[node], cmd)
+ return true
}
+
if ask {
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
- return err
+ return false
}
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
+ return true
}
- return nil
+
+ return false
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
@@ -959,29 +1033,29 @@ func (c *ClusterClient) TxPipeline() Pipeliner {
}
func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.TxPipeline().pipelined(fn)
+ return c.TxPipeline().Pipelined(fn)
}
func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
- cmdsMap, err := c.mapCmdsBySlot(cmds)
+ state, err := c.state()
if err != nil {
return err
}
- state := c.state()
- if state == nil {
- return errNilClusterState
- }
-
+ cmdsMap := c.mapCmdsBySlot(cmds)
for slot, cmds := range cmdsMap {
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
}
-
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
- for i := 0; i <= c.opt.MaxRedirects; i++ {
+
+ for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
@@ -992,7 +1066,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
- node.Client.releaseConn(cn, err)
+ if err == nil || internal.IsRedisError(err) {
+ _ = node.Client.connPool.Put(cn)
+ } else {
+ _ = node.Client.connPool.Remove(cn)
+ }
}
if len(failedCmds) == 0 {
@@ -1002,27 +1080,16 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
}
}
- var firstErr error
- for _, cmd := range cmds {
- if err := cmd.Err(); err != nil {
- firstErr = err
- break
- }
- }
- return firstErr
+ return firstCmdsErr(cmds)
}
-func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) {
- state := c.state()
+func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
cmdsMap := make(map[int][]Cmder)
for _, cmd := range cmds {
- slot, _, err := c.cmdSlotAndNode(state, cmd)
- if err != nil {
- return nil, err
- }
+ slot := c.cmdSlot(cmd)
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
- return cmdsMap, nil
+ return cmdsMap
}
func (c *ClusterClient) txPipelineProcessCmds(
@@ -1039,22 +1106,20 @@ func (c *ClusterClient) txPipelineProcessCmds(
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
+ setCmdsErr(cmds, err)
return err
}
- _, err := pipelineReadCmds(cn, cmds)
- return err
+ return pipelineReadCmds(cn, cmds)
}
func (c *ClusterClient) txPipelineReadQueued(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
- var firstErr error
-
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
- firstErr = err
+ if err := statusCmd.readReply(cn); err != nil {
+ return err
}
for _, cmd := range cmds {
@@ -1063,15 +1128,11 @@ func (c *ClusterClient) txPipelineReadQueued(
continue
}
- cmd.setErr(err)
- if firstErr == nil {
- firstErr = err
+ if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {
+ continue
}
- err = c.checkMovedErr(cmd, failedCmds)
- if err != nil && firstErr == nil {
- firstErr = err
- }
+ return err
}
// Parse number of replies.
@@ -1085,7 +1146,13 @@ func (c *ClusterClient) txPipelineReadQueued(
switch line[0] {
case proto.ErrorReply:
- return proto.ParseErrorReply(line)
+ err := proto.ParseErrorReply(line)
+ for _, cmd := range cmds {
+ if !c.checkMovedErr(cmd, err, failedCmds) {
+ break
+ }
+ }
+ return err
case proto.ArrayReply:
// ok
default:
@@ -1093,7 +1160,7 @@ func (c *ClusterClient) txPipelineReadQueued(
return err
}
- return firstErr
+ return nil
}
func (c *ClusterClient) pubSub(channels []string) *PubSub {
@@ -1112,7 +1179,12 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub {
slot = -1
}
- masterNode, err := c.state().slotMasterNode(slot)
+ state, err := c.state()
+ if err != nil {
+ return nil, err
+ }
+
+ masterNode, err := state.slotMasterNode(slot)
if err != nil {
return nil, err
}
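The Watch change in this cluster.go hunk now rejects key sets that hash to different slots and retries MOVED/ASK redirects with backoff. A hedged sketch of how a caller can satisfy the same-slot rule with a hash tag (seed address and key names are placeholders, not taken from this diff):

package main

import (
	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:7000"}, // placeholder seed node
	})
	defer client.Close()

	// "{user:1}" hash-tags both keys into the same slot, which Watch now requires.
	err := client.Watch(func(tx *redis.Tx) error {
		balance, err := tx.Get("{user:1}.balance").Int64()
		if err != nil && err != redis.Nil {
			return err
		}
		// Queue the write inside MULTI/EXEC; it fails if a watched key changed.
		_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
			pipe.Set("{user:1}.balance", balance+1, 0)
			return nil
		})
		return err
	}, "{user:1}.balance", "{user:1}.visits")
	if err != nil {
		panic(err)
	}
}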
diff --git a/vendor/github.com/go-redis/redis/cluster_commands.go b/vendor/github.com/go-redis/redis/cluster_commands.go
new file mode 100644
index 000000000..dff62c902
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/cluster_commands.go
@@ -0,0 +1,22 @@
+package redis
+
+import "sync/atomic"
+
+func (c *ClusterClient) DBSize() *IntCmd {
+ cmd := NewIntCmd("dbsize")
+ var size int64
+ err := c.ForEachMaster(func(master *Client) error {
+ n, err := master.DBSize().Result()
+ if err != nil {
+ return err
+ }
+ atomic.AddInt64(&size, n)
+ return nil
+ })
+ if err != nil {
+ cmd.setErr(err)
+ return cmd
+ }
+ cmd.val = size
+ return cmd
+}
diff --git a/vendor/github.com/go-redis/redis/cluster_test.go b/vendor/github.com/go-redis/redis/cluster_test.go
index 324bd1ce1..6f3677b93 100644
--- a/vendor/github.com/go-redis/redis/cluster_test.go
+++ b/vendor/github.com/go-redis/redis/cluster_test.go
@@ -200,7 +200,7 @@ var _ = Describe("ClusterClient", func() {
Eventually(func() string {
return client.Get("A").Val()
- }).Should(Equal("VALUE"))
+ }, 30*time.Second).Should(Equal("VALUE"))
cnt, err := client.Del("A").Result()
Expect(err).NotTo(HaveOccurred())
@@ -215,7 +215,7 @@ var _ = Describe("ClusterClient", func() {
Eventually(func() string {
return client.Get("A").Val()
- }).Should(Equal("VALUE"))
+ }, 30*time.Second).Should(Equal("VALUE"))
})
It("distributes keys", func() {
@@ -227,7 +227,7 @@ var _ = Describe("ClusterClient", func() {
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
- }, 5*time.Second).Should(Or(
+ }, 30*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
@@ -251,7 +251,7 @@ var _ = Describe("ClusterClient", func() {
for _, master := range cluster.masters() {
Eventually(func() string {
return master.Info("keyspace").Val()
- }, 5*time.Second).Should(Or(
+ }, 30*time.Second).Should(Or(
ContainSubstring("keys=31"),
ContainSubstring("keys=29"),
ContainSubstring("keys=40"),
@@ -320,10 +320,6 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(14))
- if opt.RouteByLatency {
- return
- }
-
for _, key := range keys {
slot := hashtag.Slot(key)
client.SwapSlotNodes(slot)
@@ -432,6 +428,9 @@ var _ = Describe("ClusterClient", func() {
})
AfterEach(func() {
+ _ = client.ForEachMaster(func(master *redis.Client) error {
+ return master.FlushDB().Err()
+ })
Expect(client.Close()).NotTo(HaveOccurred())
})
@@ -476,11 +475,9 @@ var _ = Describe("ClusterClient", func() {
})
Expect(err).NotTo(HaveOccurred())
- for _, client := range cluster.masters() {
- size, err := client.DBSize().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(size).To(Equal(int64(0)))
- }
+ size, err := client.DBSize().Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(size).To(Equal(int64(0)))
})
It("should CLUSTER SLOTS", func() {
@@ -560,6 +557,9 @@ var _ = Describe("ClusterClient", func() {
})
AfterEach(func() {
+ _ = client.ForEachMaster(func(master *redis.Client) error {
+ return master.FlushDB().Err()
+ })
Expect(client.Close()).NotTo(HaveOccurred())
})
@@ -575,10 +575,19 @@ var _ = Describe("ClusterClient", func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
+
+ _ = client.ForEachSlave(func(slave *redis.Client) error {
+ Eventually(func() int64 {
+ return client.DBSize().Val()
+ }, 30*time.Second).Should(Equal(int64(0)))
+ return nil
+ })
})
AfterEach(func() {
- client.FlushDB()
+ _ = client.ForEachMaster(func(master *redis.Client) error {
+ return master.FlushDB().Err()
+ })
Expect(client.Close()).NotTo(HaveOccurred())
})
@@ -597,7 +606,7 @@ var _ = Describe("ClusterClient without nodes", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
- It("returns an error", func() {
+ It("Ping returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
@@ -626,7 +635,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
It("returns an error", func() {
err := client.Ping().Err()
- Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
+ Expect(err).To(MatchError("redis: cannot load cluster slots"))
})
It("pipeline returns an error", func() {
@@ -634,7 +643,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
pipe.Ping()
return nil
})
- Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
+ Expect(err).To(MatchError("redis: cannot load cluster slots"))
})
})
@@ -664,7 +673,7 @@ var _ = Describe("ClusterClient timeout", func() {
It("Tx timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
return tx.Ping().Err()
- })
+ }, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
@@ -676,42 +685,20 @@ var _ = Describe("ClusterClient timeout", func() {
return nil
})
return err
- })
+ }, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
}
- Context("read timeout", func() {
- BeforeEach(func() {
- opt := redisClusterOptions()
- opt.ReadTimeout = time.Nanosecond
- opt.WriteTimeout = -1
- client = cluster.clusterClient(opt)
- })
-
- testTimeout()
- })
-
- Context("write timeout", func() {
- BeforeEach(func() {
- opt := redisClusterOptions()
- opt.ReadTimeout = time.Nanosecond
- opt.WriteTimeout = -1
- client = cluster.clusterClient(opt)
- })
-
- testTimeout()
- })
-
- Context("ClientPause timeout", func() {
- const pause = time.Second
+ const pause = time.Second
+ Context("read/write timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
- opt.ReadTimeout = pause / 10
- opt.WriteTimeout = pause / 10
- opt.MaxRedirects = -1
+ opt.ReadTimeout = 100 * time.Millisecond
+ opt.WriteTimeout = 100 * time.Millisecond
+ opt.MaxRedirects = 1
client = cluster.clusterClient(opt)
err := client.ForEachNode(func(client *redis.Client) error {
diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go
index 0e5b2016e..d2688082a 100644
--- a/vendor/github.com/go-redis/redis/command.go
+++ b/vendor/github.com/go-redis/redis/command.go
@@ -12,28 +12,10 @@ import (
"github.com/go-redis/redis/internal/proto"
)
-var (
- _ Cmder = (*Cmd)(nil)
- _ Cmder = (*SliceCmd)(nil)
- _ Cmder = (*StatusCmd)(nil)
- _ Cmder = (*IntCmd)(nil)
- _ Cmder = (*DurationCmd)(nil)
- _ Cmder = (*BoolCmd)(nil)
- _ Cmder = (*StringCmd)(nil)
- _ Cmder = (*FloatCmd)(nil)
- _ Cmder = (*StringSliceCmd)(nil)
- _ Cmder = (*BoolSliceCmd)(nil)
- _ Cmder = (*StringStringMapCmd)(nil)
- _ Cmder = (*StringIntMapCmd)(nil)
- _ Cmder = (*ZSliceCmd)(nil)
- _ Cmder = (*ScanCmd)(nil)
- _ Cmder = (*ClusterSlotsCmd)(nil)
-)
-
type Cmder interface {
- args() []interface{}
- arg(int) string
Name() string
+ Args() []interface{}
+ stringArg(int) string
readReply(*pool.Conn) error
setErr(error)
@@ -46,14 +28,25 @@ type Cmder interface {
func setCmdsErr(cmds []Cmder, e error) {
for _, cmd := range cmds {
- cmd.setErr(e)
+ if cmd.Err() == nil {
+ cmd.setErr(e)
+ }
}
}
+func firstCmdsErr(cmds []Cmder) error {
+ for _, cmd := range cmds {
+ if err := cmd.Err(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
cn.Wb.Reset()
for _, cmd := range cmds {
- if err := cn.Wb.Append(cmd.args()); err != nil {
+ if err := cn.Wb.Append(cmd.Args()); err != nil {
return err
}
}
@@ -64,7 +57,7 @@ func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
func cmdString(cmd Cmder, val interface{}) string {
var ss []string
- for _, arg := range cmd.args() {
+ for _, arg := range cmd.Args() {
ss = append(ss, fmt.Sprint(arg))
}
s := strings.Join(ss, " ")
@@ -86,7 +79,7 @@ func cmdString(cmd Cmder, val interface{}) string {
func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
switch cmd.Name() {
case "eval", "evalsha":
- if cmd.arg(2) != "0" {
+ if cmd.stringArg(2) != "0" {
return 3
} else {
return -1
@@ -95,7 +88,6 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
return 1
}
if info == nil {
- internal.Logf("info for cmd=%s not found", cmd.Name())
return -1
}
return int(info.FirstKeyPos)
@@ -110,15 +102,17 @@ type baseCmd struct {
_readTimeout *time.Duration
}
+var _ Cmder = (*Cmd)(nil)
+
func (cmd *baseCmd) Err() error {
return cmd.err
}
-func (cmd *baseCmd) args() []interface{} {
+func (cmd *baseCmd) Args() []interface{} {
return cmd._args
}
-func (cmd *baseCmd) arg(pos int) string {
+func (cmd *baseCmd) stringArg(pos int) string {
if pos < 0 || pos >= len(cmd._args) {
return ""
}
@@ -129,7 +123,7 @@ func (cmd *baseCmd) arg(pos int) string {
func (cmd *baseCmd) Name() string {
if len(cmd._args) > 0 {
// Cmd name must be lower cased.
- s := internal.ToLower(cmd.arg(0))
+ s := internal.ToLower(cmd.stringArg(0))
cmd._args[0] = s
return s
}
@@ -194,6 +188,8 @@ type SliceCmd struct {
val []interface{}
}
+var _ Cmder = (*SliceCmd)(nil)
+
func NewSliceCmd(args ...interface{}) *SliceCmd {
return &SliceCmd{
baseCmd: baseCmd{_args: args},
@@ -230,6 +226,8 @@ type StatusCmd struct {
val string
}
+var _ Cmder = (*StatusCmd)(nil)
+
func NewStatusCmd(args ...interface{}) *StatusCmd {
return &StatusCmd{
baseCmd: baseCmd{_args: args},
@@ -261,6 +259,8 @@ type IntCmd struct {
val int64
}
+var _ Cmder = (*IntCmd)(nil)
+
func NewIntCmd(args ...interface{}) *IntCmd {
return &IntCmd{
baseCmd: baseCmd{_args: args},
@@ -293,6 +293,8 @@ type DurationCmd struct {
precision time.Duration
}
+var _ Cmder = (*DurationCmd)(nil)
+
func NewDurationCmd(precision time.Duration, args ...interface{}) *DurationCmd {
return &DurationCmd{
baseCmd: baseCmd{_args: args},
@@ -330,6 +332,8 @@ type TimeCmd struct {
val time.Time
}
+var _ Cmder = (*TimeCmd)(nil)
+
func NewTimeCmd(args ...interface{}) *TimeCmd {
return &TimeCmd{
baseCmd: baseCmd{_args: args},
@@ -366,6 +370,8 @@ type BoolCmd struct {
val bool
}
+var _ Cmder = (*BoolCmd)(nil)
+
func NewBoolCmd(args ...interface{}) *BoolCmd {
return &BoolCmd{
baseCmd: baseCmd{_args: args},
@@ -421,6 +427,8 @@ type StringCmd struct {
val []byte
}
+var _ Cmder = (*StringCmd)(nil)
+
func NewStringCmd(args ...interface{}) *StringCmd {
return &StringCmd{
baseCmd: baseCmd{_args: args},
@@ -484,6 +492,8 @@ type FloatCmd struct {
val float64
}
+var _ Cmder = (*FloatCmd)(nil)
+
func NewFloatCmd(args ...interface{}) *FloatCmd {
return &FloatCmd{
baseCmd: baseCmd{_args: args},
@@ -515,6 +525,8 @@ type StringSliceCmd struct {
val []string
}
+var _ Cmder = (*StringSliceCmd)(nil)
+
func NewStringSliceCmd(args ...interface{}) *StringSliceCmd {
return &StringSliceCmd{
baseCmd: baseCmd{_args: args},
@@ -555,6 +567,8 @@ type BoolSliceCmd struct {
val []bool
}
+var _ Cmder = (*BoolSliceCmd)(nil)
+
func NewBoolSliceCmd(args ...interface{}) *BoolSliceCmd {
return &BoolSliceCmd{
baseCmd: baseCmd{_args: args},
@@ -591,6 +605,8 @@ type StringStringMapCmd struct {
val map[string]string
}
+var _ Cmder = (*StringStringMapCmd)(nil)
+
func NewStringStringMapCmd(args ...interface{}) *StringStringMapCmd {
return &StringStringMapCmd{
baseCmd: baseCmd{_args: args},
@@ -627,6 +643,8 @@ type StringIntMapCmd struct {
val map[string]int64
}
+var _ Cmder = (*StringIntMapCmd)(nil)
+
func NewStringIntMapCmd(args ...interface{}) *StringIntMapCmd {
return &StringIntMapCmd{
baseCmd: baseCmd{_args: args},
@@ -663,6 +681,8 @@ type ZSliceCmd struct {
val []Z
}
+var _ Cmder = (*ZSliceCmd)(nil)
+
func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
return &ZSliceCmd{
baseCmd: baseCmd{_args: args},
@@ -702,6 +722,8 @@ type ScanCmd struct {
process func(cmd Cmder) error
}
+var _ Cmder = (*ScanCmd)(nil)
+
func NewScanCmd(process func(cmd Cmder) error, args ...interface{}) *ScanCmd {
return &ScanCmd{
baseCmd: baseCmd{_args: args},
@@ -752,6 +774,8 @@ type ClusterSlotsCmd struct {
val []ClusterSlot
}
+var _ Cmder = (*ClusterSlotsCmd)(nil)
+
func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd {
return &ClusterSlotsCmd{
baseCmd: baseCmd{_args: args},
@@ -811,6 +835,8 @@ type GeoLocationCmd struct {
locations []GeoLocation
}
+var _ Cmder = (*GeoLocationCmd)(nil)
+
func NewGeoLocationCmd(q *GeoRadiusQuery, args ...interface{}) *GeoLocationCmd {
args = append(args, q.Radius)
if q.Unit != "" {
@@ -881,6 +907,8 @@ type GeoPosCmd struct {
positions []*GeoPos
}
+var _ Cmder = (*GeoPosCmd)(nil)
+
func NewGeoPosCmd(args ...interface{}) *GeoPosCmd {
return &GeoPosCmd{
baseCmd: baseCmd{_args: args},
@@ -927,6 +955,8 @@ type CommandsInfoCmd struct {
val map[string]*CommandInfo
}
+var _ Cmder = (*CommandsInfoCmd)(nil)
+
func NewCommandsInfoCmd(args ...interface{}) *CommandsInfoCmd {
return &CommandsInfoCmd{
baseCmd: baseCmd{_args: args},
diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go
index 83b3824f8..a3b90f12d 100644
--- a/vendor/github.com/go-redis/redis/commands.go
+++ b/vendor/github.com/go-redis/redis/commands.go
@@ -11,7 +11,7 @@ func readTimeout(timeout time.Duration) time.Duration {
if timeout == 0 {
return 0
}
- return timeout + time.Second
+ return timeout + 10*time.Second
}
func usePrecise(dur time.Duration) bool {
@@ -42,6 +42,9 @@ type Cmdable interface {
Pipeline() Pipeliner
Pipelined(fn func(Pipeliner) error) ([]Cmder, error)
+ TxPipelined(fn func(Pipeliner) error) ([]Cmder, error)
+ TxPipeline() Pipeliner
+
ClientGetName() *StringCmd
Echo(message interface{}) *StringCmd
Ping() *StatusCmd
diff --git a/vendor/github.com/go-redis/redis/commands_test.go b/vendor/github.com/go-redis/redis/commands_test.go
index 4298cba68..6b81f23cf 100644
--- a/vendor/github.com/go-redis/redis/commands_test.go
+++ b/vendor/github.com/go-redis/redis/commands_test.go
@@ -27,11 +27,21 @@ var _ = Describe("Commands", func() {
Describe("server", func() {
It("should Auth", func() {
- _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
+ cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Auth("password")
+ pipe.Auth("")
return nil
})
Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
+ Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
+ Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
+
+ stats := client.PoolStats()
+ Expect(stats.Hits).To(Equal(uint32(1)))
+ Expect(stats.Misses).To(Equal(uint32(1)))
+ Expect(stats.Timeouts).To(Equal(uint32(0)))
+ Expect(stats.TotalConns).To(Equal(uint32(1)))
+ Expect(stats.FreeConns).To(Equal(uint32(1)))
})
It("should Echo", func() {
@@ -187,6 +197,29 @@ var _ = Describe("Commands", func() {
Expect(tm).To(BeTemporally("~", time.Now(), 3*time.Second))
})
+ It("Should Command", func() {
+ cmds, err := client.Command().Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(len(cmds)).To(BeNumerically("~", 180, 10))
+
+ cmd := cmds["mget"]
+ Expect(cmd.Name).To(Equal("mget"))
+ Expect(cmd.Arity).To(Equal(int8(-2)))
+ Expect(cmd.Flags).To(ContainElement("readonly"))
+ Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
+ Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
+ Expect(cmd.StepCount).To(Equal(int8(1)))
+
+ cmd = cmds["ping"]
+ Expect(cmd.Name).To(Equal("ping"))
+ Expect(cmd.Arity).To(Equal(int8(-1)))
+ Expect(cmd.Flags).To(ContainElement("stale"))
+ Expect(cmd.Flags).To(ContainElement("fast"))
+ Expect(cmd.FirstKeyPos).To(Equal(int8(0)))
+ Expect(cmd.LastKeyPos).To(Equal(int8(0)))
+ Expect(cmd.StepCount).To(Equal(int8(0)))
+ })
+
})
Describe("debugging", func() {
@@ -1358,8 +1391,8 @@ var _ = Describe("Commands", func() {
Expect(client.Ping().Err()).NotTo(HaveOccurred())
stats := client.PoolStats()
- Expect(stats.Requests).To(Equal(uint32(3)))
Expect(stats.Hits).To(Equal(uint32(1)))
+ Expect(stats.Misses).To(Equal(uint32(2)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
})
@@ -2887,24 +2920,6 @@ var _ = Describe("Commands", func() {
})
- Describe("Command", func() {
-
- It("returns map of commands", func() {
- cmds, err := client.Command().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(len(cmds)).To(BeNumerically("~", 180, 10))
-
- cmd := cmds["mget"]
- Expect(cmd.Name).To(Equal("mget"))
- Expect(cmd.Arity).To(Equal(int8(-2)))
- Expect(cmd.Flags).To(ContainElement("readonly"))
- Expect(cmd.FirstKeyPos).To(Equal(int8(1)))
- Expect(cmd.LastKeyPos).To(Equal(int8(-1)))
- Expect(cmd.StepCount).To(Equal(int8(1)))
- })
-
- })
-
Describe("Eval", func() {
It("returns keys and values", func() {
diff --git a/vendor/github.com/go-redis/redis/export_test.go b/vendor/github.com/go-redis/redis/export_test.go
index 3b7965d79..bcc18c457 100644
--- a/vendor/github.com/go-redis/redis/export_test.go
+++ b/vendor/github.com/go-redis/redis/export_test.go
@@ -20,8 +20,13 @@ func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error)
}
func (c *ClusterClient) SlotAddrs(slot int) []string {
+ state, err := c.state()
+ if err != nil {
+ panic(err)
+ }
+
var addrs []string
- for _, n := range c.state().slotNodes(slot) {
+ for _, n := range state.slotNodes(slot) {
addrs = append(addrs, n.Client.getAddr())
}
return addrs
@@ -29,7 +34,12 @@ func (c *ClusterClient) SlotAddrs(slot int) []string {
// SwapSlot swaps a slot's master/slave address for testing MOVED redirects.
func (c *ClusterClient) SwapSlotNodes(slot int) {
- nodes := c.state().slots[slot]
+ state, err := c.state()
+ if err != nil {
+ panic(err)
+ }
+
+ nodes := state.slots[slot]
if len(nodes) == 2 {
nodes[0], nodes[1] = nodes[1], nodes[0]
}
diff --git a/vendor/github.com/go-redis/redis/internal/error.go b/vendor/github.com/go-redis/redis/internal/error.go
index 90f6503a1..0898eeb62 100644
--- a/vendor/github.com/go-redis/redis/internal/error.go
+++ b/vendor/github.com/go-redis/redis/internal/error.go
@@ -12,11 +12,24 @@ type RedisError string
func (e RedisError) Error() string { return string(e) }
-func IsRetryableError(err error) bool {
- return IsNetworkError(err) || err.Error() == "ERR max number of clients reached"
+func IsRetryableError(err error, retryNetError bool) bool {
+ if IsNetworkError(err) {
+ return retryNetError
+ }
+ s := err.Error()
+ if s == "ERR max number of clients reached" {
+ return true
+ }
+ if strings.HasPrefix(s, "LOADING ") {
+ return true
+ }
+ if strings.HasPrefix(s, "CLUSTERDOWN ") {
+ return true
+ }
+ return false
}
-func IsInternalError(err error) bool {
+func IsRedisError(err error) bool {
_, ok := err.(RedisError)
return ok
}
@@ -33,7 +46,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
- if IsInternalError(err) {
+ if IsRedisError(err) {
return false
}
if allowTimeout {
@@ -45,7 +58,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
}
func IsMovedError(err error) (moved bool, ask bool, addr string) {
- if !IsInternalError(err) {
+ if !IsRedisError(err) {
return
}
@@ -69,7 +82,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
-
-func IsClusterDownError(err error) bool {
- return strings.HasPrefix(err.Error(), "CLUSTERDOWN ")
-}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go
index 25e78aa3c..836ec1045 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -23,12 +23,13 @@ var timers = sync.Pool{
// Stats contains pool state information and accumulated stats.
type Stats struct {
- Requests uint32 // number of times a connection was requested by the pool
Hits uint32 // number of times free connection was found in the pool
+ Misses uint32 // number of times free connection was NOT found in the pool
Timeouts uint32 // number of times a wait timeout occurred
- TotalConns uint32 // the number of total connections in the pool
- FreeConns uint32 // the number of free connections in the pool
+ TotalConns uint32 // number of total connections in the pool
+ FreeConns uint32 // number of free connections in the pool
+ StaleConns uint32 // number of stale connections removed from the pool
}
type Pooler interface {
@@ -150,8 +151,6 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
return nil, false, ErrClosed
}
- atomic.AddUint32(&p.stats.Requests, 1)
-
select {
case p.queue <- struct{}{}:
default:
@@ -189,6 +188,8 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
return cn, false, nil
}
+ atomic.AddUint32(&p.stats.Misses, 1)
+
newcn, err := p.NewConn()
if err != nil {
<-p.queue
@@ -265,11 +266,13 @@ func (p *ConnPool) FreeLen() int {
func (p *ConnPool) Stats() *Stats {
return &Stats{
- Requests: atomic.LoadUint32(&p.stats.Requests),
- Hits: atomic.LoadUint32(&p.stats.Hits),
- Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
+ Hits: atomic.LoadUint32(&p.stats.Hits),
+ Misses: atomic.LoadUint32(&p.stats.Misses),
+ Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
+
TotalConns: uint32(p.Len()),
FreeConns: uint32(p.FreeLen()),
+ StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
}
@@ -362,10 +365,6 @@ func (p *ConnPool) reaper(frequency time.Duration) {
internal.Logf("ReapStaleConns failed: %s", err)
continue
}
- s := p.Stats()
- internal.Logf(
- "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
- n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
- )
+ atomic.AddUint32(&p.stats.StaleConns, uint32(n))
}
}
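The Stats rework above drops the Requests counter in favour of Misses and adds StaleConns. A small sketch of reading the new counters through the exported PoolStats (assumes a local server at a placeholder address; not part of this diff):

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	client.Ping()

	s := client.PoolStats()
	// Requests is gone: a checkout is now either a Hit (free conn reused) or a
	// Miss (new conn dialed), and StaleConns counts idle conns reaped by the pool.
	fmt.Printf("hits=%d misses=%d timeouts=%d total=%d free=%d stale=%d\n",
		s.Hits, s.Misses, s.Timeouts, s.TotalConns, s.FreeConns, s.StaleConns)
}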
diff --git a/vendor/github.com/go-redis/redis/internal/proto/reader.go b/vendor/github.com/go-redis/redis/internal/proto/reader.go
index 2159cf639..cd94329d8 100644
--- a/vendor/github.com/go-redis/redis/internal/proto/reader.go
+++ b/vendor/github.com/go-redis/redis/internal/proto/reader.go
@@ -63,7 +63,7 @@ func (p *Reader) ReadLine() ([]byte, error) {
return nil, bufio.ErrBufferFull
}
if len(line) == 0 {
- return nil, internal.RedisError("redis: reply is empty")
+ return nil, fmt.Errorf("redis: reply is empty")
}
if isNilReply(line) {
return nil, internal.Nil
diff --git a/vendor/github.com/go-redis/redis/internal/proto/scan.go b/vendor/github.com/go-redis/redis/internal/proto/scan.go
index 3ab40b94f..0431a877d 100644
--- a/vendor/github.com/go-redis/redis/internal/proto/scan.go
+++ b/vendor/github.com/go-redis/redis/internal/proto/scan.go
@@ -11,7 +11,7 @@ import (
func Scan(b []byte, v interface{}) error {
switch v := v.(type) {
case nil:
- return internal.RedisError("redis: Scan(nil)")
+ return fmt.Errorf("redis: Scan(nil)")
case *string:
*v = internal.BytesToString(b)
return nil
diff --git a/vendor/github.com/go-redis/redis/main_test.go b/vendor/github.com/go-redis/redis/main_test.go
index 30f09c618..7c5a6a969 100644
--- a/vendor/github.com/go-redis/redis/main_test.go
+++ b/vendor/github.com/go-redis/redis/main_test.go
@@ -50,10 +50,6 @@ var cluster = &clusterScenario{
clients: make(map[string]*redis.Client, 6),
}
-func init() {
- //redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
-}
-
var _ = BeforeSuite(func() {
var err error
diff --git a/vendor/github.com/go-redis/redis/options.go b/vendor/github.com/go-redis/redis/options.go
index dea045453..75648053d 100644
--- a/vendor/github.com/go-redis/redis/options.go
+++ b/vendor/github.com/go-redis/redis/options.go
@@ -198,13 +198,3 @@ func newConnPool(opt *Options) *pool.ConnPool {
IdleCheckFrequency: opt.IdleCheckFrequency,
})
}
-
-// PoolStats contains pool state information and accumulated stats.
-type PoolStats struct {
- Requests uint32 // number of times a connection was requested by the pool
- Hits uint32 // number of times free connection was found in the pool
- Timeouts uint32 // number of times a wait timeout occurred
-
- TotalConns uint32 // the number of total connections in the pool
- FreeConns uint32 // the number of free connections in the pool
-}
diff --git a/vendor/github.com/go-redis/redis/pipeline.go b/vendor/github.com/go-redis/redis/pipeline.go
index b66c0597f..9349ef553 100644
--- a/vendor/github.com/go-redis/redis/pipeline.go
+++ b/vendor/github.com/go-redis/redis/pipeline.go
@@ -13,9 +13,7 @@ type Pipeliner interface {
Process(cmd Cmder) error
Close() error
Discard() error
- discard() error
Exec() ([]Cmder, error)
- pipelined(fn func(Pipeliner) error) ([]Cmder, error)
}
var _ Pipeliner = (*Pipeline)(nil)
@@ -104,3 +102,11 @@ func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Pipeline) Pipeline() Pipeliner {
return c
}
+
+func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
+ return c.pipelined(fn)
+}
+
+func (c *Pipeline) TxPipeline() Pipeliner {
+ return c
+}
diff --git a/vendor/github.com/go-redis/redis/pool_test.go b/vendor/github.com/go-redis/redis/pool_test.go
index 34a548a63..0ca09adc7 100644
--- a/vendor/github.com/go-redis/redis/pool_test.go
+++ b/vendor/github.com/go-redis/redis/pool_test.go
@@ -95,8 +95,8 @@ var _ = Describe("pool", func() {
Expect(pool.FreeLen()).To(Equal(1))
stats := pool.Stats()
- Expect(stats.Requests).To(Equal(uint32(4)))
Expect(stats.Hits).To(Equal(uint32(2)))
+ Expect(stats.Misses).To(Equal(uint32(2)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
})
@@ -112,30 +112,32 @@ var _ = Describe("pool", func() {
Expect(pool.FreeLen()).To(Equal(1))
stats := pool.Stats()
- Expect(stats.Requests).To(Equal(uint32(101)))
Expect(stats.Hits).To(Equal(uint32(100)))
+ Expect(stats.Misses).To(Equal(uint32(1)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
})
It("removes idle connections", func() {
stats := client.PoolStats()
Expect(stats).To(Equal(&redis.PoolStats{
- Requests: 1,
Hits: 0,
+ Misses: 1,
Timeouts: 0,
TotalConns: 1,
FreeConns: 1,
+ StaleConns: 0,
}))
time.Sleep(2 * time.Second)
stats = client.PoolStats()
Expect(stats).To(Equal(&redis.PoolStats{
- Requests: 1,
Hits: 0,
+ Misses: 1,
Timeouts: 0,
TotalConns: 0,
FreeConns: 0,
+ StaleConns: 1,
}))
})
})
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go
index 4a5c65f57..e754a16f2 100644
--- a/vendor/github.com/go-redis/redis/pubsub.go
+++ b/vendor/github.com/go-redis/redis/pubsub.go
@@ -95,7 +95,10 @@ func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
}
func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
- if internal.IsBadConn(err, true) && c.cn == cn {
+ if c.cn != cn {
+ return
+ }
+ if internal.IsBadConn(err, true) {
_ = c.closeTheCn()
}
}
diff --git a/vendor/github.com/go-redis/redis/pubsub_test.go b/vendor/github.com/go-redis/redis/pubsub_test.go
index 1d9dfcb99..6fc04a198 100644
--- a/vendor/github.com/go-redis/redis/pubsub_test.go
+++ b/vendor/github.com/go-redis/redis/pubsub_test.go
@@ -68,7 +68,7 @@ var _ = Describe("PubSub", func() {
}
stats := client.PoolStats()
- Expect(stats.Requests - stats.Hits).To(Equal(uint32(2)))
+ Expect(stats.Misses).To(Equal(uint32(2)))
})
It("should pub/sub channels", func() {
@@ -191,7 +191,7 @@ var _ = Describe("PubSub", func() {
}
stats := client.PoolStats()
- Expect(stats.Requests - stats.Hits).To(Equal(uint32(2)))
+ Expect(stats.Misses).To(Equal(uint32(2)))
})
It("should ping/pong", func() {
@@ -290,8 +290,8 @@ var _ = Describe("PubSub", func() {
Eventually(done).Should(Receive())
stats := client.PoolStats()
- Expect(stats.Requests).To(Equal(uint32(2)))
Expect(stats.Hits).To(Equal(uint32(1)))
+ Expect(stats.Misses).To(Equal(uint32(1)))
})
It("returns an error when subscribe fails", func() {
diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go
index b18973cdb..230091b3e 100644
--- a/vendor/github.com/go-redis/redis/redis.go
+++ b/vendor/github.com/go-redis/redis/redis.go
@@ -3,6 +3,7 @@ package redis
import (
"fmt"
"log"
+ "os"
"time"
"github.com/go-redis/redis/internal"
@@ -13,6 +14,10 @@ import (
// Redis nil reply, .e.g. when key does not exist.
const Nil = internal.Nil
+func init() {
+ SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
+}
+
func SetLogger(logger *log.Logger) {
internal.Logger = logger
}
@@ -131,7 +136,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
cn, _, err := c.getConn()
if err != nil {
cmd.setErr(err)
- if internal.IsRetryableError(err) {
+ if internal.IsRetryableError(err, true) {
continue
}
return err
@@ -141,7 +146,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
if err := writeCmd(cn, cmd); err != nil {
c.releaseConn(cn, err)
cmd.setErr(err)
- if internal.IsRetryableError(err) {
+ if internal.IsRetryableError(err, true) {
continue
}
return err
@@ -150,7 +155,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
cn.SetReadTimeout(c.cmdTimeout(cmd))
err = cmd.readReply(cn)
c.releaseConn(cn, err)
- if err != nil && internal.IsRetryableError(err) {
+ if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue
}
@@ -197,8 +202,11 @@ type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
return func(cmds []Cmder) error {
- var firstErr error
- for i := 0; i <= c.opt.MaxRetries; i++ {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
cn, _, err := c.getConn()
if err != nil {
setCmdsErr(cmds, err)
@@ -206,18 +214,18 @@ func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
}
canRetry, err := p(cn, cmds)
- c.releaseConn(cn, err)
- if err == nil {
- return nil
- }
- if firstErr == nil {
- firstErr = err
+
+ if err == nil || internal.IsRedisError(err) {
+ _ = c.connPool.Put(cn)
+ break
}
- if !canRetry || !internal.IsRetryableError(err) {
+ _ = c.connPool.Remove(cn)
+
+ if !canRetry || !internal.IsRetryableError(err, true) {
break
}
}
- return firstErr
+ return firstCmdsErr(cmds)
}
}
@@ -230,23 +238,17 @@ func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, err
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
- return pipelineReadCmds(cn, cmds)
+ return true, pipelineReadCmds(cn, cmds)
}
-func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
- for i, cmd := range cmds {
+func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
+ for _, cmd := range cmds {
err := cmd.readReply(cn)
- if err == nil {
- continue
- }
- if i == 0 {
- retry = true
- }
- if firstErr == nil {
- firstErr = err
+ if err != nil && !internal.IsRedisError(err) {
+ return err
}
}
- return
+ return nil
}
func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
@@ -260,11 +262,11 @@ func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, e
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds); err != nil {
+ setCmdsErr(cmds, err)
return false, err
}
- _, err := pipelineReadCmds(cn, cmds)
- return false, err
+ return false, pipelineReadCmds(cn, cmds)
}
func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
@@ -276,21 +278,16 @@ func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
}
func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
- var firstErr error
-
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
- firstErr = err
+ if err := statusCmd.readReply(cn); err != nil {
+ return err
}
- for _, cmd := range cmds {
+ for _ = range cmds {
err := statusCmd.readReply(cn)
- if err != nil {
- cmd.setErr(err)
- if firstErr == nil {
- firstErr = err
- }
+ if err != nil && !internal.IsRedisError(err) {
+ return err
}
}
@@ -355,21 +352,16 @@ func (c *Client) Options() *Options {
return c.opt
}
+type PoolStats pool.Stats
+
// PoolStats returns connection pool stats.
func (c *Client) PoolStats() *PoolStats {
- s := c.connPool.Stats()
- return &PoolStats{
- Requests: s.Requests,
- Hits: s.Hits,
- Timeouts: s.Timeouts,
-
- TotalConns: s.TotalConns,
- FreeConns: s.FreeConns,
- }
+ stats := c.connPool.Stats()
+ return (*PoolStats)(stats)
}
func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
func (c *Client) Pipeline() Pipeliner {
@@ -381,7 +373,7 @@ func (c *Client) Pipeline() Pipeliner {
}
func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.TxPipeline().pipelined(fn)
+ return c.TxPipeline().Pipelined(fn)
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
@@ -433,7 +425,7 @@ type Conn struct {
}
func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
func (c *Conn) Pipeline() Pipeliner {
@@ -445,7 +437,7 @@ func (c *Conn) Pipeline() Pipeliner {
}
func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.TxPipeline().pipelined(fn)
+ return c.TxPipeline().Pipelined(fn)
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go
index 72d52bf75..a30c32102 100644
--- a/vendor/github.com/go-redis/redis/ring.go
+++ b/vendor/github.com/go-redis/redis/ring.go
@@ -34,7 +34,9 @@ type RingOptions struct {
DB int
Password string
- MaxRetries int
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
@@ -50,6 +52,19 @@ func (opt *RingOptions) init() {
if opt.HeartbeatFrequency == 0 {
opt.HeartbeatFrequency = 500 * time.Millisecond
}
+
+ switch opt.MinRetryBackoff {
+ case -1:
+ opt.MinRetryBackoff = 0
+ case 0:
+ opt.MinRetryBackoff = 8 * time.Millisecond
+ }
+ switch opt.MaxRetryBackoff {
+ case -1:
+ opt.MaxRetryBackoff = 0
+ case 0:
+ opt.MaxRetryBackoff = 512 * time.Millisecond
+ }
}
func (opt *RingOptions) clientOptions() *Options {
@@ -130,9 +145,10 @@ type Ring struct {
opt *RingOptions
nreplicas int
- mu sync.RWMutex
- hash *consistenthash.Map
- shards map[string]*ringShard
+ mu sync.RWMutex
+ hash *consistenthash.Map
+ shards map[string]*ringShard
+ shardsList []*ringShard
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
@@ -154,24 +170,41 @@ func NewRing(opt *RingOptions) *Ring {
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
- ring.addClient(name, NewClient(clopt))
+ ring.addShard(name, NewClient(clopt))
}
go ring.heartbeat()
return ring
}
+func (c *Ring) addShard(name string, cl *Client) {
+ shard := &ringShard{Client: cl}
+ c.mu.Lock()
+ c.hash.Add(name)
+ c.shards[name] = shard
+ c.shardsList = append(c.shardsList, shard)
+ c.mu.Unlock()
+}
+
// Options returns read-only Options that were used to create the client.
func (c *Ring) Options() *RingOptions {
return c.opt
}
+func (c *Ring) retryBackoff(attempt int) time.Duration {
+ return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
+}
+
// PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats {
+ c.mu.RLock()
+ shards := c.shardsList
+ c.mu.RUnlock()
+
var acc PoolStats
- for _, shard := range c.shards {
+ for _, shard := range shards {
s := shard.Client.connPool.Stats()
- acc.Requests += s.Requests
acc.Hits += s.Hits
+ acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
@@ -210,9 +243,13 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub {
// ForEachShard concurrently calls the fn on each live shard in the ring.
// It returns the first error if any.
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
+ c.mu.RLock()
+ shards := c.shardsList
+ c.mu.RUnlock()
+
var wg sync.WaitGroup
errCh := make(chan error, 1)
- for _, shard := range c.shards {
+ for _, shard := range shards {
if shard.IsDown() {
continue
}
@@ -241,8 +278,12 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
func (c *Ring) cmdInfo(name string) *CommandInfo {
err := c.cmdsInfoOnce.Do(func() error {
+ c.mu.RLock()
+ shards := c.shardsList
+ c.mu.RUnlock()
+
var firstErr error
- for _, shard := range c.shards {
+ for _, shard := range shards {
cmdsInfo, err := shard.Client.Command().Result()
if err == nil {
c.cmdsInfo = cmdsInfo
@@ -257,14 +298,11 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
if err != nil {
return nil
}
- return c.cmdsInfo[name]
-}
-
-func (c *Ring) addClient(name string, cl *Client) {
- c.mu.Lock()
- c.hash.Add(name)
- c.shards[name] = &ringShard{Client: cl}
- c.mu.Unlock()
+ info := c.cmdsInfo[name]
+ if info == nil {
+ internal.Logf("info for cmd=%s not found", name)
+ }
+ return info
}
func (c *Ring) shardByKey(key string) (*ringShard, error) {
@@ -305,7 +343,7 @@ func (c *Ring) shardByName(name string) (*ringShard, error) {
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
cmdInfo := c.cmdInfo(cmd.Name())
- firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
+ firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
return c.shardByKey(firstKey)
}
@@ -346,7 +384,10 @@ func (c *Ring) heartbeat() {
break
}
- for _, shard := range c.shards {
+ shards := c.shardsList
+ c.mu.RUnlock()
+
+ for _, shard := range shards {
err := shard.Client.Ping().Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
internal.Logf("ring shard state changed: %s", shard)
@@ -354,8 +395,6 @@ func (c *Ring) heartbeat() {
}
}
- c.mu.RUnlock()
-
if rebalance {
c.rebalance()
}
@@ -383,6 +422,7 @@ func (c *Ring) Close() error {
}
c.hash = nil
c.shards = nil
+ c.shardsList = nil
return firstErr
}
@@ -396,51 +436,48 @@ func (c *Ring) Pipeline() Pipeliner {
}
func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
-func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
+func (c *Ring) pipelineExec(cmds []Cmder) error {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.Name())
- name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
+ name := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
if name != "" {
name = c.hash.Get(hashtag.Key(name))
}
cmdsMap[name] = append(cmdsMap[name], cmd)
}
- for i := 0; i <= c.opt.MaxRetries; i++ {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
var failedCmdsMap map[string][]Cmder
for name, cmds := range cmdsMap {
shard, err := c.shardByName(name)
if err != nil {
setCmdsErr(cmds, err)
- if firstErr == nil {
- firstErr = err
- }
continue
}
cn, _, err := shard.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
- if firstErr == nil {
- firstErr = err
- }
continue
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
- shard.Client.releaseConn(cn, err)
- if err == nil {
+ if err == nil || internal.IsRedisError(err) {
+ _ = shard.Client.connPool.Put(cn)
continue
}
- if firstErr == nil {
- firstErr = err
- }
- if canRetry && internal.IsRetryableError(err) {
+ _ = shard.Client.connPool.Remove(cn)
+
+ if canRetry && internal.IsRetryableError(err, true) {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
@@ -454,5 +491,13 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
cmdsMap = failedCmdsMap
}
- return firstErr
+ return firstCmdsErr(cmds)
+}
+
+func (c *Ring) TxPipeline() Pipeliner {
+ panic("not implemented")
+}
+
+func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
+ panic("not implemented")
}
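The ring.go hunk above adds MinRetryBackoff/MaxRetryBackoff to RingOptions, mirroring the cluster options, so failed pipeline attempts back off between retries. A hedged sketch with placeholder shard names and addresses:

package main

import (
	"time"

	"github.com/go-redis/redis"
)

func main() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"shard1": "localhost:7000",
			"shard2": "localhost:7001",
		},
		// New in this change: retries now sleep between attempts.
		MaxRetries:      2,
		MinRetryBackoff: 8 * time.Millisecond,
		MaxRetryBackoff: 512 * time.Millisecond,
	})
	defer ring.Close()

	if err := ring.Set("key", "value", 0).Err(); err != nil {
		panic(err)
	}
}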
diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go
index 3bfdb4a3f..37d06b482 100644
--- a/vendor/github.com/go-redis/redis/sentinel.go
+++ b/vendor/github.com/go-redis/redis/sentinel.go
@@ -301,8 +301,10 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) {
msg, err := pubsub.ReceiveMessage()
if err != nil {
- internal.Logf("sentinel: ReceiveMessage failed: %s", err)
- pubsub.Close()
+ if err != pool.ErrClosed {
+ internal.Logf("sentinel: ReceiveMessage failed: %s", err)
+ pubsub.Close()
+ }
d.resetSentinel()
return
}
diff --git a/vendor/github.com/go-redis/redis/tx.go b/vendor/github.com/go-redis/redis/tx.go
index 5ef89619b..11d5d5cb0 100644
--- a/vendor/github.com/go-redis/redis/tx.go
+++ b/vendor/github.com/go-redis/redis/tx.go
@@ -36,11 +36,10 @@ func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
return err
}
}
- firstErr := fn(tx)
- if err := tx.Close(); err != nil && firstErr == nil {
- firstErr = err
- }
- return firstErr
+
+ err := fn(tx)
+ _ = tx.Close()
+ return err
}
// close closes the transaction, releasing any open resources.
@@ -53,7 +52,7 @@ func (c *Tx) Close() error {
// of a transaction.
func (c *Tx) Watch(keys ...string) *StatusCmd {
args := make([]interface{}, 1+len(keys))
- args[0] = "WATCH"
+ args[0] = "watch"
for i, key := range keys {
args[1+i] = key
}
@@ -65,7 +64,7 @@ func (c *Tx) Watch(keys ...string) *StatusCmd {
// Unwatch flushes all the previously watched keys for a transaction.
func (c *Tx) Unwatch(keys ...string) *StatusCmd {
args := make([]interface{}, 1+len(keys))
- args[0] = "UNWATCH"
+ args[0] = "unwatch"
for i, key := range keys {
args[1+i] = key
}
@@ -92,5 +91,13 @@ func (c *Tx) Pipeline() Pipeliner {
// TxFailedErr is returned. Otherwise Exec returns error of the first
// failed command or nil.
func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
+}
+
+func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
+ return c.Pipelined(fn)
+}
+
+func (c *Tx) TxPipeline() Pipeliner {
+ return c.Pipeline()
}
diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go
index 4aa579fa4..29eb12b18 100644
--- a/vendor/github.com/go-redis/redis/universal.go
+++ b/vendor/github.com/go-redis/redis/universal.go
@@ -90,8 +90,8 @@ func (o *UniversalOptions) simple() *Options {
}
return &Options{
- Addr: addr,
- DB: o.DB,
+ Addr: addr,
+ DB: o.DB,
MaxRetries: o.MaxRetries,
Password: o.Password,
@@ -117,6 +117,9 @@ type UniversalClient interface {
Close() error
}
+var _ UniversalClient = (*Client)(nil)
+var _ UniversalClient = (*ClusterClient)(nil)
+
// NewUniversalClient returns a new multi client. The type of client returned depends
// on the following three conditions:
//