summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis')
-rw-r--r--vendor/github.com/go-redis/redis/cluster.go42
-rw-r--r--vendor/github.com/go-redis/redis/command.go4
-rw-r--r--vendor/github.com/go-redis/redis/example_instrumentation_test.go69
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/reader.go66
-rw-r--r--vendor/github.com/go-redis/redis/pubsub.go10
-rw-r--r--vendor/github.com/go-redis/redis/redis.go102
-rw-r--r--vendor/github.com/go-redis/redis/redis_context.go5
-rw-r--r--vendor/github.com/go-redis/redis/redis_no_context.go5
-rw-r--r--vendor/github.com/go-redis/redis/ring.go29
-rw-r--r--vendor/github.com/go-redis/redis/sentinel.go14
-rw-r--r--vendor/github.com/go-redis/redis/tx.go11
11 files changed, 211 insertions, 146 deletions
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index accdb3d27..a2c18b387 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -445,6 +445,10 @@ type ClusterClient struct {
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
+ process func(Cmder) error
+ processPipeline func([]Cmder) error
+ processTxPipeline func([]Cmder) error
+
// Reports whether slots reloading is in progress.
reloading uint32
}
@@ -458,7 +462,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt: opt,
nodes: newClusterNodes(opt),
}
- c.setProcessor(c.Process)
+
+ c.process = c.defaultProcess
+ c.processPipeline = c.defaultProcessPipeline
+ c.processTxPipeline = c.defaultProcessTxPipeline
+
+ c.cmdable.setProcessor(c.Process)
// Add initial nodes.
for _, addr := range opt.Addrs {
@@ -628,7 +637,20 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
+func (c *ClusterClient) WrapProcess(
+ fn func(oldProcess func(Cmder) error) func(Cmder) error,
+) {
+ c.process = fn(c.process)
+}
+
func (c *ClusterClient) Process(cmd Cmder) error {
+ if c.process != nil {
+ return c.process(cmd)
+ }
+ return c.defaultProcess(cmd)
+}
+
+func (c *ClusterClient) defaultProcess(cmd Cmder) error {
state, err := c.state()
if err != nil {
cmd.setErr(err)
@@ -910,9 +932,9 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExec,
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -920,7 +942,13 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
-func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
+func (c *ClusterClient) WrapProcessPipeline(
+ fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
+) {
+ c.processPipeline = fn(c.processPipeline)
+}
+
+func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
@@ -1064,9 +1092,9 @@ func (c *ClusterClient) checkMovedErr(
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
pipe := Pipeline{
- exec: c.txPipelineExec,
+ exec: c.processTxPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -1074,7 +1102,7 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().Pipelined(fn)
}
-func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
+func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
state, err := c.state()
if err != nil {
return err
diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go
index 598ed9800..480a5ce19 100644
--- a/vendor/github.com/go-redis/redis/command.go
+++ b/vendor/github.com/go-redis/redis/command.go
@@ -81,9 +81,9 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
case "eval", "evalsha":
if cmd.stringArg(2) != "0" {
return 3
- } else {
- return 0
}
+
+ return 0
case "publish":
return 1
}
diff --git a/vendor/github.com/go-redis/redis/example_instrumentation_test.go b/vendor/github.com/go-redis/redis/example_instrumentation_test.go
index 02051f9c9..85abbd744 100644
--- a/vendor/github.com/go-redis/redis/example_instrumentation_test.go
+++ b/vendor/github.com/go-redis/redis/example_instrumentation_test.go
@@ -2,58 +2,47 @@ package redis_test
import (
"fmt"
- "sync/atomic"
- "time"
"github.com/go-redis/redis"
)
func Example_instrumentation() {
- ring := redis.NewRing(&redis.RingOptions{
- Addrs: map[string]string{
- "shard1": ":6379",
- },
+ cl := redis.NewClient(&redis.Options{
+ Addr: ":6379",
})
- ring.ForEachShard(func(client *redis.Client) error {
- wrapRedisProcess(client)
- return nil
+ cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
+ return func(cmd redis.Cmder) error {
+ fmt.Printf("starting processing: <%s>\n", cmd)
+ err := old(cmd)
+ fmt.Printf("finished processing: <%s>\n", cmd)
+ return err
+ }
})
- for {
- ring.Ping()
- }
+ cl.Ping()
+ // Output: starting processing: <ping: >
+ // finished processing: <ping: PONG>
}
-func wrapRedisProcess(client *redis.Client) {
- const precision = time.Microsecond
- var count, avgDur uint32
-
- go func() {
- for range time.Tick(3 * time.Second) {
- n := atomic.LoadUint32(&count)
- dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision
- fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur)
- }
- }()
-
- client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error {
- return func(cmd redis.Cmder) error {
- start := time.Now()
- err := oldProcess(cmd)
- dur := time.Since(start)
-
- const decay = float64(1) / 100
- ms := float64(dur / precision)
- for {
- avg := atomic.LoadUint32(&avgDur)
- newAvg := uint32((1-decay)*float64(avg) + decay*ms)
- if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) {
- break
- }
- }
- atomic.AddUint32(&count, 1)
+func Example_Pipeline_instrumentation() {
+ client := redis.NewClient(&redis.Options{
+ Addr: ":6379",
+ })
+ client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error {
+ return func(cmds []redis.Cmder) error {
+ fmt.Printf("pipeline starting processing: %v\n", cmds)
+ err := old(cmds)
+ fmt.Printf("pipeline finished processing: %v\n", cmds)
return err
}
})
+
+ client.Pipelined(func(pipe redis.Pipeliner) error {
+ pipe.Ping()
+ pipe.Ping()
+ return nil
+ })
+ // Output: pipeline starting processing: [ping: ping: ]
+ // pipeline finished processing: [ping: PONG ping: PONG]
}
diff --git a/vendor/github.com/go-redis/redis/internal/proto/reader.go b/vendor/github.com/go-redis/redis/internal/proto/reader.go
index cd94329d8..e5ae8a03e 100644
--- a/vendor/github.com/go-redis/redis/internal/proto/reader.go
+++ b/vendor/github.com/go-redis/redis/internal/proto/reader.go
@@ -37,25 +37,25 @@ func (r *Reader) Reset(rd io.Reader) {
r.src.Reset(rd)
}
-func (p *Reader) PeekBuffered() []byte {
- if n := p.src.Buffered(); n != 0 {
- b, _ := p.src.Peek(n)
+func (r *Reader) PeekBuffered() []byte {
+ if n := r.src.Buffered(); n != 0 {
+ b, _ := r.src.Peek(n)
return b
}
return nil
}
-func (p *Reader) ReadN(n int) ([]byte, error) {
- b, err := readN(p.src, p.buf, n)
+func (r *Reader) ReadN(n int) ([]byte, error) {
+ b, err := readN(r.src, r.buf, n)
if err != nil {
return nil, err
}
- p.buf = b
+ r.buf = b
return b, nil
}
-func (p *Reader) ReadLine() ([]byte, error) {
- line, isPrefix, err := p.src.ReadLine()
+func (r *Reader) ReadLine() ([]byte, error) {
+ line, isPrefix, err := r.src.ReadLine()
if err != nil {
return nil, err
}
@@ -71,8 +71,8 @@ func (p *Reader) ReadLine() ([]byte, error) {
return line, nil
}
-func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
- line, err := p.ReadLine()
+func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
+ line, err := r.ReadLine()
if err != nil {
return nil, err
}
@@ -85,19 +85,19 @@ func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
case IntReply:
return parseInt(line[1:], 10, 64)
case StringReply:
- return p.readTmpBytesValue(line)
+ return r.readTmpBytesValue(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return nil, err
}
- return m(p, n)
+ return m(r, n)
}
return nil, fmt.Errorf("redis: can't parse %.100q", line)
}
-func (p *Reader) ReadIntReply() (int64, error) {
- line, err := p.ReadLine()
+func (r *Reader) ReadIntReply() (int64, error) {
+ line, err := r.ReadLine()
if err != nil {
return 0, err
}
@@ -111,8 +111,8 @@ func (p *Reader) ReadIntReply() (int64, error) {
}
}
-func (p *Reader) ReadTmpBytesReply() ([]byte, error) {
- line, err := p.ReadLine()
+func (r *Reader) ReadTmpBytesReply() ([]byte, error) {
+ line, err := r.ReadLine()
if err != nil {
return nil, err
}
@@ -120,7 +120,7 @@ func (p *Reader) ReadTmpBytesReply() ([]byte, error) {
case ErrorReply:
return nil, ParseErrorReply(line)
case StringReply:
- return p.readTmpBytesValue(line)
+ return r.readTmpBytesValue(line)
case StatusReply:
return parseStatusValue(line), nil
default:
@@ -138,24 +138,24 @@ func (r *Reader) ReadBytesReply() ([]byte, error) {
return cp, nil
}
-func (p *Reader) ReadStringReply() (string, error) {
- b, err := p.ReadTmpBytesReply()
+func (r *Reader) ReadStringReply() (string, error) {
+ b, err := r.ReadTmpBytesReply()
if err != nil {
return "", err
}
return string(b), nil
}
-func (p *Reader) ReadFloatReply() (float64, error) {
- b, err := p.ReadTmpBytesReply()
+func (r *Reader) ReadFloatReply() (float64, error) {
+ b, err := r.ReadTmpBytesReply()
if err != nil {
return 0, err
}
return parseFloat(b, 64)
}
-func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
- line, err := p.ReadLine()
+func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
+ line, err := r.ReadLine()
if err != nil {
return nil, err
}
@@ -167,14 +167,14 @@ func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
if err != nil {
return nil, err
}
- return m(p, n)
+ return m(r, n)
default:
return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
}
-func (p *Reader) ReadArrayLen() (int64, error) {
- line, err := p.ReadLine()
+func (r *Reader) ReadArrayLen() (int64, error) {
+ line, err := r.ReadLine()
if err != nil {
return 0, err
}
@@ -188,8 +188,8 @@ func (p *Reader) ReadArrayLen() (int64, error) {
}
}
-func (p *Reader) ReadScanReply() ([]string, uint64, error) {
- n, err := p.ReadArrayLen()
+func (r *Reader) ReadScanReply() ([]string, uint64, error) {
+ n, err := r.ReadArrayLen()
if err != nil {
return nil, 0, err
}
@@ -197,19 +197,19 @@ func (p *Reader) ReadScanReply() ([]string, uint64, error) {
return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
}
- cursor, err := p.ReadUint()
+ cursor, err := r.ReadUint()
if err != nil {
return nil, 0, err
}
- n, err = p.ReadArrayLen()
+ n, err = r.ReadArrayLen()
if err != nil {
return nil, 0, err
}
keys := make([]string, n)
for i := int64(0); i < n; i++ {
- key, err := p.ReadStringReply()
+ key, err := r.ReadStringReply()
if err != nil {
return nil, 0, err
}
@@ -219,7 +219,7 @@ func (p *Reader) ReadScanReply() ([]string, uint64, error) {
return keys, cursor, err
}
-func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) {
+func (r *Reader) readTmpBytesValue(line []byte) ([]byte, error) {
if isNilReply(line) {
return nil, internal.Nil
}
@@ -229,7 +229,7 @@ func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) {
return nil, err
}
- b, err := p.ReadN(replyLen + 2)
+ b, err := r.ReadN(replyLen + 2)
if err != nil {
return nil, err
}
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go
index 01f8a61aa..3ee4ea9d0 100644
--- a/vendor/github.com/go-redis/redis/pubsub.go
+++ b/vendor/github.com/go-redis/redis/pubsub.go
@@ -127,7 +127,7 @@ func (c *PubSub) Close() error {
return nil
}
-// Subscribes the client to the specified channels. It returns
+// Subscribe the client to the specified channels. It returns
// empty subscription if there are no channels.
func (c *PubSub) Subscribe(channels ...string) error {
c.mu.Lock()
@@ -137,7 +137,7 @@ func (c *PubSub) Subscribe(channels ...string) error {
return err
}
-// Subscribes the client to the given patterns. It returns
+// PSubscribe the client to the given patterns. It returns
// empty subscription if there are no patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
c.mu.Lock()
@@ -147,7 +147,7 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
return err
}
-// Unsubscribes the client from the given channels, or from all of
+// Unsubscribe the client from the given channels, or from all of
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
c.mu.Lock()
@@ -157,7 +157,7 @@ func (c *PubSub) Unsubscribe(channels ...string) error {
return err
}
-// Unsubscribes the client from the given patterns, or from all of
+// PUnsubscribe the client from the given patterns, or from all of
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.mu.Lock()
@@ -196,7 +196,7 @@ func (c *PubSub) Ping(payload ...string) error {
return err
}
-// Message received after a successful subscription to channel.
+// Subscription received after a successful subscription to channel.
type Subscription struct {
// Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
Kind string
diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go
index 37ffafd97..cf402986d 100644
--- a/vendor/github.com/go-redis/redis/redis.go
+++ b/vendor/github.com/go-redis/redis/redis.go
@@ -11,7 +11,7 @@ import (
"github.com/go-redis/redis/internal/proto"
)
-// Redis nil reply returned when key does not exist.
+// Nil reply redis returned when key does not exist.
const Nil = internal.Nil
func init() {
@@ -22,6 +22,12 @@ func SetLogger(logger *log.Logger) {
internal.Logger = logger
}
+func (c *baseClient) init() {
+ c.process = c.defaultProcess
+ c.processPipeline = c.defaultProcessPipeline
+ c.processTxPipeline = c.defaultProcessTxPipeline
+}
+
func (c *baseClient) String() string {
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}
@@ -85,7 +91,8 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
connPool: pool.NewSingleConnPool(cn),
},
}
- conn.setProcessor(conn.Process)
+ conn.baseClient.init()
+ conn.statefulCmdable.setProcessor(conn.Process)
_, err := conn.Pipelined(func(pipe Pipeliner) error {
if c.opt.Password != "" {
@@ -117,14 +124,11 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// an input and returns the new wrapper process func. createWrapper should
// use call the old process func within the new process func.
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
- c.process = fn(c.defaultProcess)
+ c.process = fn(c.process)
}
func (c *baseClient) Process(cmd Cmder) error {
- if c.process != nil {
- return c.process(cmd)
- }
- return c.defaultProcess(cmd)
+ return c.process(cmd)
}
func (c *baseClient) defaultProcess(cmd Cmder) error {
@@ -172,9 +176,9 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration {
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
return *timeout
- } else {
- return c.opt.ReadTimeout
}
+
+ return c.opt.ReadTimeout
}
// Close closes the client, releasing any open resources.
@@ -198,35 +202,48 @@ func (c *baseClient) getAddr() string {
return c.opt.Addr
}
+func (c *baseClient) WrapProcessPipeline(
+ fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
+) {
+ c.processPipeline = fn(c.processPipeline)
+ c.processTxPipeline = fn(c.processTxPipeline)
+}
+
+func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
+ return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
+}
+
+func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
+ return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
+}
+
type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
-func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
- return func(cmds []Cmder) error {
- for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
- if attempt > 0 {
- time.Sleep(c.retryBackoff(attempt))
- }
+func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
- cn, _, err := c.getConn()
- if err != nil {
- setCmdsErr(cmds, err)
- return err
- }
+ cn, _, err := c.getConn()
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
- canRetry, err := p(cn, cmds)
+ canRetry, err := p(cn, cmds)
- if err == nil || internal.IsRedisError(err) {
- _ = c.connPool.Put(cn)
- break
- }
- _ = c.connPool.Remove(cn)
+ if err == nil || internal.IsRedisError(err) {
+ _ = c.connPool.Put(cn)
+ break
+ }
+ _ = c.connPool.Remove(cn)
- if !canRetry || !internal.IsRetryableError(err, true) {
- break
- }
+ if !canRetry || !internal.IsRetryableError(err, true) {
+ break
}
- return firstCmdsErr(cmds)
}
+ return firstCmdsErr(cmds)
}
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
@@ -324,14 +341,15 @@ type Client struct {
}
func newClient(opt *Options, pool pool.Pooler) *Client {
- client := Client{
+ c := Client{
baseClient: baseClient{
opt: opt,
connPool: pool,
},
}
- client.setProcessor(client.Process)
- return &client
+ c.baseClient.init()
+ c.cmdable.setProcessor(c.Process)
+ return &c
}
// NewClient returns a client to the Redis Server specified by Options.
@@ -343,7 +361,7 @@ func NewClient(opt *Options) *Client {
func (c *Client) copy() *Client {
c2 := new(Client)
*c2 = *c
- c2.setProcessor(c2.Process)
+ c2.cmdable.setProcessor(c2.Process)
return c2
}
@@ -366,9 +384,9 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Client) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.pipelineProcessCmds),
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -379,9 +397,9 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.txPipelineProcessCmds),
+ exec: c.processTxPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -430,9 +448,9 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Conn) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.pipelineProcessCmds),
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -443,8 +461,8 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.txPipelineProcessCmds),
+ exec: c.processTxPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
diff --git a/vendor/github.com/go-redis/redis/redis_context.go b/vendor/github.com/go-redis/redis/redis_context.go
index 6ec811ca5..c00e505f6 100644
--- a/vendor/github.com/go-redis/redis/redis_context.go
+++ b/vendor/github.com/go-redis/redis/redis_context.go
@@ -12,7 +12,10 @@ type baseClient struct {
connPool pool.Pooler
opt *Options
- process func(Cmder) error
+ process func(Cmder) error
+ processPipeline func([]Cmder) error
+ processTxPipeline func([]Cmder) error
+
onClose func() error // hook called when client is closed
ctx context.Context
diff --git a/vendor/github.com/go-redis/redis/redis_no_context.go b/vendor/github.com/go-redis/redis/redis_no_context.go
index 0752192f1..8555c5c09 100644
--- a/vendor/github.com/go-redis/redis/redis_no_context.go
+++ b/vendor/github.com/go-redis/redis/redis_no_context.go
@@ -10,6 +10,9 @@ type baseClient struct {
connPool pool.Pooler
opt *Options
- process func(Cmder) error
+ process func(Cmder) error
+ processPipeline func([]Cmder) error
+ processTxPipeline func([]Cmder) error
+
onClose func() error // hook called when client is closed
}
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go
index c11ef6bc2..10f33ed00 100644
--- a/vendor/github.com/go-redis/redis/ring.go
+++ b/vendor/github.com/go-redis/redis/ring.go
@@ -150,6 +150,8 @@ type Ring struct {
shards map[string]*ringShard
shardsList []*ringShard
+ processPipeline func([]Cmder) error
+
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
@@ -158,7 +160,9 @@ type Ring struct {
func NewRing(opt *RingOptions) *Ring {
const nreplicas = 100
+
opt.init()
+
ring := &Ring{
opt: opt,
nreplicas: nreplicas,
@@ -166,13 +170,17 @@ func NewRing(opt *RingOptions) *Ring {
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
}
- ring.setProcessor(ring.Process)
+ ring.processPipeline = ring.defaultProcessPipeline
+ ring.cmdable.setProcessor(ring.Process)
+
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
ring.addShard(name, NewClient(clopt))
}
+
go ring.heartbeat()
+
return ring
}
@@ -354,6 +362,13 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
return c.shardByKey(firstKey)
}
+func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
+ c.ForEachShard(func(c *Client) error {
+ c.WrapProcess(fn)
+ return nil
+ })
+}
+
func (c *Ring) Process(cmd Cmder) error {
shard, err := c.cmdShard(cmd)
if err != nil {
@@ -436,9 +451,9 @@ func (c *Ring) Close() error {
func (c *Ring) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExec,
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.cmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -446,7 +461,13 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}
-func (c *Ring) pipelineExec(cmds []Cmder) error {
+func (c *Ring) WrapProcessPipeline(
+ fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
+) {
+ c.processPipeline = fn(c.processPipeline)
+}
+
+func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.Name())
diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go
index 37d06b482..3f56f08b3 100644
--- a/vendor/github.com/go-redis/redis/sentinel.go
+++ b/vendor/github.com/go-redis/redis/sentinel.go
@@ -76,7 +76,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt: opt,
}
- client := Client{
+ c := Client{
baseClient: baseClient{
opt: opt,
connPool: failover.Pool(),
@@ -86,9 +86,10 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
},
},
}
- client.setProcessor(client.Process)
+ c.baseClient.init()
+ c.setProcessor(c.Process)
- return &client
+ return &c
}
//------------------------------------------------------------------------------
@@ -100,14 +101,15 @@ type sentinelClient struct {
func newSentinel(opt *Options) *sentinelClient {
opt.init()
- client := sentinelClient{
+ c := sentinelClient{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt),
},
}
- client.cmdable = cmdable{client.Process}
- return &client
+ c.baseClient.init()
+ c.cmdable.setProcessor(c.Process)
+ return &c
}
func (c *sentinelClient) PubSub() *PubSub {
diff --git a/vendor/github.com/go-redis/redis/tx.go b/vendor/github.com/go-redis/redis/tx.go
index 11d5d5cb0..26c29bef5 100644
--- a/vendor/github.com/go-redis/redis/tx.go
+++ b/vendor/github.com/go-redis/redis/tx.go
@@ -5,7 +5,7 @@ import (
"github.com/go-redis/redis/internal/pool"
)
-// Redis transaction failed.
+// TxFailedErr transaction redis failed.
const TxFailedErr = internal.RedisError("redis: transaction failed")
// Tx implements Redis transactions as described in
@@ -24,7 +24,8 @@ func (c *Client) newTx() *Tx {
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
},
}
- tx.setProcessor(tx.Process)
+ tx.baseClient.init()
+ tx.statefulCmdable.setProcessor(tx.Process)
return &tx
}
@@ -42,7 +43,7 @@ func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
return err
}
-// close closes the transaction, releasing any open resources.
+// Close closes the transaction, releasing any open resources.
func (c *Tx) Close() error {
_ = c.Unwatch().Err()
return c.baseClient.Close()
@@ -75,9 +76,9 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
func (c *Tx) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.txPipelineProcessCmds),
+ exec: c.processTxPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}