summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/redis.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/redis.go')
-rw-r--r--vendor/github.com/go-redis/redis/redis.go91
1 files changed, 55 insertions, 36 deletions
diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go
index beb632e1e..3e72bf060 100644
--- a/vendor/github.com/go-redis/redis/redis.go
+++ b/vendor/github.com/go-redis/redis/redis.go
@@ -50,7 +50,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return nil, err
}
- if !cn.Inited {
+ if cn.InitedAt.IsZero() {
if err := c.initConn(cn); err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
@@ -66,7 +66,7 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
return nil, err
}
- if !cn.Inited {
+ if cn.InitedAt.IsZero() {
err := c.initConn(cn)
if err != nil {
c.connPool.Remove(cn)
@@ -88,7 +88,7 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
}
func (c *baseClient) initConn(cn *pool.Conn) error {
- cn.Inited = true
+ cn.InitedAt = time.Now()
if c.opt.Password == "" &&
c.opt.DB == 0 &&
@@ -123,8 +123,17 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
return nil
}
+// Do creates a Cmd from the args and processes the cmd.
+func (c *baseClient) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
// WrapProcess wraps function that processes Redis commands.
-func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
+func (c *baseClient) WrapProcess(
+ fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
c.process = fn(c.process)
}
@@ -147,8 +156,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err
}
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := writeCmd(cn, cmd); err != nil {
+ err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmd)
+ })
+ if err != nil {
c.releaseConn(cn, err)
cmd.setErr(err)
if internal.IsRetryableError(err, true) {
@@ -157,8 +168,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err
}
- cn.SetReadTimeout(c.cmdTimeout(cmd))
- err = cmd.readReply(cn)
+ err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
+ return cmd.readReply(rd)
+ })
c.releaseConn(cn, err)
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue
@@ -176,9 +188,8 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration {
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
- return *timeout
+ return readTimeout(*timeout)
}
-
return c.opt.ReadTimeout
}
@@ -244,24 +255,27 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
break
}
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := writeCmd(cn, cmds...); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmds...)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
return true, err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
- return true, pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ return pipelineReadCmds(rd, cmds)
+ })
+ return true, err
}
-func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
+func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
for _, cmd := range cmds {
- err := cmd.readReply(cn)
+ err := cmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) {
return err
}
@@ -270,47 +284,50 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
}
func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := txPipelineWriteMulti(cn, cmds); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return txPipelineWriteMulti(wr, cmds)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
return true, err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- if err := c.txPipelineReadQueued(cn, cmds); err != nil {
- setCmdsErr(cmds, err)
- return false, err
- }
-
- return false, pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ err := txPipelineReadQueued(rd, cmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+ return pipelineReadCmds(rd, cmds)
+ })
+ return false, err
}
-func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
+func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
multiExec := make([]Cmder, 0, len(cmds)+2)
multiExec = append(multiExec, NewStatusCmd("MULTI"))
multiExec = append(multiExec, cmds...)
multiExec = append(multiExec, NewSliceCmd("EXEC"))
- return writeCmd(cn, multiExec...)
+ return writeCmd(wr, multiExec...)
}
-func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
+func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil {
+ err := statusCmd.readReply(rd)
+ if err != nil {
return err
}
for _ = range cmds {
- err := statusCmd.readReply(cn)
+ err = statusCmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) {
return err
}
}
// Parse number of replies.
- line, err := cn.Rd.ReadLine()
+ line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
@@ -424,7 +441,7 @@ func (c *Client) TxPipeline() Pipeliner {
}
func (c *Client) pubSub() *PubSub {
- return &PubSub{
+ pubsub := &PubSub{
opt: c.opt,
newConn: func(channels []string) (*pool.Conn, error) {
@@ -432,6 +449,8 @@ func (c *Client) pubSub() *PubSub {
},
closeConn: c.connPool.CloseConn,
}
+ pubsub.init()
+ return pubsub
}
// Subscribe subscribes the client to the specified channels.