From 6d8f122a5160f6d9e4c51579f2429dfaa62c7271 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 16 Feb 2018 06:47:51 -0800 Subject: Upgrading server dependancies (#8308) --- vendor/github.com/go-redis/redis/redis.go | 102 ++++++++++++++++++------------ 1 file changed, 60 insertions(+), 42 deletions(-) (limited to 'vendor/github.com/go-redis/redis/redis.go') diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index 37ffafd97..cf402986d 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -11,7 +11,7 @@ import ( "github.com/go-redis/redis/internal/proto" ) -// Redis nil reply returned when key does not exist. +// Nil reply redis returned when key does not exist. const Nil = internal.Nil func init() { @@ -22,6 +22,12 @@ func SetLogger(logger *log.Logger) { internal.Logger = logger } +func (c *baseClient) init() { + c.process = c.defaultProcess + c.processPipeline = c.defaultProcessPipeline + c.processTxPipeline = c.defaultProcessTxPipeline +} + func (c *baseClient) String() string { return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) } @@ -85,7 +91,8 @@ func (c *baseClient) initConn(cn *pool.Conn) error { connPool: pool.NewSingleConnPool(cn), }, } - conn.setProcessor(conn.Process) + conn.baseClient.init() + conn.statefulCmdable.setProcessor(conn.Process) _, err := conn.Pipelined(func(pipe Pipeliner) error { if c.opt.Password != "" { @@ -117,14 +124,11 @@ func (c *baseClient) initConn(cn *pool.Conn) error { // an input and returns the new wrapper process func. createWrapper should // use call the old process func within the new process func. func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.process = fn(c.defaultProcess) + c.process = fn(c.process) } func (c *baseClient) Process(cmd Cmder) error { - if c.process != nil { - return c.process(cmd) - } - return c.defaultProcess(cmd) + return c.process(cmd) } func (c *baseClient) defaultProcess(cmd Cmder) error { @@ -172,9 +176,9 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration { func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { return *timeout - } else { - return c.opt.ReadTimeout } + + return c.opt.ReadTimeout } // Close closes the client, releasing any open resources. @@ -198,35 +202,48 @@ func (c *baseClient) getAddr() string { return c.opt.Addr } +func (c *baseClient) WrapProcessPipeline( + fn func(oldProcess func([]Cmder) error) func([]Cmder) error, +) { + c.processPipeline = fn(c.processPipeline) + c.processTxPipeline = fn(c.processTxPipeline) +} + +func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.pipelineProcessCmds) +} + +func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error { + return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds) +} + type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error) -func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { - return func(cmds []Cmder) error { - for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { - if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) - } +func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } - cn, _, err := c.getConn() - if err != nil { - setCmdsErr(cmds, err) - return err - } + cn, _, err := c.getConn() + if err != nil { + setCmdsErr(cmds, err) + return err + } - canRetry, err := p(cn, cmds) + canRetry, err := p(cn, cmds) - if err == nil || internal.IsRedisError(err) { - _ = c.connPool.Put(cn) - break - } - _ = c.connPool.Remove(cn) + if err == nil || internal.IsRedisError(err) { + _ = c.connPool.Put(cn) + break + } + _ = c.connPool.Remove(cn) - if !canRetry || !internal.IsRetryableError(err, true) { - break - } + if !canRetry || !internal.IsRetryableError(err, true) { + break } - return firstCmdsErr(cmds) } + return firstCmdsErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { @@ -324,14 +341,15 @@ type Client struct { } func newClient(opt *Options, pool pool.Pooler) *Client { - client := Client{ + c := Client{ baseClient: baseClient{ opt: opt, connPool: pool, }, } - client.setProcessor(client.Process) - return &client + c.baseClient.init() + c.cmdable.setProcessor(c.Process) + return &c } // NewClient returns a client to the Redis Server specified by Options. @@ -343,7 +361,7 @@ func NewClient(opt *Options) *Client { func (c *Client) copy() *Client { c2 := new(Client) *c2 = *c - c2.setProcessor(c2.Process) + c2.cmdable.setProcessor(c2.Process) return c2 } @@ -366,9 +384,9 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Client) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -379,9 +397,9 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Client) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -430,9 +448,9 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Conn) Pipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.pipelineProcessCmds), + exec: c.processPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } @@ -443,8 +461,8 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Conn) TxPipeline() Pipeliner { pipe := Pipeline{ - exec: c.pipelineExecer(c.txPipelineProcessCmds), + exec: c.processTxPipeline, } - pipe.setProcessor(pipe.Process) + pipe.statefulCmdable.setProcessor(pipe.Process) return &pipe } -- cgit v1.2.3-1-g7c22