summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/cluster.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--vendor/github.com/go-redis/redis/cluster.go42
1 files changed, 35 insertions, 7 deletions
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index accdb3d27..a2c18b387 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -445,6 +445,10 @@ type ClusterClient struct {
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
+ process func(Cmder) error
+ processPipeline func([]Cmder) error
+ processTxPipeline func([]Cmder) error
+
// Reports whether slots reloading is in progress.
reloading uint32
}
@@ -458,7 +462,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt: opt,
nodes: newClusterNodes(opt),
}
- c.setProcessor(c.Process)
+
+ c.process = c.defaultProcess
+ c.processPipeline = c.defaultProcessPipeline
+ c.processTxPipeline = c.defaultProcessTxPipeline
+
+ c.cmdable.setProcessor(c.Process)
// Add initial nodes.
for _, addr := range opt.Addrs {
@@ -628,7 +637,20 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
+func (c *ClusterClient) WrapProcess(
+ fn func(oldProcess func(Cmder) error) func(Cmder) error,
+) {
+ c.process = fn(c.process)
+}
+
func (c *ClusterClient) Process(cmd Cmder) error {
+ if c.process != nil {
+ return c.process(cmd)
+ }
+ return c.defaultProcess(cmd)
+}
+
+func (c *ClusterClient) defaultProcess(cmd Cmder) error {
state, err := c.state()
if err != nil {
cmd.setErr(err)
@@ -910,9 +932,9 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExec,
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -920,7 +942,13 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
-func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
+func (c *ClusterClient) WrapProcessPipeline(
+ fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
+) {
+ c.processPipeline = fn(c.processPipeline)
+}
+
+func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
@@ -1064,9 +1092,9 @@ func (c *ClusterClient) checkMovedErr(
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
pipe := Pipeline{
- exec: c.txPipelineExec,
+ exec: c.processTxPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -1074,7 +1102,7 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().Pipelined(fn)
}
-func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
+func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
state, err := c.state()
if err != nil {
return err