diff options
Diffstat (limited to 'vendor/github.com/go-redis/redis/ring.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/ring.go | 79 |
1 files changed, 71 insertions, 8 deletions
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index b47a1094e..3ded28060 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -16,7 +16,8 @@ import ( "github.com/go-redis/redis/internal/pool" ) -const nreplicas = 100 +// Hash is type of hash function used in consistent hash. +type Hash consistenthash.Hash var errRingShardsDown = errors.New("redis: all ring shards are down") @@ -30,6 +31,27 @@ type RingOptions struct { // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration + // Hash function used in consistent hash. + // Default is crc32.ChecksumIEEE. + Hash Hash + + // Number of replicas in consistent hash. + // Default is 100 replicas. + // + // Higher number of replicas will provide less deviation, that is keys will be + // distributed to nodes more evenly. + // + // Following is deviation for common nreplicas: + // -------------------------------------------------------- + // | nreplicas | standard error | 99% confidence interval | + // | 10 | 0.3152 | (0.37, 1.98) | + // | 100 | 0.0997 | (0.76, 1.28) | + // | 1000 | 0.0316 | (0.92, 1.09) | + // -------------------------------------------------------- + // + // See https://arxiv.org/abs/1406.2294 for reference + HashReplicas int + // Following options are copied from Options struct. OnConnect func(*Conn) error @@ -46,6 +68,8 @@ type RingOptions struct { WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -56,6 +80,10 @@ func (opt *RingOptions) init() { opt.HeartbeatFrequency = 500 * time.Millisecond } + if opt.HashReplicas == 0 { + opt.HashReplicas = 100 + } + switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -82,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options { WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, @@ -133,16 +163,21 @@ func (shard *ringShard) Vote(up bool) bool { //------------------------------------------------------------------------------ type ringShards struct { + opt *RingOptions + mu sync.RWMutex hash *consistenthash.Map shards map[string]*ringShard // read only list []*ringShard // read only + len int closed bool } -func newRingShards() *ringShards { +func newRingShards(opt *RingOptions) *ringShards { return &ringShards{ - hash: consistenthash.New(nreplicas, nil), + opt: opt, + + hash: newConsistentHash(opt), shards: make(map[string]*ringShard), } } @@ -238,18 +273,28 @@ func (c *ringShards) Heartbeat(frequency time.Duration) { // rebalance removes dead shards from the Ring. func (c *ringShards) rebalance() { - hash := consistenthash.New(nreplicas, nil) + hash := newConsistentHash(c.opt) + var shardsNum int for name, shard := range c.shards { if shard.IsUp() { hash.Add(name) + shardsNum++ } } c.mu.Lock() c.hash = hash + c.len = shardsNum c.mu.Unlock() } +func (c *ringShards) Len() int { + c.mu.RLock() + l := c.len + c.mu.RUnlock() + return l +} + func (c *ringShards) Close() error { c.mu.Lock() defer c.mu.Unlock() @@ -305,7 +350,7 @@ func NewRing(opt *RingOptions) *Ring { ring := &Ring{ opt: opt, - shards: newRingShards(), + shards: newRingShards(opt), } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) @@ -363,11 +408,16 @@ func (c *Ring) PoolStats() *PoolStats { acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns } return &acc } +// Len returns the current number of shards in the ring. +func (c *Ring) Len() int { + return c.shards.Len() +} + // Subscribe subscribes the client to the specified channels. func (c *Ring) Subscribe(channels ...string) *PubSub { if len(channels) == 0 { @@ -466,7 +516,16 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shards.GetByKey(firstKey) } -func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { +// Do creates a Cmd from the args and processes the cmd. +func (c *Ring) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *Ring) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { c.ForEachShard(func(c *Client) error { c.WrapProcess(fn) return nil @@ -552,7 +611,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap = failedCmdsMap } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *Ring) TxPipeline() Pipeliner { @@ -570,3 +629,7 @@ func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Ring) Close() error { return c.shards.Close() } + +func newConsistentHash(opt *RingOptions) *consistenthash.Map { + return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) +} |