From 6d8f122a5160f6d9e4c51579f2429dfaa62c7271 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 16 Feb 2018 06:47:51 -0800 Subject: Upgrading server dependancies (#8308) --- vendor/github.com/go-redis/redis/ring.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) (limited to 'vendor/github.com/go-redis/redis/ring.go') diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index c11ef6bc2..10f33ed00 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -150,6 +150,8 @@ type Ring struct { shards map[string]*ringShard shardsList []*ringShard + processPipeline func([]Cmder) error + cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -158,7 +160,9 @@ type Ring struct { func NewRing(opt *RingOptions) *Ring { const nreplicas = 100 + opt.init() + ring := &Ring{ opt: opt, nreplicas: nreplicas, @@ -166,13 +170,17 @@ func NewRing(opt *RingOptions) *Ring { hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), } - ring.setProcessor(ring.Process) + ring.processPipeline = ring.defaultProcessPipeline + ring.cmdable.setProcessor(ring.Process) + for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr ring.addShard(name, NewClient(clopt)) } + go ring.heartbeat() + return ring } @@ -354,6 +362,13 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shardByKey(firstKey) } +func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { + c.ForEachShard(func(c *Client) error { + c.WrapProcess(fn) + return nil + }) +} + func (c *Ring) Process(cmd Cmder) error { shard, err := c.cmdShard(cmd) if err != nil { @@ -436,9 +451,9 @@ func (c *Ring) Close() error { func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.cmdable.setProcessor(pipe.Process) return &pipe } @@ -446,7 +461,13 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) error { +func (c *Ring) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) -- cgit v1.2.3-1-g7c22