From b84736e9b6401df0c6eeab9950bef09458a6aefd Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 29 Sep 2017 12:46:30 -0700 Subject: Updating server dependancies. (#7538) --- vendor/github.com/go-redis/redis/.travis.yml | 1 + vendor/github.com/go-redis/redis/README.md | 3 +- vendor/github.com/go-redis/redis/cluster.go | 366 ++++++++++++--------- .../github.com/go-redis/redis/cluster_commands.go | 22 ++ vendor/github.com/go-redis/redis/cluster_test.go | 79 ++--- vendor/github.com/go-redis/redis/command.go | 86 +++-- vendor/github.com/go-redis/redis/commands.go | 5 +- vendor/github.com/go-redis/redis/commands_test.go | 55 ++-- vendor/github.com/go-redis/redis/export_test.go | 14 +- vendor/github.com/go-redis/redis/internal/error.go | 27 +- .../go-redis/redis/internal/pool/pool.go | 25 +- .../go-redis/redis/internal/proto/reader.go | 2 +- .../go-redis/redis/internal/proto/scan.go | 2 +- vendor/github.com/go-redis/redis/main_test.go | 4 - vendor/github.com/go-redis/redis/options.go | 10 - vendor/github.com/go-redis/redis/pipeline.go | 10 +- vendor/github.com/go-redis/redis/pool_test.go | 10 +- vendor/github.com/go-redis/redis/pubsub.go | 5 +- vendor/github.com/go-redis/redis/pubsub_test.go | 6 +- vendor/github.com/go-redis/redis/redis.go | 92 +++--- vendor/github.com/go-redis/redis/ring.go | 121 ++++--- vendor/github.com/go-redis/redis/sentinel.go | 6 +- vendor/github.com/go-redis/redis/tx.go | 23 +- vendor/github.com/go-redis/redis/universal.go | 7 +- 24 files changed, 588 insertions(+), 393 deletions(-) create mode 100644 vendor/github.com/go-redis/redis/cluster_commands.go (limited to 'vendor/github.com/go-redis/redis') diff --git a/vendor/github.com/go-redis/redis/.travis.yml b/vendor/github.com/go-redis/redis/.travis.yml index f4666c593..f49927ee8 100644 --- a/vendor/github.com/go-redis/redis/.travis.yml +++ b/vendor/github.com/go-redis/redis/.travis.yml @@ -8,6 +8,7 @@ go: - 1.4.x - 1.7.x - 1.8.x + - 1.9.x - tip matrix: diff --git a/vendor/github.com/go-redis/redis/README.md b/vendor/github.com/go-redis/redis/README.md index fd036496d..0a2a67124 100644 --- a/vendor/github.com/go-redis/redis/README.md +++ b/vendor/github.com/go-redis/redis/README.md @@ -6,6 +6,7 @@ Supports: - Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC. +- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support. - [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub). - [Transactions](https://godoc.org/github.com/go-redis/redis#Multi). - [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). @@ -16,7 +17,7 @@ Supports: - [Ring](https://godoc.org/github.com/go-redis/redis#NewRing). - [Instrumentation](https://godoc.org/github.com/go-redis/redis#ex-package--Instrumentation). - [Cache friendly](https://github.com/go-redis/cache). -- [Rate limiting](https://github.com/go-redis/rate). +- [Rate limiting](https://github.com/go-redis/redis_rate). - [Distributed Locks](https://github.com/bsm/redis-lock). API docs: https://godoc.org/github.com/go-redis/redis. diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index 647a25be3..c81fc1d57 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -14,8 +14,8 @@ import ( "github.com/go-redis/redis/internal/proto" ) -var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes") -var errNilClusterState = internal.RedisError("redis: cannot load cluster slots") +var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") +var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots") // ClusterOptions are used to configure a cluster client and should be // passed to NewClusterClient. @@ -64,6 +64,19 @@ func (opt *ClusterOptions) init() { opt.ReadOnly = true } + switch opt.ReadTimeout { + case -1: + opt.ReadTimeout = 0 + case 0: + opt.ReadTimeout = 3 * time.Second + } + switch opt.WriteTimeout { + case -1: + opt.WriteTimeout = 0 + case 0: + opt.WriteTimeout = opt.ReadTimeout + } + switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -192,6 +205,21 @@ func (c *clusterNodes) Close() error { return firstErr } +func (c *clusterNodes) Addrs() ([]string, error) { + c.mu.RLock() + closed := c.closed + addrs := c.addrs + c.mu.RUnlock() + + if closed { + return nil, pool.ErrClosed + } + if len(addrs) == 0 { + return nil, errClusterNoNodes + } + return addrs, nil +} + func (c *clusterNodes) NextGeneration() uint32 { c.generation++ return c.generation @@ -272,16 +300,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { } func (c *clusterNodes) Random() (*clusterNode, error) { - c.mu.RLock() - closed := c.closed - addrs := c.addrs - c.mu.RUnlock() - - if closed { - return nil, pool.ErrClosed - } - if len(addrs) == 0 { - return nil, errClusterNoNodes + addrs, err := c.Addrs() + if err != nil { + return nil, err } var nodeErr error @@ -468,13 +489,23 @@ func (c *ClusterClient) Options() *ClusterOptions { return c.opt } -func (c *ClusterClient) state() *clusterState { +func (c *ClusterClient) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + +func (c *ClusterClient) state() (*clusterState, error) { v := c._state.Load() if v != nil { - return v.(*clusterState) + return v.(*clusterState), nil } + + _, err := c.nodes.Addrs() + if err != nil { + return nil, err + } + c.lazyReloadState() - return nil + return nil, errNilClusterState } func (c *ClusterClient) cmdInfo(name string) *CommandInfo { @@ -495,17 +526,22 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { if err != nil { return nil } - return c.cmdsInfo[name] + info := c.cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info } -func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { - if state == nil { - node, err := c.nodes.Random() - return 0, node, err - } +func (c *ClusterClient) cmdSlot(cmd Cmder) int { + cmdInfo := c.cmdInfo(cmd.Name()) + firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) + return hashtag.Slot(firstKey) +} +func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) slot := hashtag.Slot(firstKey) if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { @@ -523,19 +559,51 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl } func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { - state := c.state() + if len(keys) == 0 { + return fmt.Errorf("redis: keys don't hash to the same slot") + } - var node *clusterNode - var err error - if state != nil && len(keys) > 0 { - node, err = state.slotMasterNode(hashtag.Slot(keys[0])) - } else { - node, err = c.nodes.Random() + state, err := c.state() + if err != nil { + return err + } + + slot := hashtag.Slot(keys[0]) + for _, key := range keys[1:] { + if hashtag.Slot(key) != slot { + return fmt.Errorf("redis: Watch requires all keys to be in the same slot") + } } + + node, err := state.slotMasterNode(slot) if err != nil { return err } - return node.Client.Watch(fn, keys...) + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + err = node.Client.Watch(fn, keys...) + if err == nil { + break + } + + moved, ask, addr := internal.IsMovedError(err) + if moved || ask { + c.lazyReloadState() + node, err = c.nodes.GetOrCreate(addr) + if err != nil { + return err + } + continue + } + + return err + } + + return err } // Close closes the cluster client, releasing any open resources. @@ -547,7 +615,13 @@ func (c *ClusterClient) Close() error { } func (c *ClusterClient) Process(cmd Cmder) error { - slot, node, err := c.cmdSlotAndNode(c.state(), cmd) + state, err := c.state() + if err != nil { + cmd.setErr(err) + return err + } + + _, node, err := c.cmdSlotAndNode(state, cmd) if err != nil { cmd.setErr(err) return err @@ -556,7 +630,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { if attempt > 0 { - time.Sleep(node.Client.retryBackoff(attempt)) + time.Sleep(c.retryBackoff(attempt)) } if ask { @@ -572,7 +646,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { // If there is no error - we are done. if err == nil { - return nil + break } // If slave is loading - read from master. @@ -582,12 +656,11 @@ func (c *ClusterClient) Process(cmd Cmder) error { continue } - // On network errors try random node. - if internal.IsRetryableError(err) || internal.IsClusterDownError(err) { - node, err = c.nodes.Random() - if err != nil { - cmd.setErr(err) - return err + if internal.IsRetryableError(err, true) { + var nodeErr error + node, nodeErr = c.nodes.Random() + if nodeErr != nil { + break } continue } @@ -596,20 +669,13 @@ func (c *ClusterClient) Process(cmd Cmder) error { var addr string moved, ask, addr = internal.IsMovedError(err) if moved || ask { - state := c.state() - if state != nil && slot >= 0 { - master, _ := state.slotMasterNode(slot) - if moved && (master == nil || master.Client.getAddr() != addr) { - c.lazyReloadState() - } - } + c.lazyReloadState() - node, err = c.nodes.GetOrCreate(addr) - if err != nil { - cmd.setErr(err) - return err + var nodeErr error + node, nodeErr = c.nodes.GetOrCreate(addr) + if nodeErr != nil { + break } - continue } @@ -622,9 +688,9 @@ func (c *ClusterClient) Process(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -655,9 +721,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { // ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -688,9 +754,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { // ForEachNode concurrently calls the fn on each known node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -728,27 +794,31 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { func (c *ClusterClient) PoolStats() *PoolStats { var acc PoolStats - state := c.state() + state, _ := c.state() if state == nil { return &acc } for _, node := range state.masters { s := node.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns + acc.StaleConns += s.StaleConns } for _, node := range state.slaves { s := node.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns + acc.StaleConns += s.StaleConns } return &acc @@ -762,10 +832,8 @@ func (c *ClusterClient) lazyReloadState() { go func() { defer atomic.StoreUint32(&c.reloading, 0) - var state *clusterState for { - var err error - state, err = c.reloadState() + state, err := c.reloadState() if err == pool.ErrClosed { return } @@ -776,11 +844,10 @@ func (c *ClusterClient) lazyReloadState() { } c._state.Store(state) + time.Sleep(5 * time.Second) + c.nodes.GC(state.generation) break } - - time.Sleep(3 * time.Second) - c.nodes.GC(state.generation) }() } @@ -810,21 +877,12 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { break } - var n int for _, node := range nodes { - nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() + _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() if err != nil { internal.Logf("ReapStaleConns failed: %s", err) - } else { - n += nn } } - - s := c.PoolStats() - internal.Logf( - "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)", - n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts, - ) } } @@ -837,16 +895,21 @@ func (c *ClusterClient) Pipeline() Pipeliner { } func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().pipelined(fn) + return c.Pipeline().Pipelined(fn) } func (c *ClusterClient) pipelineExec(cmds []Cmder) error { cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { + setCmdsErr(cmds, err) return err } - for i := 0; i <= c.opt.MaxRedirects; i++ { + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { @@ -856,8 +919,12 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { continue } - err = c.pipelineProcessCmds(cn, cmds, failedCmds) - node.Client.releaseConn(cn, err) + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + if err == nil || internal.IsRedisError(err) { + _ = node.Client.connPool.Put(cn) + } else { + _ = node.Client.connPool.Remove(cn) + } } if len(failedCmds) == 0 { @@ -866,21 +933,20 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { cmdsMap = failedCmds } - var firstErr error - for _, cmd := range cmds { - if err := cmd.Err(); err != nil { - firstErr = err - break - } - } - return firstErr + return firstCmdsErr(cmds) } func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { - state := c.state() + state, err := c.state() + if err != nil { + setCmdsErr(cmds, err) + return nil, err + } + cmdsMap := make(map[*clusterNode][]Cmder) for _, cmd := range cmds { - _, node, err := c.cmdSlotAndNode(state, cmd) + slot := c.cmdSlot(cmd) + node, err := state.slotMasterNode(slot) if err != nil { return nil, err } @@ -890,11 +956,12 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e } func (c *ClusterClient) pipelineProcessCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { cn.SetWriteTimeout(c.opt.WriteTimeout) if err := writeCmd(cn, cmds...); err != nil { setCmdsErr(cmds, err) + failedCmds[node] = cmds return err } @@ -907,46 +974,53 @@ func (c *ClusterClient) pipelineProcessCmds( func (c *ClusterClient) pipelineReadCmds( cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - var firstErr error for _, cmd := range cmds { err := cmd.readReply(cn) if err == nil { continue } - if firstErr == nil { - firstErr = err + if c.checkMovedErr(cmd, err, failedCmds) { + continue } - err = c.checkMovedErr(cmd, failedCmds) - if err != nil && firstErr == nil { - firstErr = err + if internal.IsRedisError(err) { + continue } + + return err } - return firstErr + return nil } -func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error { - moved, ask, addr := internal.IsMovedError(cmd.Err()) +func (c *ClusterClient) checkMovedErr( + cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder, +) bool { + moved, ask, addr := internal.IsMovedError(err) + if moved { c.lazyReloadState() node, err := c.nodes.GetOrCreate(addr) if err != nil { - return err + return false } failedCmds[node] = append(failedCmds[node], cmd) + return true } + if ask { node, err := c.nodes.GetOrCreate(addr) if err != nil { - return err + return false } failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) + return true } - return nil + + return false } // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. @@ -959,29 +1033,29 @@ func (c *ClusterClient) TxPipeline() Pipeliner { } func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.TxPipeline().pipelined(fn) + return c.TxPipeline().Pipelined(fn) } func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { - cmdsMap, err := c.mapCmdsBySlot(cmds) + state, err := c.state() if err != nil { return err } - state := c.state() - if state == nil { - return errNilClusterState - } - + cmdsMap := c.mapCmdsBySlot(cmds) for slot, cmds := range cmdsMap { node, err := state.slotMasterNode(slot) if err != nil { setCmdsErr(cmds, err) continue } - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for i := 0; i <= c.opt.MaxRedirects; i++ { + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { @@ -992,7 +1066,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { } err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - node.Client.releaseConn(cn, err) + if err == nil || internal.IsRedisError(err) { + _ = node.Client.connPool.Put(cn) + } else { + _ = node.Client.connPool.Remove(cn) + } } if len(failedCmds) == 0 { @@ -1002,27 +1080,16 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { } } - var firstErr error - for _, cmd := range cmds { - if err := cmd.Err(); err != nil { - firstErr = err - break - } - } - return firstErr + return firstCmdsErr(cmds) } -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) { - state := c.state() +func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { cmdsMap := make(map[int][]Cmder) for _, cmd := range cmds { - slot, _, err := c.cmdSlotAndNode(state, cmd) - if err != nil { - return nil, err - } + slot := c.cmdSlot(cmd) cmdsMap[slot] = append(cmdsMap[slot], cmd) } - return cmdsMap, nil + return cmdsMap } func (c *ClusterClient) txPipelineProcessCmds( @@ -1039,22 +1106,20 @@ func (c *ClusterClient) txPipelineProcessCmds( cn.SetReadTimeout(c.opt.ReadTimeout) if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil { + setCmdsErr(cmds, err) return err } - _, err := pipelineReadCmds(cn, cmds) - return err + return pipelineReadCmds(cn, cmds) } func (c *ClusterClient) txPipelineReadQueued( cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - var firstErr error - // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil && firstErr == nil { - firstErr = err + if err := statusCmd.readReply(cn); err != nil { + return err } for _, cmd := range cmds { @@ -1063,15 +1128,11 @@ func (c *ClusterClient) txPipelineReadQueued( continue } - cmd.setErr(err) - if firstErr == nil { - firstErr = err + if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) { + continue } - err = c.checkMovedErr(cmd, failedCmds) - if err != nil && firstErr == nil { - firstErr = err - } + return err } // Parse number of replies. @@ -1085,7 +1146,13 @@ func (c *ClusterClient) txPipelineReadQueued( switch line[0] { case proto.ErrorReply: - return proto.ParseErrorReply(line) + err := proto.ParseErrorReply(line) + for _, cmd := range cmds { + if !c.checkMovedErr(cmd, err, failedCmds) { + break + } + } + return err case proto.ArrayReply: // ok default: @@ -1093,7 +1160,7 @@ func (c *ClusterClient) txPipelineReadQueued( return err } - return firstErr + return nil } func (c *ClusterClient) pubSub(channels []string) *PubSub { @@ -1112,7 +1179,12 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub { slot = -1 } - masterNode, err := c.state().slotMasterNode(slot) + state, err := c.state() + if err != nil { + return nil, err + } + + masterNode, err := state.slotMasterNode(slot) if err != nil { return nil, err } diff --git a/vendor/github.com/go-redis/redis/cluster_commands.go b/vendor/github.com/go-redis/redis/cluster_commands.go new file mode 100644 index 000000000..dff62c902 --- /dev/null +++ b/vendor/github.com/go-redis/redis/cluster_commands.go @@ -0,0 +1,22 @@ +package redis + +import "sync/atomic" + +func (c *ClusterClient) DBSize() *IntCmd { + cmd := NewIntCmd("dbsize") + var size int64 + err := c.ForEachMaster(func(master *Client) error { + n, err := master.DBSize().Result() + if err != nil { + return err + } + atomic.AddInt64(&size, n) + return nil + }) + if err != nil { + cmd.setErr(err) + return cmd + } + cmd.val = size + return cmd +} diff --git a/vendor/github.com/go-redis/redis/cluster_test.go b/vendor/github.com/go-redis/redis/cluster_test.go index 324bd1ce1..6f3677b93 100644 --- a/vendor/github.com/go-redis/redis/cluster_test.go +++ b/vendor/github.com/go-redis/redis/cluster_test.go @@ -200,7 +200,7 @@ var _ = Describe("ClusterClient", func() { Eventually(func() string { return client.Get("A").Val() - }).Should(Equal("VALUE")) + }, 30*time.Second).Should(Equal("VALUE")) cnt, err := client.Del("A").Result() Expect(err).NotTo(HaveOccurred()) @@ -215,7 +215,7 @@ var _ = Describe("ClusterClient", func() { Eventually(func() string { return client.Get("A").Val() - }).Should(Equal("VALUE")) + }, 30*time.Second).Should(Equal("VALUE")) }) It("distributes keys", func() { @@ -227,7 +227,7 @@ var _ = Describe("ClusterClient", func() { for _, master := range cluster.masters() { Eventually(func() string { return master.Info("keyspace").Val() - }, 5*time.Second).Should(Or( + }, 30*time.Second).Should(Or( ContainSubstring("keys=31"), ContainSubstring("keys=29"), ContainSubstring("keys=40"), @@ -251,7 +251,7 @@ var _ = Describe("ClusterClient", func() { for _, master := range cluster.masters() { Eventually(func() string { return master.Info("keyspace").Val() - }, 5*time.Second).Should(Or( + }, 30*time.Second).Should(Or( ContainSubstring("keys=31"), ContainSubstring("keys=29"), ContainSubstring("keys=40"), @@ -320,10 +320,6 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(14)) - if opt.RouteByLatency { - return - } - for _, key := range keys { slot := hashtag.Slot(key) client.SwapSlotNodes(slot) @@ -432,6 +428,9 @@ var _ = Describe("ClusterClient", func() { }) AfterEach(func() { + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -476,11 +475,9 @@ var _ = Describe("ClusterClient", func() { }) Expect(err).NotTo(HaveOccurred()) - for _, client := range cluster.masters() { - size, err := client.DBSize().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(size).To(Equal(int64(0))) - } + size, err := client.DBSize().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(size).To(Equal(int64(0))) }) It("should CLUSTER SLOTS", func() { @@ -560,6 +557,9 @@ var _ = Describe("ClusterClient", func() { }) AfterEach(func() { + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -575,10 +575,19 @@ var _ = Describe("ClusterClient", func() { _ = client.ForEachMaster(func(master *redis.Client) error { return master.FlushDB().Err() }) + + _ = client.ForEachSlave(func(slave *redis.Client) error { + Eventually(func() int64 { + return client.DBSize().Val() + }, 30*time.Second).Should(Equal(int64(0))) + return nil + }) }) AfterEach(func() { - client.FlushDB() + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -597,7 +606,7 @@ var _ = Describe("ClusterClient without nodes", func() { Expect(client.Close()).NotTo(HaveOccurred()) }) - It("returns an error", func() { + It("Ping returns an error", func() { err := client.Ping().Err() Expect(err).To(MatchError("redis: cluster has no nodes")) }) @@ -626,7 +635,7 @@ var _ = Describe("ClusterClient without valid nodes", func() { It("returns an error", func() { err := client.Ping().Err() - Expect(err).To(MatchError("ERR This instance has cluster support disabled")) + Expect(err).To(MatchError("redis: cannot load cluster slots")) }) It("pipeline returns an error", func() { @@ -634,7 +643,7 @@ var _ = Describe("ClusterClient without valid nodes", func() { pipe.Ping() return nil }) - Expect(err).To(MatchError("ERR This instance has cluster support disabled")) + Expect(err).To(MatchError("redis: cannot load cluster slots")) }) }) @@ -664,7 +673,7 @@ var _ = Describe("ClusterClient timeout", func() { It("Tx timeouts", func() { err := client.Watch(func(tx *redis.Tx) error { return tx.Ping().Err() - }) + }, "foo") Expect(err).To(HaveOccurred()) Expect(err.(net.Error).Timeout()).To(BeTrue()) }) @@ -676,42 +685,20 @@ var _ = Describe("ClusterClient timeout", func() { return nil }) return err - }) + }, "foo") Expect(err).To(HaveOccurred()) Expect(err.(net.Error).Timeout()).To(BeTrue()) }) } - Context("read timeout", func() { - BeforeEach(func() { - opt := redisClusterOptions() - opt.ReadTimeout = time.Nanosecond - opt.WriteTimeout = -1 - client = cluster.clusterClient(opt) - }) - - testTimeout() - }) - - Context("write timeout", func() { - BeforeEach(func() { - opt := redisClusterOptions() - opt.ReadTimeout = time.Nanosecond - opt.WriteTimeout = -1 - client = cluster.clusterClient(opt) - }) - - testTimeout() - }) - - Context("ClientPause timeout", func() { - const pause = time.Second + const pause = time.Second + Context("read/write timeout", func() { BeforeEach(func() { opt := redisClusterOptions() - opt.ReadTimeout = pause / 10 - opt.WriteTimeout = pause / 10 - opt.MaxRedirects = -1 + opt.ReadTimeout = 100 * time.Millisecond + opt.WriteTimeout = 100 * time.Millisecond + opt.MaxRedirects = 1 client = cluster.clusterClient(opt) err := client.ForEachNode(func(client *redis.Client) error { diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go index 0e5b2016e..d2688082a 100644 --- a/vendor/github.com/go-redis/redis/command.go +++ b/vendor/github.com/go-redis/redis/command.go @@ -12,28 +12,10 @@ import ( "github.com/go-redis/redis/internal/proto" ) -var ( - _ Cmder = (*Cmd)(nil) - _ Cmder = (*SliceCmd)(nil) - _ Cmder = (*StatusCmd)(nil) - _ Cmder = (*IntCmd)(nil) - _ Cmder = (*DurationCmd)(nil) - _ Cmder = (*BoolCmd)(nil) - _ Cmder = (*StringCmd)(nil) - _ Cmder = (*FloatCmd)(nil) - _ Cmder = (*StringSliceCmd)(nil) - _ Cmder = (*BoolSliceCmd)(nil) - _ Cmder = (*StringStringMapCmd)(nil) - _ Cmder = (*StringIntMapCmd)(nil) - _ Cmder = (*ZSliceCmd)(nil) - _ Cmder = (*ScanCmd)(nil) - _ Cmder = (*ClusterSlotsCmd)(nil) -) - type Cmder interface { - args() []interface{} - arg(int) string Name() string + Args() []interface{} + stringArg(int) string readReply(*pool.Conn) error setErr(error) @@ -46,14 +28,25 @@ type Cmder interface { func setCmdsErr(cmds []Cmder, e error) { for _, cmd := range cmds { - cmd.setErr(e) + if cmd.Err() == nil { + cmd.setErr(e) + } } } +func firstCmdsErr(cmds []Cmder) error { + for _, cmd := range cmds { + if err := cmd.Err(); err != nil { + return err + } + } + return nil +} + func writeCmd(cn *pool.Conn, cmds ...Cmder) error { cn.Wb.Reset() for _, cmd := range cmds { - if err := cn.Wb.Append(cmd.args()); err != nil { + if err := cn.Wb.Append(cmd.Args()); err != nil { return err } } @@ -64,7 +57,7 @@ func writeCmd(cn *pool.Conn, cmds ...Cmder) error { func cmdString(cmd Cmder, val interface{}) string { var ss []string - for _, arg := range cmd.args() { + for _, arg := range cmd.Args() { ss = append(ss, fmt.Sprint(arg)) } s := strings.Join(ss, " ") @@ -86,7 +79,7 @@ func cmdString(cmd Cmder, val interface{}) string { func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { switch cmd.Name() { case "eval", "evalsha": - if cmd.arg(2) != "0" { + if cmd.stringArg(2) != "0" { return 3 } else { return -1 @@ -95,7 +88,6 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { return 1 } if info == nil { - internal.Logf("info for cmd=%s not found", cmd.Name()) return -1 } return int(info.FirstKeyPos) @@ -110,15 +102,17 @@ type baseCmd struct { _readTimeout *time.Duration } +var _ Cmder = (*Cmd)(nil) + func (cmd *baseCmd) Err() error { return cmd.err } -func (cmd *baseCmd) args() []interface{} { +func (cmd *baseCmd) Args() []interface{} { return cmd._args } -func (cmd *baseCmd) arg(pos int) string { +func (cmd *baseCmd) stringArg(pos int) string { if pos < 0 || pos >= len(cmd._args) { return "" } @@ -129,7 +123,7 @@ func (cmd *baseCmd) arg(pos int) string { func (cmd *baseCmd) Name() string { if len(cmd._args) > 0 { // Cmd name must be lower cased. - s := internal.ToLower(cmd.arg(0)) + s := internal.ToLower(cmd.stringArg(0)) cmd._args[0] = s return s } @@ -194,6 +188,8 @@ type SliceCmd struct { val []interface{} } +var _ Cmder = (*SliceCmd)(nil) + func NewSliceCmd(args ...interface{}) *SliceCmd { return &SliceCmd{ baseCmd: baseCmd{_args: args}, @@ -230,6 +226,8 @@ type StatusCmd struct { val string } +var _ Cmder = (*StatusCmd)(nil) + func NewStatusCmd(args ...interface{}) *StatusCmd { return &StatusCmd{ baseCmd: baseCmd{_args: args}, @@ -261,6 +259,8 @@ type IntCmd struct { val int64 } +var _ Cmder = (*IntCmd)(nil) + func NewIntCmd(args ...interface{}) *IntCmd { return &IntCmd{ baseCmd: baseCmd{_args: args}, @@ -293,6 +293,8 @@ type DurationCmd struct { precision time.Duration } +var _ Cmder = (*DurationCmd)(nil) + func NewDurationCmd(precision time.Duration, args ...interface{}) *DurationCmd { return &DurationCmd{ baseCmd: baseCmd{_args: args}, @@ -330,6 +332,8 @@ type TimeCmd struct { val time.Time } +var _ Cmder = (*TimeCmd)(nil) + func NewTimeCmd(args ...interface{}) *TimeCmd { return &TimeCmd{ baseCmd: baseCmd{_args: args}, @@ -366,6 +370,8 @@ type BoolCmd struct { val bool } +var _ Cmder = (*BoolCmd)(nil) + func NewBoolCmd(args ...interface{}) *BoolCmd { return &BoolCmd{ baseCmd: baseCmd{_args: args}, @@ -421,6 +427,8 @@ type StringCmd struct { val []byte } +var _ Cmder = (*StringCmd)(nil) + func NewStringCmd(args ...interface{}) *StringCmd { return &StringCmd{ baseCmd: baseCmd{_args: args}, @@ -484,6 +492,8 @@ type FloatCmd struct { val float64 } +var _ Cmder = (*FloatCmd)(nil) + func NewFloatCmd(args ...interface{}) *FloatCmd { return &FloatCmd{ baseCmd: baseCmd{_args: args}, @@ -515,6 +525,8 @@ type StringSliceCmd struct { val []string } +var _ Cmder = (*StringSliceCmd)(nil) + func NewStringSliceCmd(args ...interface{}) *StringSliceCmd { return &StringSliceCmd{ baseCmd: baseCmd{_args: args}, @@ -555,6 +567,8 @@ type BoolSliceCmd struct { val []bool } +var _ Cmder = (*BoolSliceCmd)(nil) + func NewBoolSliceCmd(args ...interface{}) *BoolSliceCmd { return &BoolSliceCmd{ baseCmd: baseCmd{_args: args}, @@ -591,6 +605,8 @@ type StringStringMapCmd struct { val map[string]string } +var _ Cmder = (*StringStringMapCmd)(nil) + func NewStringStringMapCmd(args ...interface{}) *StringStringMapCmd { return &StringStringMapCmd{ baseCmd: baseCmd{_args: args}, @@ -627,6 +643,8 @@ type StringIntMapCmd struct { val map[string]int64 } +var _ Cmder = (*StringIntMapCmd)(nil) + func NewStringIntMapCmd(args ...interface{}) *StringIntMapCmd { return &StringIntMapCmd{ baseCmd: baseCmd{_args: args}, @@ -663,6 +681,8 @@ type ZSliceCmd struct { val []Z } +var _ Cmder = (*ZSliceCmd)(nil) + func NewZSliceCmd(args ...interface{}) *ZSliceCmd { return &ZSliceCmd{ baseCmd: baseCmd{_args: args}, @@ -702,6 +722,8 @@ type ScanCmd struct { process func(cmd Cmder) error } +var _ Cmder = (*ScanCmd)(nil) + func NewScanCmd(process func(cmd Cmder) error, args ...interface{}) *ScanCmd { return &ScanCmd{ baseCmd: baseCmd{_args: args}, @@ -752,6 +774,8 @@ type ClusterSlotsCmd struct { val []ClusterSlot } +var _ Cmder = (*ClusterSlotsCmd)(nil) + func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd { return &ClusterSlotsCmd{ baseCmd: baseCmd{_args: args}, @@ -811,6 +835,8 @@ type GeoLocationCmd struct { locations []GeoLocation } +var _ Cmder = (*GeoLocationCmd)(nil) + func NewGeoLocationCmd(q *GeoRadiusQuery, args ...interface{}) *GeoLocationCmd { args = append(args, q.Radius) if q.Unit != "" { @@ -881,6 +907,8 @@ type GeoPosCmd struct { positions []*GeoPos } +var _ Cmder = (*GeoPosCmd)(nil) + func NewGeoPosCmd(args ...interface{}) *GeoPosCmd { return &GeoPosCmd{ baseCmd: baseCmd{_args: args}, @@ -927,6 +955,8 @@ type CommandsInfoCmd struct { val map[string]*CommandInfo } +var _ Cmder = (*CommandsInfoCmd)(nil) + func NewCommandsInfoCmd(args ...interface{}) *CommandsInfoCmd { return &CommandsInfoCmd{ baseCmd: baseCmd{_args: args}, diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go index 83b3824f8..a3b90f12d 100644 --- a/vendor/github.com/go-redis/redis/commands.go +++ b/vendor/github.com/go-redis/redis/commands.go @@ -11,7 +11,7 @@ func readTimeout(timeout time.Duration) time.Duration { if timeout == 0 { return 0 } - return timeout + time.Second + return timeout + 10*time.Second } func usePrecise(dur time.Duration) bool { @@ -42,6 +42,9 @@ type Cmdable interface { Pipeline() Pipeliner Pipelined(fn func(Pipeliner) error) ([]Cmder, error) + TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) + TxPipeline() Pipeliner + ClientGetName() *StringCmd Echo(message interface{}) *StringCmd Ping() *StatusCmd diff --git a/vendor/github.com/go-redis/redis/commands_test.go b/vendor/github.com/go-redis/redis/commands_test.go index 4298cba68..6b81f23cf 100644 --- a/vendor/github.com/go-redis/redis/commands_test.go +++ b/vendor/github.com/go-redis/redis/commands_test.go @@ -27,11 +27,21 @@ var _ = Describe("Commands", func() { Describe("server", func() { It("should Auth", func() { - _, err := client.Pipelined(func(pipe redis.Pipeliner) error { + cmds, err := client.Pipelined(func(pipe redis.Pipeliner) error { pipe.Auth("password") + pipe.Auth("") return nil }) Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set")) + Expect(cmds[0].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) + Expect(cmds[1].Err()).To(MatchError("ERR Client sent AUTH, but no password is set")) + + stats := client.PoolStats() + Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(1))) + Expect(stats.Timeouts).To(Equal(uint32(0))) + Expect(stats.TotalConns).To(Equal(uint32(1))) + Expect(stats.FreeConns).To(Equal(uint32(1))) }) It("should Echo", func() { @@ -187,6 +197,29 @@ var _ = Describe("Commands", func() { Expect(tm).To(BeTemporally("~", time.Now(), 3*time.Second)) }) + It("Should Command", func() { + cmds, err := client.Command().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(cmds)).To(BeNumerically("~", 180, 10)) + + cmd := cmds["mget"] + Expect(cmd.Name).To(Equal("mget")) + Expect(cmd.Arity).To(Equal(int8(-2))) + Expect(cmd.Flags).To(ContainElement("readonly")) + Expect(cmd.FirstKeyPos).To(Equal(int8(1))) + Expect(cmd.LastKeyPos).To(Equal(int8(-1))) + Expect(cmd.StepCount).To(Equal(int8(1))) + + cmd = cmds["ping"] + Expect(cmd.Name).To(Equal("ping")) + Expect(cmd.Arity).To(Equal(int8(-1))) + Expect(cmd.Flags).To(ContainElement("stale")) + Expect(cmd.Flags).To(ContainElement("fast")) + Expect(cmd.FirstKeyPos).To(Equal(int8(0))) + Expect(cmd.LastKeyPos).To(Equal(int8(0))) + Expect(cmd.StepCount).To(Equal(int8(0))) + }) + }) Describe("debugging", func() { @@ -1358,8 +1391,8 @@ var _ = Describe("Commands", func() { Expect(client.Ping().Err()).NotTo(HaveOccurred()) stats := client.PoolStats() - Expect(stats.Requests).To(Equal(uint32(3))) Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(2))) Expect(stats.Timeouts).To(Equal(uint32(0))) }) @@ -2887,24 +2920,6 @@ var _ = Describe("Commands", func() { }) - Describe("Command", func() { - - It("returns map of commands", func() { - cmds, err := client.Command().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(len(cmds)).To(BeNumerically("~", 180, 10)) - - cmd := cmds["mget"] - Expect(cmd.Name).To(Equal("mget")) - Expect(cmd.Arity).To(Equal(int8(-2))) - Expect(cmd.Flags).To(ContainElement("readonly")) - Expect(cmd.FirstKeyPos).To(Equal(int8(1))) - Expect(cmd.LastKeyPos).To(Equal(int8(-1))) - Expect(cmd.StepCount).To(Equal(int8(1))) - }) - - }) - Describe("Eval", func() { It("returns keys and values", func() { diff --git a/vendor/github.com/go-redis/redis/export_test.go b/vendor/github.com/go-redis/redis/export_test.go index 3b7965d79..bcc18c457 100644 --- a/vendor/github.com/go-redis/redis/export_test.go +++ b/vendor/github.com/go-redis/redis/export_test.go @@ -20,8 +20,13 @@ func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) } func (c *ClusterClient) SlotAddrs(slot int) []string { + state, err := c.state() + if err != nil { + panic(err) + } + var addrs []string - for _, n := range c.state().slotNodes(slot) { + for _, n := range state.slotNodes(slot) { addrs = append(addrs, n.Client.getAddr()) } return addrs @@ -29,7 +34,12 @@ func (c *ClusterClient) SlotAddrs(slot int) []string { // SwapSlot swaps a slot's master/slave address for testing MOVED redirects. func (c *ClusterClient) SwapSlotNodes(slot int) { - nodes := c.state().slots[slot] + state, err := c.state() + if err != nil { + panic(err) + } + + nodes := state.slots[slot] if len(nodes) == 2 { nodes[0], nodes[1] = nodes[1], nodes[0] } diff --git a/vendor/github.com/go-redis/redis/internal/error.go b/vendor/github.com/go-redis/redis/internal/error.go index 90f6503a1..0898eeb62 100644 --- a/vendor/github.com/go-redis/redis/internal/error.go +++ b/vendor/github.com/go-redis/redis/internal/error.go @@ -12,11 +12,24 @@ type RedisError string func (e RedisError) Error() string { return string(e) } -func IsRetryableError(err error) bool { - return IsNetworkError(err) || err.Error() == "ERR max number of clients reached" +func IsRetryableError(err error, retryNetError bool) bool { + if IsNetworkError(err) { + return retryNetError + } + s := err.Error() + if s == "ERR max number of clients reached" { + return true + } + if strings.HasPrefix(s, "LOADING ") { + return true + } + if strings.HasPrefix(s, "CLUSTERDOWN ") { + return true + } + return false } -func IsInternalError(err error) bool { +func IsRedisError(err error) bool { _, ok := err.(RedisError) return ok } @@ -33,7 +46,7 @@ func IsBadConn(err error, allowTimeout bool) bool { if err == nil { return false } - if IsInternalError(err) { + if IsRedisError(err) { return false } if allowTimeout { @@ -45,7 +58,7 @@ func IsBadConn(err error, allowTimeout bool) bool { } func IsMovedError(err error) (moved bool, ask bool, addr string) { - if !IsInternalError(err) { + if !IsRedisError(err) { return } @@ -69,7 +82,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { func IsLoadingError(err error) bool { return strings.HasPrefix(err.Error(), "LOADING ") } - -func IsClusterDownError(err error) bool { - return strings.HasPrefix(err.Error(), "CLUSTERDOWN ") -} diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go index 25e78aa3c..836ec1045 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -23,12 +23,13 @@ var timers = sync.Pool{ // Stats contains pool state information and accumulated stats. type Stats struct { - Requests uint32 // number of times a connection was requested by the pool Hits uint32 // number of times free connection was found in the pool + Misses uint32 // number of times free connection was NOT found in the pool Timeouts uint32 // number of times a wait timeout occurred - TotalConns uint32 // the number of total connections in the pool - FreeConns uint32 // the number of free connections in the pool + TotalConns uint32 // number of total connections in the pool + FreeConns uint32 // number of free connections in the pool + StaleConns uint32 // number of stale connections removed from the pool } type Pooler interface { @@ -150,8 +151,6 @@ func (p *ConnPool) Get() (*Conn, bool, error) { return nil, false, ErrClosed } - atomic.AddUint32(&p.stats.Requests, 1) - select { case p.queue <- struct{}{}: default: @@ -189,6 +188,8 @@ func (p *ConnPool) Get() (*Conn, bool, error) { return cn, false, nil } + atomic.AddUint32(&p.stats.Misses, 1) + newcn, err := p.NewConn() if err != nil { <-p.queue @@ -265,11 +266,13 @@ func (p *ConnPool) FreeLen() int { func (p *ConnPool) Stats() *Stats { return &Stats{ - Requests: atomic.LoadUint32(&p.stats.Requests), - Hits: atomic.LoadUint32(&p.stats.Hits), - Timeouts: atomic.LoadUint32(&p.stats.Timeouts), + Hits: atomic.LoadUint32(&p.stats.Hits), + Misses: atomic.LoadUint32(&p.stats.Misses), + Timeouts: atomic.LoadUint32(&p.stats.Timeouts), + TotalConns: uint32(p.Len()), FreeConns: uint32(p.FreeLen()), + StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } } @@ -362,10 +365,6 @@ func (p *ConnPool) reaper(frequency time.Duration) { internal.Logf("ReapStaleConns failed: %s", err) continue } - s := p.Stats() - internal.Logf( - "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)", - n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts, - ) + atomic.AddUint32(&p.stats.StaleConns, uint32(n)) } } diff --git a/vendor/github.com/go-redis/redis/internal/proto/reader.go b/vendor/github.com/go-redis/redis/internal/proto/reader.go index 2159cf639..cd94329d8 100644 --- a/vendor/github.com/go-redis/redis/internal/proto/reader.go +++ b/vendor/github.com/go-redis/redis/internal/proto/reader.go @@ -63,7 +63,7 @@ func (p *Reader) ReadLine() ([]byte, error) { return nil, bufio.ErrBufferFull } if len(line) == 0 { - return nil, internal.RedisError("redis: reply is empty") + return nil, fmt.Errorf("redis: reply is empty") } if isNilReply(line) { return nil, internal.Nil diff --git a/vendor/github.com/go-redis/redis/internal/proto/scan.go b/vendor/github.com/go-redis/redis/internal/proto/scan.go index 3ab40b94f..0431a877d 100644 --- a/vendor/github.com/go-redis/redis/internal/proto/scan.go +++ b/vendor/github.com/go-redis/redis/internal/proto/scan.go @@ -11,7 +11,7 @@ import ( func Scan(b []byte, v interface{}) error { switch v := v.(type) { case nil: - return internal.RedisError("redis: Scan(nil)") + return fmt.Errorf("redis: Scan(nil)") case *string: *v = internal.BytesToString(b) return nil diff --git a/vendor/github.com/go-redis/redis/main_test.go b/vendor/github.com/go-redis/redis/main_test.go index 30f09c618..7c5a6a969 100644 --- a/vendor/github.com/go-redis/redis/main_test.go +++ b/vendor/github.com/go-redis/redis/main_test.go @@ -50,10 +50,6 @@ var cluster = &clusterScenario{ clients: make(map[string]*redis.Client, 6), } -func init() { - //redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) -} - var _ = BeforeSuite(func() { var err error diff --git a/vendor/github.com/go-redis/redis/options.go b/vendor/github.com/go-redis/redis/options.go index dea045453..75648053d 100644 --- a/vendor/github.com/go-redis/redis/options.go +++ b/vendor/github.com/go-redis/redis/options.go @@ -198,13 +198,3 @@ func newConnPool(opt *Options) *pool.ConnPool { IdleCheckFrequency: opt.IdleCheckFrequency, }) } - -// PoolStats contains pool state information and accumulated stats. -type PoolStats struct { - Requests uint32 // number of times a connection was requested by the pool - Hits uint32 // number of times free connection was found in the pool - Timeouts uint32 // number of times a wait timeout occurred - - TotalConns uint32 // the number of total connections in the pool - FreeConns uint32 // the number of free connections in the pool -} diff --git a/vendor/github.com/go-redis/redis/pipeline.go b/vendor/github.com/go-redis/redis/pipeline.go index b66c0597f..9349ef553 100644 --- a/vendor/github.com/go-redis/redis/pipeline.go +++ b/vendor/github.com/go-redis/redis/pipeline.go @@ -13,9 +13,7 @@ type Pipeliner interface { Process(cmd Cmder) error Close() error Discard() error - discard() error Exec() ([]Cmder, error) - pipelined(fn func(Pipeliner) error) ([]Cmder, error) } var _ Pipeliner = (*Pipeline)(nil) @@ -104,3 +102,11 @@ func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Pipeline) Pipeline() Pipeliner { return c } + +func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.pipelined(fn) +} + +func (c *Pipeline) TxPipeline() Pipeliner { + return c +} diff --git a/vendor/github.com/go-redis/redis/pool_test.go b/vendor/github.com/go-redis/redis/pool_test.go index 34a548a63..0ca09adc7 100644 --- a/vendor/github.com/go-redis/redis/pool_test.go +++ b/vendor/github.com/go-redis/redis/pool_test.go @@ -95,8 +95,8 @@ var _ = Describe("pool", func() { Expect(pool.FreeLen()).To(Equal(1)) stats := pool.Stats() - Expect(stats.Requests).To(Equal(uint32(4))) Expect(stats.Hits).To(Equal(uint32(2))) + Expect(stats.Misses).To(Equal(uint32(2))) Expect(stats.Timeouts).To(Equal(uint32(0))) }) @@ -112,30 +112,32 @@ var _ = Describe("pool", func() { Expect(pool.FreeLen()).To(Equal(1)) stats := pool.Stats() - Expect(stats.Requests).To(Equal(uint32(101))) Expect(stats.Hits).To(Equal(uint32(100))) + Expect(stats.Misses).To(Equal(uint32(1))) Expect(stats.Timeouts).To(Equal(uint32(0))) }) It("removes idle connections", func() { stats := client.PoolStats() Expect(stats).To(Equal(&redis.PoolStats{ - Requests: 1, Hits: 0, + Misses: 1, Timeouts: 0, TotalConns: 1, FreeConns: 1, + StaleConns: 0, })) time.Sleep(2 * time.Second) stats = client.PoolStats() Expect(stats).To(Equal(&redis.PoolStats{ - Requests: 1, Hits: 0, + Misses: 1, Timeouts: 0, TotalConns: 0, FreeConns: 0, + StaleConns: 1, })) }) }) diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index 4a5c65f57..e754a16f2 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -95,7 +95,10 @@ func (c *PubSub) releaseConn(cn *pool.Conn, err error) { } func (c *PubSub) _releaseConn(cn *pool.Conn, err error) { - if internal.IsBadConn(err, true) && c.cn == cn { + if c.cn != cn { + return + } + if internal.IsBadConn(err, true) { _ = c.closeTheCn() } } diff --git a/vendor/github.com/go-redis/redis/pubsub_test.go b/vendor/github.com/go-redis/redis/pubsub_test.go index 1d9dfcb99..6fc04a198 100644 --- a/vendor/github.com/go-redis/redis/pubsub_test.go +++ b/vendor/github.com/go-redis/redis/pubsub_test.go @@ -68,7 +68,7 @@ var _ = Describe("PubSub", func() { } stats := client.PoolStats() - Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) + Expect(stats.Misses).To(Equal(uint32(2))) }) It("should pub/sub channels", func() { @@ -191,7 +191,7 @@ var _ = Describe("PubSub", func() { } stats := client.PoolStats() - Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) + Expect(stats.Misses).To(Equal(uint32(2))) }) It("should ping/pong", func() { @@ -290,8 +290,8 @@ var _ = Describe("PubSub", func() { Eventually(done).Should(Receive()) stats := client.PoolStats() - Expect(stats.Requests).To(Equal(uint32(2))) Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(1))) }) It("returns an error when subscribe fails", func() { diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index b18973cdb..230091b3e 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -3,6 +3,7 @@ package redis import ( "fmt" "log" + "os" "time" "github.com/go-redis/redis/internal" @@ -13,6 +14,10 @@ import ( // Redis nil reply, .e.g. when key does not exist. const Nil = internal.Nil +func init() { + SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)) +} + func SetLogger(logger *log.Logger) { internal.Logger = logger } @@ -131,7 +136,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { cn, _, err := c.getConn() if err != nil { cmd.setErr(err) - if internal.IsRetryableError(err) { + if internal.IsRetryableError(err, true) { continue } return err @@ -141,7 +146,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { if err := writeCmd(cn, cmd); err != nil { c.releaseConn(cn, err) cmd.setErr(err) - if internal.IsRetryableError(err) { + if internal.IsRetryableError(err, true) { continue } return err @@ -150,7 +155,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { cn.SetReadTimeout(c.cmdTimeout(cmd)) err = cmd.readReply(cn) c.releaseConn(cn, err) - if err != nil && internal.IsRetryableError(err) { + if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { continue } @@ -197,8 +202,11 @@ type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { return func(cmds []Cmder) error { - var firstErr error - for i := 0; i <= c.opt.MaxRetries; i++ { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + cn, _, err := c.getConn() if err != nil { setCmdsErr(cmds, err) @@ -206,18 +214,18 @@ func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { } canRetry, err := p(cn, cmds) - c.releaseConn(cn, err) - if err == nil { - return nil - } - if firstErr == nil { - firstErr = err + + if err == nil || internal.IsRedisError(err) { + _ = c.connPool.Put(cn) + break } - if !canRetry || !internal.IsRetryableError(err) { + _ = c.connPool.Remove(cn) + + if !canRetry || !internal.IsRetryableError(err, true) { break } } - return firstErr + return firstCmdsErr(cmds) } } @@ -230,23 +238,17 @@ func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, err // Set read timeout for all commands. cn.SetReadTimeout(c.opt.ReadTimeout) - return pipelineReadCmds(cn, cmds) + return true, pipelineReadCmds(cn, cmds) } -func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) { - for i, cmd := range cmds { +func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { + for _, cmd := range cmds { err := cmd.readReply(cn) - if err == nil { - continue - } - if i == 0 { - retry = true - } - if firstErr == nil { - firstErr = err + if err != nil && !internal.IsRedisError(err) { + return err } } - return + return nil } func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { @@ -260,11 +262,11 @@ func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, e cn.SetReadTimeout(c.opt.ReadTimeout) if err := c.txPipelineReadQueued(cn, cmds); err != nil { + setCmdsErr(cmds, err) return false, err } - _, err := pipelineReadCmds(cn, cmds) - return false, err + return false, pipelineReadCmds(cn, cmds) } func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { @@ -276,21 +278,16 @@ func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { } func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { - var firstErr error - // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil && firstErr == nil { - firstErr = err + if err := statusCmd.readReply(cn); err != nil { + return err } - for _, cmd := range cmds { + for _ = range cmds { err := statusCmd.readReply(cn) - if err != nil { - cmd.setErr(err) - if firstErr == nil { - firstErr = err - } + if err != nil && !internal.IsRedisError(err) { + return err } } @@ -355,21 +352,16 @@ func (c *Client) Options() *Options { return c.opt } +type PoolStats pool.Stats + // PoolStats returns connection pool stats. func (c *Client) PoolStats() *PoolStats { - s := c.connPool.Stats() - return &PoolStats{ - Requests: s.Requests, - Hits: s.Hits, - Timeouts: s.Timeouts, - - TotalConns: s.TotalConns, - FreeConns: s.FreeConns, - } + stats := c.connPool.Stats() + return (*PoolStats)(stats) } func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().pipelined(fn) + return c.Pipeline().Pipelined(fn) } func (c *Client) Pipeline() Pipeliner { @@ -381,7 +373,7 @@ func (c *Client) Pipeline() Pipeliner { } func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.TxPipeline().pipelined(fn) + return c.TxPipeline().Pipelined(fn) } // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. @@ -433,7 +425,7 @@ type Conn struct { } func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().pipelined(fn) + return c.Pipeline().Pipelined(fn) } func (c *Conn) Pipeline() Pipeliner { @@ -445,7 +437,7 @@ func (c *Conn) Pipeline() Pipeliner { } func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.TxPipeline().pipelined(fn) + return c.TxPipeline().Pipelined(fn) } // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index 72d52bf75..a30c32102 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -34,7 +34,9 @@ type RingOptions struct { DB int Password string - MaxRetries int + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration @@ -50,6 +52,19 @@ func (opt *RingOptions) init() { if opt.HeartbeatFrequency == 0 { opt.HeartbeatFrequency = 500 * time.Millisecond } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } } func (opt *RingOptions) clientOptions() *Options { @@ -130,9 +145,10 @@ type Ring struct { opt *RingOptions nreplicas int - mu sync.RWMutex - hash *consistenthash.Map - shards map[string]*ringShard + mu sync.RWMutex + hash *consistenthash.Map + shards map[string]*ringShard + shardsList []*ringShard cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -154,24 +170,41 @@ func NewRing(opt *RingOptions) *Ring { for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr - ring.addClient(name, NewClient(clopt)) + ring.addShard(name, NewClient(clopt)) } go ring.heartbeat() return ring } +func (c *Ring) addShard(name string, cl *Client) { + shard := &ringShard{Client: cl} + c.mu.Lock() + c.hash.Add(name) + c.shards[name] = shard + c.shardsList = append(c.shardsList, shard) + c.mu.Unlock() +} + // Options returns read-only Options that were used to create the client. func (c *Ring) Options() *RingOptions { return c.opt } +func (c *Ring) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + // PoolStats returns accumulated connection pool stats. func (c *Ring) PoolStats() *PoolStats { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var acc PoolStats - for _, shard := range c.shards { + for _, shard := range shards { s := shard.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns @@ -210,9 +243,13 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub { // ForEachShard concurrently calls the fn on each live shard in the ring. // It returns the first error if any. func (c *Ring) ForEachShard(fn func(client *Client) error) error { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var wg sync.WaitGroup errCh := make(chan error, 1) - for _, shard := range c.shards { + for _, shard := range shards { if shard.IsDown() { continue } @@ -241,8 +278,12 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { func (c *Ring) cmdInfo(name string) *CommandInfo { err := c.cmdsInfoOnce.Do(func() error { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var firstErr error - for _, shard := range c.shards { + for _, shard := range shards { cmdsInfo, err := shard.Client.Command().Result() if err == nil { c.cmdsInfo = cmdsInfo @@ -257,14 +298,11 @@ func (c *Ring) cmdInfo(name string) *CommandInfo { if err != nil { return nil } - return c.cmdsInfo[name] -} - -func (c *Ring) addClient(name string, cl *Client) { - c.mu.Lock() - c.hash.Add(name) - c.shards[name] = &ringShard{Client: cl} - c.mu.Unlock() + info := c.cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info } func (c *Ring) shardByKey(key string) (*ringShard, error) { @@ -305,7 +343,7 @@ func (c *Ring) shardByName(name string) (*ringShard, error) { func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) return c.shardByKey(firstKey) } @@ -346,7 +384,10 @@ func (c *Ring) heartbeat() { break } - for _, shard := range c.shards { + shards := c.shardsList + c.mu.RUnlock() + + for _, shard := range shards { err := shard.Client.Ping().Err() if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { internal.Logf("ring shard state changed: %s", shard) @@ -354,8 +395,6 @@ func (c *Ring) heartbeat() { } } - c.mu.RUnlock() - if rebalance { c.rebalance() } @@ -383,6 +422,7 @@ func (c *Ring) Close() error { } c.hash = nil c.shards = nil + c.shardsList = nil return firstErr } @@ -396,51 +436,48 @@ func (c *Ring) Pipeline() Pipeliner { } func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().pipelined(fn) + return c.Pipeline().Pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { +func (c *Ring) pipelineExec(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) - name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + name := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) if name != "" { name = c.hash.Get(hashtag.Key(name)) } cmdsMap[name] = append(cmdsMap[name], cmd) } - for i := 0; i <= c.opt.MaxRetries; i++ { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + var failedCmdsMap map[string][]Cmder for name, cmds := range cmdsMap { shard, err := c.shardByName(name) if err != nil { setCmdsErr(cmds, err) - if firstErr == nil { - firstErr = err - } continue } cn, _, err := shard.Client.getConn() if err != nil { setCmdsErr(cmds, err) - if firstErr == nil { - firstErr = err - } continue } canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - shard.Client.releaseConn(cn, err) - if err == nil { + if err == nil || internal.IsRedisError(err) { + _ = shard.Client.connPool.Put(cn) continue } - if firstErr == nil { - firstErr = err - } - if canRetry && internal.IsRetryableError(err) { + _ = shard.Client.connPool.Remove(cn) + + if canRetry && internal.IsRetryableError(err, true) { if failedCmdsMap == nil { failedCmdsMap = make(map[string][]Cmder) } @@ -454,5 +491,13 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { cmdsMap = failedCmdsMap } - return firstErr + return firstCmdsErr(cmds) +} + +func (c *Ring) TxPipeline() Pipeliner { + panic("not implemented") +} + +func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + panic("not implemented") } diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go index 3bfdb4a3f..37d06b482 100644 --- a/vendor/github.com/go-redis/redis/sentinel.go +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -301,8 +301,10 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) { msg, err := pubsub.ReceiveMessage() if err != nil { - internal.Logf("sentinel: ReceiveMessage failed: %s", err) - pubsub.Close() + if err != pool.ErrClosed { + internal.Logf("sentinel: ReceiveMessage failed: %s", err) + pubsub.Close() + } d.resetSentinel() return } diff --git a/vendor/github.com/go-redis/redis/tx.go b/vendor/github.com/go-redis/redis/tx.go index 5ef89619b..11d5d5cb0 100644 --- a/vendor/github.com/go-redis/redis/tx.go +++ b/vendor/github.com/go-redis/redis/tx.go @@ -36,11 +36,10 @@ func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { return err } } - firstErr := fn(tx) - if err := tx.Close(); err != nil && firstErr == nil { - firstErr = err - } - return firstErr + + err := fn(tx) + _ = tx.Close() + return err } // close closes the transaction, releasing any open resources. @@ -53,7 +52,7 @@ func (c *Tx) Close() error { // of a transaction. func (c *Tx) Watch(keys ...string) *StatusCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "WATCH" + args[0] = "watch" for i, key := range keys { args[1+i] = key } @@ -65,7 +64,7 @@ func (c *Tx) Watch(keys ...string) *StatusCmd { // Unwatch flushes all the previously watched keys for a transaction. func (c *Tx) Unwatch(keys ...string) *StatusCmd { args := make([]interface{}, 1+len(keys)) - args[0] = "UNWATCH" + args[0] = "unwatch" for i, key := range keys { args[1+i] = key } @@ -92,5 +91,13 @@ func (c *Tx) Pipeline() Pipeliner { // TxFailedErr is returned. Otherwise Exec returns error of the first // failed command or nil. func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().pipelined(fn) + return c.Pipeline().Pipelined(fn) +} + +func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + return c.Pipelined(fn) +} + +func (c *Tx) TxPipeline() Pipeliner { + return c.Pipeline() } diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go index 4aa579fa4..29eb12b18 100644 --- a/vendor/github.com/go-redis/redis/universal.go +++ b/vendor/github.com/go-redis/redis/universal.go @@ -90,8 +90,8 @@ func (o *UniversalOptions) simple() *Options { } return &Options{ - Addr: addr, - DB: o.DB, + Addr: addr, + DB: o.DB, MaxRetries: o.MaxRetries, Password: o.Password, @@ -117,6 +117,9 @@ type UniversalClient interface { Close() error } +var _ UniversalClient = (*Client)(nil) +var _ UniversalClient = (*ClusterClient)(nil) + // NewUniversalClient returns a new multi client. The type of client returned depends // on the following three conditions: // -- cgit v1.2.3-1-g7c22