From 6d8f122a5160f6d9e4c51579f2429dfaa62c7271 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 16 Feb 2018 06:47:51 -0800 Subject: Upgrading server dependancies (#8308) --- vendor/github.com/go-redis/redis/cluster.go | 42 ++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 7 deletions(-) (limited to 'vendor/github.com/go-redis/redis/cluster.go') diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index accdb3d27..a2c18b387 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -445,6 +445,10 @@ type ClusterClient struct { cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + // Reports whether slots reloading is in progress. reloading uint32 } @@ -458,7 +462,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { opt: opt, nodes: newClusterNodes(opt), } - c.setProcessor(c.Process) + + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline + + c.cmdable.setProcessor(c.Process) // Add initial nodes. for _, addr := range opt.Addrs { @@ -628,7 +637,20 @@ func (c *ClusterClient) Close() error { return c.nodes.Close() } +func (c *ClusterClient) WrapProcess( + fn func(oldProcess func(Cmder) error) func(Cmder) error, +) { + c.process = fn(c.process) +} + func (c *ClusterClient) Process(cmd Cmder) error { + if c.process != nil { + return c.process(cmd) + } + return c.defaultProcess(cmd) +} + +func (c *ClusterClient) defaultProcess(cmd Cmder) error { state, err := c.state() if err != nil { cmd.setErr(err) @@ -910,9 +932,9 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { func (c *ClusterClient) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -920,7 +942,13 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } -func (c *ClusterClient) pipelineExec(cmds []Cmder) error { +func (c *ClusterClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { setCmdsErr(cmds, err) @@ -1064,9 +1092,9 @@ func (c *ClusterClient) checkMovedErr( // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *ClusterClient) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.txPipelineExec, + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -1074,7 +1102,7 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.TxPipeline().Pipelined(fn) } -func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { +func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { state, err := c.state() if err != nil { return err -- cgit v1.2.3-1-g7c22