diff options
author | Christopher Speller <crspeller@gmail.com> | 2017-09-29 12:46:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-29 12:46:30 -0700 |
commit | b84736e9b6401df0c6eeab9950bef09458a6aefd (patch) | |
tree | d9175208de3236db75a33879750a57b3000ba096 /vendor/github.com/go-redis/redis/ring.go | |
parent | 8b9dbb86133ff0fd6002a391268383d1593918ca (diff) | |
download | chat-b84736e9b6401df0c6eeab9950bef09458a6aefd.tar.gz chat-b84736e9b6401df0c6eeab9950bef09458a6aefd.tar.bz2 chat-b84736e9b6401df0c6eeab9950bef09458a6aefd.zip |
Updating server dependancies. (#7538)
Diffstat (limited to 'vendor/github.com/go-redis/redis/ring.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/ring.go | 121 |
1 files changed, 83 insertions, 38 deletions
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index 72d52bf75..a30c32102 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -34,7 +34,9 @@ type RingOptions struct { DB int Password string - MaxRetries int + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration @@ -50,6 +52,19 @@ func (opt *RingOptions) init() { if opt.HeartbeatFrequency == 0 { opt.HeartbeatFrequency = 500 * time.Millisecond } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } } func (opt *RingOptions) clientOptions() *Options { @@ -130,9 +145,10 @@ type Ring struct { opt *RingOptions nreplicas int - mu sync.RWMutex - hash *consistenthash.Map - shards map[string]*ringShard + mu sync.RWMutex + hash *consistenthash.Map + shards map[string]*ringShard + shardsList []*ringShard cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -154,24 +170,41 @@ func NewRing(opt *RingOptions) *Ring { for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr - ring.addClient(name, NewClient(clopt)) + ring.addShard(name, NewClient(clopt)) } go ring.heartbeat() return ring } +func (c *Ring) addShard(name string, cl *Client) { + shard := &ringShard{Client: cl} + c.mu.Lock() + c.hash.Add(name) + c.shards[name] = shard + c.shardsList = append(c.shardsList, shard) + c.mu.Unlock() +} + // Options returns read-only Options that were used to create the client. func (c *Ring) Options() *RingOptions { return c.opt } +func (c *Ring) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + // PoolStats returns accumulated connection pool stats. func (c *Ring) PoolStats() *PoolStats { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var acc PoolStats - for _, shard := range c.shards { + for _, shard := range shards { s := shard.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns @@ -210,9 +243,13 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub { // ForEachShard concurrently calls the fn on each live shard in the ring. // It returns the first error if any. func (c *Ring) ForEachShard(fn func(client *Client) error) error { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var wg sync.WaitGroup errCh := make(chan error, 1) - for _, shard := range c.shards { + for _, shard := range shards { if shard.IsDown() { continue } @@ -241,8 +278,12 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { func (c *Ring) cmdInfo(name string) *CommandInfo { err := c.cmdsInfoOnce.Do(func() error { + c.mu.RLock() + shards := c.shardsList + c.mu.RUnlock() + var firstErr error - for _, shard := range c.shards { + for _, shard := range shards { cmdsInfo, err := shard.Client.Command().Result() if err == nil { c.cmdsInfo = cmdsInfo @@ -257,14 +298,11 @@ func (c *Ring) cmdInfo(name string) *CommandInfo { if err != nil { return nil } - return c.cmdsInfo[name] -} - -func (c *Ring) addClient(name string, cl *Client) { - c.mu.Lock() - c.hash.Add(name) - c.shards[name] = &ringShard{Client: cl} - c.mu.Unlock() + info := c.cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info } func (c *Ring) shardByKey(key string) (*ringShard, error) { @@ -305,7 +343,7 @@ func (c *Ring) shardByName(name string) (*ringShard, error) { func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) return c.shardByKey(firstKey) } @@ -346,7 +384,10 @@ func (c *Ring) heartbeat() { break } - for _, shard := range c.shards { + shards := c.shardsList + c.mu.RUnlock() + + for _, shard := range shards { err := shard.Client.Ping().Err() if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { internal.Logf("ring shard state changed: %s", shard) @@ -354,8 +395,6 @@ func (c *Ring) heartbeat() { } } - c.mu.RUnlock() - if rebalance { c.rebalance() } @@ -383,6 +422,7 @@ func (c *Ring) Close() error { } c.hash = nil c.shards = nil + c.shardsList = nil return firstErr } @@ -396,51 +436,48 @@ func (c *Ring) Pipeline() Pipeliner { } func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().pipelined(fn) + return c.Pipeline().Pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { +func (c *Ring) pipelineExec(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) - name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + name := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) if name != "" { name = c.hash.Get(hashtag.Key(name)) } cmdsMap[name] = append(cmdsMap[name], cmd) } - for i := 0; i <= c.opt.MaxRetries; i++ { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + var failedCmdsMap map[string][]Cmder for name, cmds := range cmdsMap { shard, err := c.shardByName(name) if err != nil { setCmdsErr(cmds, err) - if firstErr == nil { - firstErr = err - } continue } cn, _, err := shard.Client.getConn() if err != nil { setCmdsErr(cmds, err) - if firstErr == nil { - firstErr = err - } continue } canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - shard.Client.releaseConn(cn, err) - if err == nil { + if err == nil || internal.IsRedisError(err) { + _ = shard.Client.connPool.Put(cn) continue } - if firstErr == nil { - firstErr = err - } - if canRetry && internal.IsRetryableError(err) { + _ = shard.Client.connPool.Remove(cn) + + if canRetry && internal.IsRetryableError(err, true) { if failedCmdsMap == nil { failedCmdsMap = make(map[string][]Cmder) } @@ -454,5 +491,13 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { cmdsMap = failedCmdsMap } - return firstErr + return firstCmdsErr(cmds) +} + +func (c *Ring) TxPipeline() Pipeliner { + panic("not implemented") +} + +func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { + panic("not implemented") } |