summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/ring.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/ring.go')
-rw-r--r--vendor/github.com/go-redis/redis/ring.go121
1 files changed, 83 insertions, 38 deletions
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go
index 72d52bf75..a30c32102 100644
--- a/vendor/github.com/go-redis/redis/ring.go
+++ b/vendor/github.com/go-redis/redis/ring.go
@@ -34,7 +34,9 @@ type RingOptions struct {
DB int
Password string
- MaxRetries int
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
@@ -50,6 +52,19 @@ func (opt *RingOptions) init() {
if opt.HeartbeatFrequency == 0 {
opt.HeartbeatFrequency = 500 * time.Millisecond
}
+
+ switch opt.MinRetryBackoff {
+ case -1:
+ opt.MinRetryBackoff = 0
+ case 0:
+ opt.MinRetryBackoff = 8 * time.Millisecond
+ }
+ switch opt.MaxRetryBackoff {
+ case -1:
+ opt.MaxRetryBackoff = 0
+ case 0:
+ opt.MaxRetryBackoff = 512 * time.Millisecond
+ }
}
func (opt *RingOptions) clientOptions() *Options {
@@ -130,9 +145,10 @@ type Ring struct {
opt *RingOptions
nreplicas int
- mu sync.RWMutex
- hash *consistenthash.Map
- shards map[string]*ringShard
+ mu sync.RWMutex
+ hash *consistenthash.Map
+ shards map[string]*ringShard
+ shardsList []*ringShard
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
@@ -154,24 +170,41 @@ func NewRing(opt *RingOptions) *Ring {
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
- ring.addClient(name, NewClient(clopt))
+ ring.addShard(name, NewClient(clopt))
}
go ring.heartbeat()
return ring
}
+func (c *Ring) addShard(name string, cl *Client) {
+ shard := &ringShard{Client: cl}
+ c.mu.Lock()
+ c.hash.Add(name)
+ c.shards[name] = shard
+ c.shardsList = append(c.shardsList, shard)
+ c.mu.Unlock()
+}
+
// Options returns read-only Options that were used to create the client.
func (c *Ring) Options() *RingOptions {
return c.opt
}
+func (c *Ring) retryBackoff(attempt int) time.Duration {
+ return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
+}
+
// PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats {
+ c.mu.RLock()
+ shards := c.shardsList
+ c.mu.RUnlock()
+
var acc PoolStats
- for _, shard := range c.shards {
+ for _, shard := range shards {
s := shard.Client.connPool.Stats()
- acc.Requests += s.Requests
acc.Hits += s.Hits
+ acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
@@ -210,9 +243,13 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub {
// ForEachShard concurrently calls the fn on each live shard in the ring.
// It returns the first error if any.
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
+ c.mu.RLock()
+ shards := c.shardsList
+ c.mu.RUnlock()
+
var wg sync.WaitGroup
errCh := make(chan error, 1)
- for _, shard := range c.shards {
+ for _, shard := range shards {
if shard.IsDown() {
continue
}
@@ -241,8 +278,12 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
func (c *Ring) cmdInfo(name string) *CommandInfo {
err := c.cmdsInfoOnce.Do(func() error {
+ c.mu.RLock()
+ shards := c.shardsList
+ c.mu.RUnlock()
+
var firstErr error
- for _, shard := range c.shards {
+ for _, shard := range shards {
cmdsInfo, err := shard.Client.Command().Result()
if err == nil {
c.cmdsInfo = cmdsInfo
@@ -257,14 +298,11 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
if err != nil {
return nil
}
- return c.cmdsInfo[name]
-}
-
-func (c *Ring) addClient(name string, cl *Client) {
- c.mu.Lock()
- c.hash.Add(name)
- c.shards[name] = &ringShard{Client: cl}
- c.mu.Unlock()
+ info := c.cmdsInfo[name]
+ if info == nil {
+ internal.Logf("info for cmd=%s not found", name)
+ }
+ return info
}
func (c *Ring) shardByKey(key string) (*ringShard, error) {
@@ -305,7 +343,7 @@ func (c *Ring) shardByName(name string) (*ringShard, error) {
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
cmdInfo := c.cmdInfo(cmd.Name())
- firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
+ firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
return c.shardByKey(firstKey)
}
@@ -346,7 +384,10 @@ func (c *Ring) heartbeat() {
break
}
- for _, shard := range c.shards {
+ shards := c.shardsList
+ c.mu.RUnlock()
+
+ for _, shard := range shards {
err := shard.Client.Ping().Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
internal.Logf("ring shard state changed: %s", shard)
@@ -354,8 +395,6 @@ func (c *Ring) heartbeat() {
}
}
- c.mu.RUnlock()
-
if rebalance {
c.rebalance()
}
@@ -383,6 +422,7 @@ func (c *Ring) Close() error {
}
c.hash = nil
c.shards = nil
+ c.shardsList = nil
return firstErr
}
@@ -396,51 +436,48 @@ func (c *Ring) Pipeline() Pipeliner {
}
func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
-func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
+func (c *Ring) pipelineExec(cmds []Cmder) error {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.Name())
- name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
+ name := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
if name != "" {
name = c.hash.Get(hashtag.Key(name))
}
cmdsMap[name] = append(cmdsMap[name], cmd)
}
- for i := 0; i <= c.opt.MaxRetries; i++ {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
var failedCmdsMap map[string][]Cmder
for name, cmds := range cmdsMap {
shard, err := c.shardByName(name)
if err != nil {
setCmdsErr(cmds, err)
- if firstErr == nil {
- firstErr = err
- }
continue
}
cn, _, err := shard.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
- if firstErr == nil {
- firstErr = err
- }
continue
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
- shard.Client.releaseConn(cn, err)
- if err == nil {
+ if err == nil || internal.IsRedisError(err) {
+ _ = shard.Client.connPool.Put(cn)
continue
}
- if firstErr == nil {
- firstErr = err
- }
- if canRetry && internal.IsRetryableError(err) {
+ _ = shard.Client.connPool.Remove(cn)
+
+ if canRetry && internal.IsRetryableError(err, true) {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
@@ -454,5 +491,13 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
cmdsMap = failedCmdsMap
}
- return firstErr
+ return firstCmdsErr(cmds)
+}
+
+func (c *Ring) TxPipeline() Pipeliner {
+ panic("not implemented")
+}
+
+func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
+ panic("not implemented")
}