summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/redis.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/redis.go')
-rw-r--r--vendor/github.com/go-redis/redis/redis.go102
1 files changed, 60 insertions, 42 deletions
diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go
index 37ffafd97..cf402986d 100644
--- a/vendor/github.com/go-redis/redis/redis.go
+++ b/vendor/github.com/go-redis/redis/redis.go
@@ -11,7 +11,7 @@ import (
"github.com/go-redis/redis/internal/proto"
)
-// Redis nil reply returned when key does not exist.
+// Nil reply redis returned when key does not exist.
const Nil = internal.Nil
func init() {
@@ -22,6 +22,12 @@ func SetLogger(logger *log.Logger) {
internal.Logger = logger
}
+func (c *baseClient) init() {
+ c.process = c.defaultProcess
+ c.processPipeline = c.defaultProcessPipeline
+ c.processTxPipeline = c.defaultProcessTxPipeline
+}
+
func (c *baseClient) String() string {
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}
@@ -85,7 +91,8 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
connPool: pool.NewSingleConnPool(cn),
},
}
- conn.setProcessor(conn.Process)
+ conn.baseClient.init()
+ conn.statefulCmdable.setProcessor(conn.Process)
_, err := conn.Pipelined(func(pipe Pipeliner) error {
if c.opt.Password != "" {
@@ -117,14 +124,11 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// an input and returns the new wrapper process func. createWrapper should
// use call the old process func within the new process func.
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
- c.process = fn(c.defaultProcess)
+ c.process = fn(c.process)
}
func (c *baseClient) Process(cmd Cmder) error {
- if c.process != nil {
- return c.process(cmd)
- }
- return c.defaultProcess(cmd)
+ return c.process(cmd)
}
func (c *baseClient) defaultProcess(cmd Cmder) error {
@@ -172,9 +176,9 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration {
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
return *timeout
- } else {
- return c.opt.ReadTimeout
}
+
+ return c.opt.ReadTimeout
}
// Close closes the client, releasing any open resources.
@@ -198,35 +202,48 @@ func (c *baseClient) getAddr() string {
return c.opt.Addr
}
+func (c *baseClient) WrapProcessPipeline(
+ fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
+) {
+ c.processPipeline = fn(c.processPipeline)
+ c.processTxPipeline = fn(c.processTxPipeline)
+}
+
+func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
+ return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
+}
+
+func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
+ return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
+}
+
type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
-func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
- return func(cmds []Cmder) error {
- for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
- if attempt > 0 {
- time.Sleep(c.retryBackoff(attempt))
- }
+func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
- cn, _, err := c.getConn()
- if err != nil {
- setCmdsErr(cmds, err)
- return err
- }
+ cn, _, err := c.getConn()
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
- canRetry, err := p(cn, cmds)
+ canRetry, err := p(cn, cmds)
- if err == nil || internal.IsRedisError(err) {
- _ = c.connPool.Put(cn)
- break
- }
- _ = c.connPool.Remove(cn)
+ if err == nil || internal.IsRedisError(err) {
+ _ = c.connPool.Put(cn)
+ break
+ }
+ _ = c.connPool.Remove(cn)
- if !canRetry || !internal.IsRetryableError(err, true) {
- break
- }
+ if !canRetry || !internal.IsRetryableError(err, true) {
+ break
}
- return firstCmdsErr(cmds)
}
+ return firstCmdsErr(cmds)
}
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
@@ -324,14 +341,15 @@ type Client struct {
}
func newClient(opt *Options, pool pool.Pooler) *Client {
- client := Client{
+ c := Client{
baseClient: baseClient{
opt: opt,
connPool: pool,
},
}
- client.setProcessor(client.Process)
- return &client
+ c.baseClient.init()
+ c.cmdable.setProcessor(c.Process)
+ return &c
}
// NewClient returns a client to the Redis Server specified by Options.
@@ -343,7 +361,7 @@ func NewClient(opt *Options) *Client {
func (c *Client) copy() *Client {
c2 := new(Client)
*c2 = *c
- c2.setProcessor(c2.Process)
+ c2.cmdable.setProcessor(c2.Process)
return c2
}
@@ -366,9 +384,9 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Client) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.pipelineProcessCmds),
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -379,9 +397,9 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.txPipelineProcessCmds),
+ exec: c.processTxPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -430,9 +448,9 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
func (c *Conn) Pipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.pipelineProcessCmds),
+ exec: c.processPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}
@@ -443,8 +461,8 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
pipe := Pipeline{
- exec: c.pipelineExecer(c.txPipelineProcessCmds),
+ exec: c.processTxPipeline,
}
- pipe.setProcessor(pipe.Process)
+ pipe.statefulCmdable.setProcessor(pipe.Process)
return &pipe
}