From 6d8f122a5160f6d9e4c51579f2429dfaa62c7271 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 16 Feb 2018 06:47:51 -0800 Subject: Upgrading server dependancies (#8308) --- vendor/github.com/go-redis/redis/cluster.go | 42 +++++++-- vendor/github.com/go-redis/redis/command.go | 4 +- .../go-redis/redis/example_instrumentation_test.go | 69 ++++++-------- .../go-redis/redis/internal/proto/reader.go | 66 ++++++------- vendor/github.com/go-redis/redis/pubsub.go | 10 +- vendor/github.com/go-redis/redis/redis.go | 102 ++++++++++++--------- vendor/github.com/go-redis/redis/redis_context.go | 5 +- .../github.com/go-redis/redis/redis_no_context.go | 5 +- vendor/github.com/go-redis/redis/ring.go | 29 +++++- vendor/github.com/go-redis/redis/sentinel.go | 14 +-- vendor/github.com/go-redis/redis/tx.go | 11 ++- 11 files changed, 211 insertions(+), 146 deletions(-) (limited to 'vendor/github.com/go-redis') diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index accdb3d27..a2c18b387 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -445,6 +445,10 @@ type ClusterClient struct { cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + // Reports whether slots reloading is in progress. reloading uint32 } @@ -458,7 +462,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { opt: opt, nodes: newClusterNodes(opt), } - c.setProcessor(c.Process) + + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline + + c.cmdable.setProcessor(c.Process) // Add initial nodes. for _, addr := range opt.Addrs { @@ -628,7 +637,20 @@ func (c *ClusterClient) Close() error { return c.nodes.Close() } +func (c *ClusterClient) WrapProcess( + fn func(oldProcess func(Cmder) error) func(Cmder) error, +) { + c.process = fn(c.process) +} + func (c *ClusterClient) Process(cmd Cmder) error { + if c.process != nil { + return c.process(cmd) + } + return c.defaultProcess(cmd) +} + +func (c *ClusterClient) defaultProcess(cmd Cmder) error { state, err := c.state() if err != nil { cmd.setErr(err) @@ -910,9 +932,9 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { func (c *ClusterClient) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -920,7 +942,13 @@ func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } -func (c *ClusterClient) pipelineExec(cmds []Cmder) error { +func (c *ClusterClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { setCmdsErr(cmds, err) @@ -1064,9 +1092,9 @@ func (c *ClusterClient) checkMovedErr( // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *ClusterClient) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.txPipelineExec, + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -1074,7 +1102,7 @@ func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.TxPipeline().Pipelined(fn) } -func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { +func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { state, err := c.state() if err != nil { return err diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go index 598ed9800..480a5ce19 100644 --- a/vendor/github.com/go-redis/redis/command.go +++ b/vendor/github.com/go-redis/redis/command.go @@ -81,9 +81,9 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { case "eval", "evalsha": if cmd.stringArg(2) != "0" { return 3 - } else { - return 0 } + + return 0 case "publish": return 1 } diff --git a/vendor/github.com/go-redis/redis/example_instrumentation_test.go b/vendor/github.com/go-redis/redis/example_instrumentation_test.go index 02051f9c9..85abbd744 100644 --- a/vendor/github.com/go-redis/redis/example_instrumentation_test.go +++ b/vendor/github.com/go-redis/redis/example_instrumentation_test.go @@ -2,58 +2,47 @@ package redis_test import ( "fmt" - "sync/atomic" - "time" "github.com/go-redis/redis" ) func Example_instrumentation() { - ring := redis.NewRing(&redis.RingOptions{ - Addrs: map[string]string{ - "shard1": ":6379", - }, + cl := redis.NewClient(&redis.Options{ + Addr: ":6379", }) - ring.ForEachShard(func(client *redis.Client) error { - wrapRedisProcess(client) - return nil + cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { + return func(cmd redis.Cmder) error { + fmt.Printf("starting processing: <%s>\n", cmd) + err := old(cmd) + fmt.Printf("finished processing: <%s>\n", cmd) + return err + } }) - for { - ring.Ping() - } + cl.Ping() + // Output: starting processing: + // finished processing: } -func wrapRedisProcess(client *redis.Client) { - const precision = time.Microsecond - var count, avgDur uint32 - - go func() { - for range time.Tick(3 * time.Second) { - n := atomic.LoadUint32(&count) - dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision - fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur) - } - }() - - client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error { - return func(cmd redis.Cmder) error { - start := time.Now() - err := oldProcess(cmd) - dur := time.Since(start) - - const decay = float64(1) / 100 - ms := float64(dur / precision) - for { - avg := atomic.LoadUint32(&avgDur) - newAvg := uint32((1-decay)*float64(avg) + decay*ms) - if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) { - break - } - } - atomic.AddUint32(&count, 1) +func Example_Pipeline_instrumentation() { + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error { + return func(cmds []redis.Cmder) error { + fmt.Printf("pipeline starting processing: %v\n", cmds) + err := old(cmds) + fmt.Printf("pipeline finished processing: %v\n", cmds) return err } }) + + client.Pipelined(func(pipe redis.Pipeliner) error { + pipe.Ping() + pipe.Ping() + return nil + }) + // Output: pipeline starting processing: [ping: ping: ] + // pipeline finished processing: [ping: PONG ping: PONG] } diff --git a/vendor/github.com/go-redis/redis/internal/proto/reader.go b/vendor/github.com/go-redis/redis/internal/proto/reader.go index cd94329d8..e5ae8a03e 100644 --- a/vendor/github.com/go-redis/redis/internal/proto/reader.go +++ b/vendor/github.com/go-redis/redis/internal/proto/reader.go @@ -37,25 +37,25 @@ func (r *Reader) Reset(rd io.Reader) { r.src.Reset(rd) } -func (p *Reader) PeekBuffered() []byte { - if n := p.src.Buffered(); n != 0 { - b, _ := p.src.Peek(n) +func (r *Reader) PeekBuffered() []byte { + if n := r.src.Buffered(); n != 0 { + b, _ := r.src.Peek(n) return b } return nil } -func (p *Reader) ReadN(n int) ([]byte, error) { - b, err := readN(p.src, p.buf, n) +func (r *Reader) ReadN(n int) ([]byte, error) { + b, err := readN(r.src, r.buf, n) if err != nil { return nil, err } - p.buf = b + r.buf = b return b, nil } -func (p *Reader) ReadLine() ([]byte, error) { - line, isPrefix, err := p.src.ReadLine() +func (r *Reader) ReadLine() ([]byte, error) { + line, isPrefix, err := r.src.ReadLine() if err != nil { return nil, err } @@ -71,8 +71,8 @@ func (p *Reader) ReadLine() ([]byte, error) { return line, nil } -func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { - line, err := p.ReadLine() +func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { + line, err := r.ReadLine() if err != nil { return nil, err } @@ -85,19 +85,19 @@ func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { case IntReply: return parseInt(line[1:], 10, 64) case StringReply: - return p.readTmpBytesValue(line) + return r.readTmpBytesValue(line) case ArrayReply: n, err := parseArrayLen(line) if err != nil { return nil, err } - return m(p, n) + return m(r, n) } return nil, fmt.Errorf("redis: can't parse %.100q", line) } -func (p *Reader) ReadIntReply() (int64, error) { - line, err := p.ReadLine() +func (r *Reader) ReadIntReply() (int64, error) { + line, err := r.ReadLine() if err != nil { return 0, err } @@ -111,8 +111,8 @@ func (p *Reader) ReadIntReply() (int64, error) { } } -func (p *Reader) ReadTmpBytesReply() ([]byte, error) { - line, err := p.ReadLine() +func (r *Reader) ReadTmpBytesReply() ([]byte, error) { + line, err := r.ReadLine() if err != nil { return nil, err } @@ -120,7 +120,7 @@ func (p *Reader) ReadTmpBytesReply() ([]byte, error) { case ErrorReply: return nil, ParseErrorReply(line) case StringReply: - return p.readTmpBytesValue(line) + return r.readTmpBytesValue(line) case StatusReply: return parseStatusValue(line), nil default: @@ -138,24 +138,24 @@ func (r *Reader) ReadBytesReply() ([]byte, error) { return cp, nil } -func (p *Reader) ReadStringReply() (string, error) { - b, err := p.ReadTmpBytesReply() +func (r *Reader) ReadStringReply() (string, error) { + b, err := r.ReadTmpBytesReply() if err != nil { return "", err } return string(b), nil } -func (p *Reader) ReadFloatReply() (float64, error) { - b, err := p.ReadTmpBytesReply() +func (r *Reader) ReadFloatReply() (float64, error) { + b, err := r.ReadTmpBytesReply() if err != nil { return 0, err } return parseFloat(b, 64) } -func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { - line, err := p.ReadLine() +func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { + line, err := r.ReadLine() if err != nil { return nil, err } @@ -167,14 +167,14 @@ func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { if err != nil { return nil, err } - return m(p, n) + return m(r, n) default: return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line) } } -func (p *Reader) ReadArrayLen() (int64, error) { - line, err := p.ReadLine() +func (r *Reader) ReadArrayLen() (int64, error) { + line, err := r.ReadLine() if err != nil { return 0, err } @@ -188,8 +188,8 @@ func (p *Reader) ReadArrayLen() (int64, error) { } } -func (p *Reader) ReadScanReply() ([]string, uint64, error) { - n, err := p.ReadArrayLen() +func (r *Reader) ReadScanReply() ([]string, uint64, error) { + n, err := r.ReadArrayLen() if err != nil { return nil, 0, err } @@ -197,19 +197,19 @@ func (p *Reader) ReadScanReply() ([]string, uint64, error) { return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n) } - cursor, err := p.ReadUint() + cursor, err := r.ReadUint() if err != nil { return nil, 0, err } - n, err = p.ReadArrayLen() + n, err = r.ReadArrayLen() if err != nil { return nil, 0, err } keys := make([]string, n) for i := int64(0); i < n; i++ { - key, err := p.ReadStringReply() + key, err := r.ReadStringReply() if err != nil { return nil, 0, err } @@ -219,7 +219,7 @@ func (p *Reader) ReadScanReply() ([]string, uint64, error) { return keys, cursor, err } -func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) { +func (r *Reader) readTmpBytesValue(line []byte) ([]byte, error) { if isNilReply(line) { return nil, internal.Nil } @@ -229,7 +229,7 @@ func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) { return nil, err } - b, err := p.ReadN(replyLen + 2) + b, err := r.ReadN(replyLen + 2) if err != nil { return nil, err } diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index 01f8a61aa..3ee4ea9d0 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -127,7 +127,7 @@ func (c *PubSub) Close() error { return nil } -// Subscribes the client to the specified channels. It returns +// Subscribe the client to the specified channels. It returns // empty subscription if there are no channels. func (c *PubSub) Subscribe(channels ...string) error { c.mu.Lock() @@ -137,7 +137,7 @@ func (c *PubSub) Subscribe(channels ...string) error { return err } -// Subscribes the client to the given patterns. It returns +// PSubscribe the client to the given patterns. It returns // empty subscription if there are no patterns. func (c *PubSub) PSubscribe(patterns ...string) error { c.mu.Lock() @@ -147,7 +147,7 @@ func (c *PubSub) PSubscribe(patterns ...string) error { return err } -// Unsubscribes the client from the given channels, or from all of +// Unsubscribe the client from the given channels, or from all of // them if none is given. func (c *PubSub) Unsubscribe(channels ...string) error { c.mu.Lock() @@ -157,7 +157,7 @@ func (c *PubSub) Unsubscribe(channels ...string) error { return err } -// Unsubscribes the client from the given patterns, or from all of +// PUnsubscribe the client from the given patterns, or from all of // them if none is given. func (c *PubSub) PUnsubscribe(patterns ...string) error { c.mu.Lock() @@ -196,7 +196,7 @@ func (c *PubSub) Ping(payload ...string) error { return err } -// Message received after a successful subscription to channel. +// Subscription received after a successful subscription to channel. type Subscription struct { // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". Kind string diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index 37ffafd97..cf402986d 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -11,7 +11,7 @@ import ( "github.com/go-redis/redis/internal/proto" ) -// Redis nil reply returned when key does not exist. +// Nil reply redis returned when key does not exist. const Nil = internal.Nil func init() { @@ -22,6 +22,12 @@ func SetLogger(logger *log.Logger) { internal.Logger = logger } +func (c *baseClient) init() { + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline +} + func (c *baseClient) String() string { return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) } @@ -85,7 +91,8 @@ func (c *baseClient) initConn(cn *pool.Conn) error { connPool: pool.NewSingleConnPool(cn), }, } - conn.setProcessor(conn.Process) + conn.baseClient.init() + conn.statefulCmdable.setProcessor(conn.Process) _, err := conn.Pipelined(func(pipe Pipeliner) error { if c.opt.Password != "" { @@ -117,14 +124,11 @@ func (c *baseClient) initConn(cn *pool.Conn) error { // an input and returns the new wrapper process func. createWrapper should // use call the old process func within the new process func. func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.process = fn(c.defaultProcess) + c.process = fn(c.process) } func (c *baseClient) Process(cmd Cmder) error { - if c.process != nil { - return c.process(cmd) - } - return c.defaultProcess(cmd) + return c.process(cmd) } func (c *baseClient) defaultProcess(cmd Cmder) error { @@ -172,9 +176,9 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration { func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { return *timeout - } else { - return c.opt.ReadTimeout } + + return c.opt.ReadTimeout } // Close closes the client, releasing any open resources. @@ -198,35 +202,48 @@ func (c *baseClient) getAddr() string { return c.opt.Addr } +func (c *baseClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) + c.processTxPipeline = fn(c.processTxPipeline) +} + +func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.pipelineProcessCmds) +} + +func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds) +} + type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) -func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { - return func(cmds []Cmder) error { - for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { - if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) - } +func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } - cn, _, err := c.getConn() - if err != nil { - setCmdsErr(cmds, err) - return err - } + cn, _, err := c.getConn() + if err != nil { + setCmdsErr(cmds, err) + return err + } - canRetry, err := p(cn, cmds) + canRetry, err := p(cn, cmds) - if err == nil || internal.IsRedisError(err) { - _ = c.connPool.Put(cn) - break - } - _ = c.connPool.Remove(cn) + if err == nil || internal.IsRedisError(err) { + _ = c.connPool.Put(cn) + break + } + _ = c.connPool.Remove(cn) - if !canRetry || !internal.IsRetryableError(err, true) { - break - } + if !canRetry || !internal.IsRetryableError(err, true) { + break } - return firstCmdsErr(cmds) } + return firstCmdsErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { @@ -324,14 +341,15 @@ type Client struct { } func newClient(opt *Options, pool pool.Pooler) *Client { - client := Client{ + c := Client{ baseClient: baseClient{ opt: opt, connPool: pool, }, } - client.setProcessor(client.Process) - return &client + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + return &c } // NewClient returns a client to the Redis Server specified by Options. @@ -343,7 +361,7 @@ func NewClient(opt *Options) *Client { func (c *Client) copy() *Client { c2 := new(Client) *c2 = *c - c2.setProcessor(c2.Process) + c2.cmdable.setProcessor(c2.Process) return c2 } @@ -366,9 +384,9 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Client) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -379,9 +397,9 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Client) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -430,9 +448,9 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Conn) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -443,8 +461,8 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Conn) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } diff --git a/vendor/github.com/go-redis/redis/redis_context.go b/vendor/github.com/go-redis/redis/redis_context.go index 6ec811ca5..c00e505f6 100644 --- a/vendor/github.com/go-redis/redis/redis_context.go +++ b/vendor/github.com/go-redis/redis/redis_context.go @@ -12,7 +12,10 @@ type baseClient struct { connPool pool.Pooler opt *Options - process func(Cmder) error + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + onClose func() error // hook called when client is closed ctx context.Context diff --git a/vendor/github.com/go-redis/redis/redis_no_context.go b/vendor/github.com/go-redis/redis/redis_no_context.go index 0752192f1..8555c5c09 100644 --- a/vendor/github.com/go-redis/redis/redis_no_context.go +++ b/vendor/github.com/go-redis/redis/redis_no_context.go @@ -10,6 +10,9 @@ type baseClient struct { connPool pool.Pooler opt *Options - process func(Cmder) error + process func(Cmder) error + processPipeline func([]Cmder) error + processTxPipeline func([]Cmder) error + onClose func() error // hook called when client is closed } diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index c11ef6bc2..10f33ed00 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -150,6 +150,8 @@ type Ring struct { shards map[string]*ringShard shardsList []*ringShard + processPipeline func([]Cmder) error + cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo @@ -158,7 +160,9 @@ type Ring struct { func NewRing(opt *RingOptions) *Ring { const nreplicas = 100 + opt.init() + ring := &Ring{ opt: opt, nreplicas: nreplicas, @@ -166,13 +170,17 @@ func NewRing(opt *RingOptions) *Ring { hash: consistenthash.New(nreplicas, nil), shards: make(map[string]*ringShard), } - ring.setProcessor(ring.Process) + ring.processPipeline = ring.defaultProcessPipeline + ring.cmdable.setProcessor(ring.Process) + for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr ring.addShard(name, NewClient(clopt)) } + go ring.heartbeat() + return ring } @@ -354,6 +362,13 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shardByKey(firstKey) } +func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { + c.ForEachShard(func(c *Client) error { + c.WrapProcess(fn) + return nil + }) +} + func (c *Ring) Process(cmd Cmder) error { shard, err := c.cmdShard(cmd) if err != nil { @@ -436,9 +451,9 @@ func (c *Ring) Close() error { func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExec, + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.cmdable.setProcessor(pipe.Process) return &pipe } @@ -446,7 +461,13 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) error { +func (c *Ring) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) +} + +func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go index 37d06b482..3f56f08b3 100644 --- a/vendor/github.com/go-redis/redis/sentinel.go +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -76,7 +76,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { opt: opt, } - client := Client{ + c := Client{ baseClient: baseClient{ opt: opt, connPool: failover.Pool(), @@ -86,9 +86,10 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { }, }, } - client.setProcessor(client.Process) + c.baseClient.init() + c.setProcessor(c.Process) - return &client + return &c } //------------------------------------------------------------------------------ @@ -100,14 +101,15 @@ type sentinelClient struct { func newSentinel(opt *Options) *sentinelClient { opt.init() - client := sentinelClient{ + c := sentinelClient{ baseClient: baseClient{ opt: opt, connPool: newConnPool(opt), }, } - client.cmdable = cmdable{client.Process} - return &client + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + return &c } func (c *sentinelClient) PubSub() *PubSub { diff --git a/vendor/github.com/go-redis/redis/tx.go b/vendor/github.com/go-redis/redis/tx.go index 11d5d5cb0..26c29bef5 100644 --- a/vendor/github.com/go-redis/redis/tx.go +++ b/vendor/github.com/go-redis/redis/tx.go @@ -5,7 +5,7 @@ import ( "github.com/go-redis/redis/internal/pool" ) -// Redis transaction failed. +// TxFailedErr transaction redis failed. const TxFailedErr = internal.RedisError("redis: transaction failed") // Tx implements Redis transactions as described in @@ -24,7 +24,8 @@ func (c *Client) newTx() *Tx { connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true), }, } - tx.setProcessor(tx.Process) + tx.baseClient.init() + tx.statefulCmdable.setProcessor(tx.Process) return &tx } @@ -42,7 +43,7 @@ func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { return err } -// close closes the transaction, releasing any open resources. +// Close closes the transaction, releasing any open resources. func (c *Tx) Close() error { _ = c.Unwatch().Err() return c.baseClient.Close() @@ -75,9 +76,9 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd { func (c *Tx) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } -- cgit v1.2.3-1-g7c22