summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/ring.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/ring.go')
-rw-r--r--vendor/github.com/go-redis/redis/ring.go79
1 files changed, 71 insertions, 8 deletions
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go
index b47a1094e..3ded28060 100644
--- a/vendor/github.com/go-redis/redis/ring.go
+++ b/vendor/github.com/go-redis/redis/ring.go
@@ -16,7 +16,8 @@ import (
"github.com/go-redis/redis/internal/pool"
)
-const nreplicas = 100
+// Hash is type of hash function used in consistent hash.
+type Hash consistenthash.Hash
var errRingShardsDown = errors.New("redis: all ring shards are down")
@@ -30,6 +31,27 @@ type RingOptions struct {
// Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration
+ // Hash function used in consistent hash.
+ // Default is crc32.ChecksumIEEE.
+ Hash Hash
+
+ // Number of replicas in consistent hash.
+ // Default is 100 replicas.
+ //
+ // Higher number of replicas will provide less deviation, that is keys will be
+ // distributed to nodes more evenly.
+ //
+ // Following is deviation for common nreplicas:
+ // --------------------------------------------------------
+ // | nreplicas | standard error | 99% confidence interval |
+ // | 10 | 0.3152 | (0.37, 1.98) |
+ // | 100 | 0.0997 | (0.76, 1.28) |
+ // | 1000 | 0.0316 | (0.92, 1.09) |
+ // --------------------------------------------------------
+ //
+ // See https://arxiv.org/abs/1406.2294 for reference
+ HashReplicas int
+
// Following options are copied from Options struct.
OnConnect func(*Conn) error
@@ -46,6 +68,8 @@ type RingOptions struct {
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -56,6 +80,10 @@ func (opt *RingOptions) init() {
opt.HeartbeatFrequency = 500 * time.Millisecond
}
+ if opt.HashReplicas == 0 {
+ opt.HashReplicas = 100
+ }
+
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
@@ -82,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options {
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
@@ -133,16 +163,21 @@ func (shard *ringShard) Vote(up bool) bool {
//------------------------------------------------------------------------------
type ringShards struct {
+ opt *RingOptions
+
mu sync.RWMutex
hash *consistenthash.Map
shards map[string]*ringShard // read only
list []*ringShard // read only
+ len int
closed bool
}
-func newRingShards() *ringShards {
+func newRingShards(opt *RingOptions) *ringShards {
return &ringShards{
- hash: consistenthash.New(nreplicas, nil),
+ opt: opt,
+
+ hash: newConsistentHash(opt),
shards: make(map[string]*ringShard),
}
}
@@ -238,18 +273,28 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
// rebalance removes dead shards from the Ring.
func (c *ringShards) rebalance() {
- hash := consistenthash.New(nreplicas, nil)
+ hash := newConsistentHash(c.opt)
+ var shardsNum int
for name, shard := range c.shards {
if shard.IsUp() {
hash.Add(name)
+ shardsNum++
}
}
c.mu.Lock()
c.hash = hash
+ c.len = shardsNum
c.mu.Unlock()
}
+func (c *ringShards) Len() int {
+ c.mu.RLock()
+ l := c.len
+ c.mu.RUnlock()
+ return l
+}
+
func (c *ringShards) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
@@ -305,7 +350,7 @@ func NewRing(opt *RingOptions) *Ring {
ring := &Ring{
opt: opt,
- shards: newRingShards(),
+ shards: newRingShards(opt),
}
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
@@ -363,11 +408,16 @@ func (c *Ring) PoolStats() *PoolStats {
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
}
return &acc
}
+// Len returns the current number of shards in the ring.
+func (c *Ring) Len() int {
+ return c.shards.Len()
+}
+
// Subscribe subscribes the client to the specified channels.
func (c *Ring) Subscribe(channels ...string) *PubSub {
if len(channels) == 0 {
@@ -466,7 +516,16 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
return c.shards.GetByKey(firstKey)
}
-func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
+// Do creates a Cmd from the args and processes the cmd.
+func (c *Ring) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
+func (c *Ring) WrapProcess(
+ fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
c.ForEachShard(func(c *Client) error {
c.WrapProcess(fn)
return nil
@@ -552,7 +611,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap = failedCmdsMap
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *Ring) TxPipeline() Pipeliner {
@@ -570,3 +629,7 @@ func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Ring) Close() error {
return c.shards.Close()
}
+
+func newConsistentHash(opt *RingOptions) *consistenthash.Map {
+ return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
+}