summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/ring.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/ring.go')
-rw-r--r--vendor/github.com/go-redis/redis/ring.go29
1 files changed, 25 insertions, 4 deletions
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go
index c11ef6bc2..10f33ed00 100644
--- a/vendor/github.com/go-redis/redis/ring.go
+++ b/vendor/github.com/go-redis/redis/ring.go
@@ -150,6 +150,8 @@ type Ring struct {
shards map[string]*ringShard
shardsList []*ringShard
+ processPipeline func([]Cmder) error
+
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
@@ -158,7 +160,9 @@ type Ring struct {
func NewRing(opt *RingOptions) *Ring {
const nreplicas = 100
+
opt.init()
+
ring := &Ring{
opt: opt,
nreplicas: nreplicas,
@@ -166,13 +170,17 @@ func NewRing(opt *RingOptions) *Ring {
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
}
- ring.setProcessor(ring.Process)
+ ring.processPipeline = ring.defaultProcessPipeline
+ ring.cmdable.setProcessor(ring.Process)
+
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
ring.addShard(name, NewClient(clopt))
}
+
go ring.heartbeat()
+
return ring
}
@@ -354,6 +362,13 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
return c.shardByKey(firstKey)
}
+func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
+ c.ForEachShard(func(c *Client) error {
+ c.WrapProcess(fn)
+ return nil
+ })
+}
+
func (c *Ring) Process(cmd Cmder) error {
shard, err := c.cmdShard(cmd)
if err != nil {
@@ -436,9 +451,9 @@ func (c *Ring) Close() error {
func (c *Ring) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExec,
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.cmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -446,7 +461,13 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
-func (c *Ring) pipelineExec(cmds []Cmder) error {
+func (c *Ring) WrapProcessPipeline(
+ fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
+) {
+ c.processPipeline = fn(c.processPipeline)
+}
+
+func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.Name())