diff options
author | George Goldberg <george@gberg.me> | 2018-02-19 11:19:39 +0000 |
---|---|---|
committer | George Goldberg <george@gberg.me> | 2018-02-19 11:19:39 +0000 |
commit | f8289eb286d00c29859a8df495b957c7b46cb249 (patch) | |
tree | 1bc18d6a3a795482c7229786f7ab427fabbcd007 /vendor/github.com/go-redis/redis/ring.go | |
parent | 8891fa2a5e9e08eb9fa99ec163c47a6e9761a816 (diff) | |
parent | 30197584d5a215a3b25bffa79a034ed9e360cf52 (diff) | |
download | chat-f8289eb286d00c29859a8df495b957c7b46cb249.tar.gz chat-f8289eb286d00c29859a8df495b957c7b46cb249.tar.bz2 chat-f8289eb286d00c29859a8df495b957c7b46cb249.zip |
Merge branch 'master' into advanced-permissions-phase-1
Diffstat (limited to 'vendor/github.com/go-redis/redis/ring.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/ring.go | 29 |
1 files changed, 25 insertions, 4 deletions
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index c11ef6bc2..10f33ed00 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -150,6 +150,8 @@ type Ring struct { shards map[string]*ringShard shardsList []*ringShard + processPipeline func([]Cmder) error + cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -158,7 +160,9 @@ type Ring struct { func NewRing(opt *RingOptions) *Ring { const nreplicas = 100 + opt.init() + ring := &Ring{ opt: opt, nreplicas: nreplicas, @@ -166,13 +170,17 @@ func NewRing(opt *RingOptions) *Ring { hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), } - ring.setProcessor(ring.Process) + ring.processPipeline = ring.defaultProcessPipeline + ring.cmdable.setProcessor(ring.Process) + for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr ring.addShard(name, NewClient(clopt)) } + go ring.heartbeat() + return ring } @@ -354,6 +362,13 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shardByKey(firstKey) } +func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { + c.ForEachShard(func(c *Client) error { + c.WrapProcess(fn) + return nil + }) +} + func (c *Ring) Process(cmd Cmder) error { shard, err := c.cmdShard(cmd) if err != nil { @@ -436,9 +451,9 @@ func (c *Ring) Close() error { func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.cmdable.setProcessor(pipe.Process) return &pipe } @@ -446,7 +461,13 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) error { +func (c *Ring) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) |