summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/redis.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/redis.go')
-rw-r--r--vendor/github.com/go-redis/redis/redis.go92
1 files changed, 42 insertions, 50 deletions
diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go
index b18973cdb..230091b3e 100644
--- a/vendor/github.com/go-redis/redis/redis.go
+++ b/vendor/github.com/go-redis/redis/redis.go
@@ -3,6 +3,7 @@ package redis
import (
"fmt"
"log"
+ "os"
"time"
"github.com/go-redis/redis/internal"
@@ -13,6 +14,10 @@ import (
// Redis nil reply, .e.g. when key does not exist.
const Nil = internal.Nil
+func init() {
+ SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
+}
+
func SetLogger(logger *log.Logger) {
internal.Logger = logger
}
@@ -131,7 +136,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
cn, _, err := c.getConn()
if err != nil {
cmd.setErr(err)
- if internal.IsRetryableError(err) {
+ if internal.IsRetryableError(err, true) {
continue
}
return err
@@ -141,7 +146,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
if err := writeCmd(cn, cmd); err != nil {
c.releaseConn(cn, err)
cmd.setErr(err)
- if internal.IsRetryableError(err) {
+ if internal.IsRetryableError(err, true) {
continue
}
return err
@@ -150,7 +155,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
cn.SetReadTimeout(c.cmdTimeout(cmd))
err = cmd.readReply(cn)
c.releaseConn(cn, err)
- if err != nil && internal.IsRetryableError(err) {
+ if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue
}
@@ -197,8 +202,11 @@ type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
return func(cmds []Cmder) error {
- var firstErr error
- for i := 0; i <= c.opt.MaxRetries; i++ {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
cn, _, err := c.getConn()
if err != nil {
setCmdsErr(cmds, err)
@@ -206,18 +214,18 @@ func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
}
canRetry, err := p(cn, cmds)
- c.releaseConn(cn, err)
- if err == nil {
- return nil
- }
- if firstErr == nil {
- firstErr = err
+
+ if err == nil || internal.IsRedisError(err) {
+ _ = c.connPool.Put(cn)
+ break
}
- if !canRetry || !internal.IsRetryableError(err) {
+ _ = c.connPool.Remove(cn)
+
+ if !canRetry || !internal.IsRetryableError(err, true) {
break
}
}
- return firstErr
+ return firstCmdsErr(cmds)
}
}
@@ -230,23 +238,17 @@ func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, err
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
- return pipelineReadCmds(cn, cmds)
+ return true, pipelineReadCmds(cn, cmds)
}
-func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
- for i, cmd := range cmds {
+func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
+ for _, cmd := range cmds {
err := cmd.readReply(cn)
- if err == nil {
- continue
- }
- if i == 0 {
- retry = true
- }
- if firstErr == nil {
- firstErr = err
+ if err != nil && !internal.IsRedisError(err) {
+ return err
}
}
- return
+ return nil
}
func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
@@ -260,11 +262,11 @@ func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, e
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds); err != nil {
+ setCmdsErr(cmds, err)
return false, err
}
- _, err := pipelineReadCmds(cn, cmds)
- return false, err
+ return false, pipelineReadCmds(cn, cmds)
}
func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
@@ -276,21 +278,16 @@ func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
}
func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
- var firstErr error
-
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
- firstErr = err
+ if err := statusCmd.readReply(cn); err != nil {
+ return err
}
- for _, cmd := range cmds {
+ for _ = range cmds {
err := statusCmd.readReply(cn)
- if err != nil {
- cmd.setErr(err)
- if firstErr == nil {
- firstErr = err
- }
+ if err != nil && !internal.IsRedisError(err) {
+ return err
}
}
@@ -355,21 +352,16 @@ func (c *Client) Options() *Options {
return c.opt
}
+type PoolStats pool.Stats
+
// PoolStats returns connection pool stats.
func (c *Client) PoolStats() *PoolStats {
- s := c.connPool.Stats()
- return &PoolStats{
- Requests: s.Requests,
- Hits: s.Hits,
- Timeouts: s.Timeouts,
-
- TotalConns: s.TotalConns,
- FreeConns: s.FreeConns,
- }
+ stats := c.connPool.Stats()
+ return (*PoolStats)(stats)
}
func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
func (c *Client) Pipeline() Pipeliner {
@@ -381,7 +373,7 @@ func (c *Client) Pipeline() Pipeliner {
}
func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.TxPipeline().pipelined(fn)
+ return c.TxPipeline().Pipelined(fn)
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
@@ -433,7 +425,7 @@ type Conn struct {
}
func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.Pipeline().pipelined(fn)
+ return c.Pipeline().Pipelined(fn)
}
func (c *Conn) Pipeline() Pipeliner {
@@ -445,7 +437,7 @@ func (c *Conn) Pipeline() Pipeliner {
}
func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.TxPipeline().pipelined(fn)
+ return c.TxPipeline().Pipelined(fn)
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.