diff options
author | Christopher Speller <crspeller@gmail.com> | 2018-08-28 10:05:26 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-28 10:05:26 -0700 |
commit | 61e27beabc9804fdcf59ed9df2180802175a4f70 (patch) | |
tree | 52c86f5cdbd4e13d05b8f9dddad1a01b88e26cab /vendor/github.com/go-redis/redis/redis.go | |
parent | 347ee1d205c95f5fd766e206cc65bfb9782a2623 (diff) | |
download | chat-61e27beabc9804fdcf59ed9df2180802175a4f70.tar.gz chat-61e27beabc9804fdcf59ed9df2180802175a4f70.tar.bz2 chat-61e27beabc9804fdcf59ed9df2180802175a4f70.zip |
Updating dependancies. (#9303)
Diffstat (limited to 'vendor/github.com/go-redis/redis/redis.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/redis.go | 91 |
1 files changed, 55 insertions, 36 deletions
diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index beb632e1e..3e72bf060 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -50,7 +50,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { if err := c.initConn(cn); err != nil { _ = c.connPool.CloseConn(cn) return nil, err @@ -66,7 +66,7 @@ func (c *baseClient) getConn() (*pool.Conn, error) { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { err := c.initConn(cn) if err != nil { c.connPool.Remove(cn) @@ -88,7 +88,7 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { } func (c *baseClient) initConn(cn *pool.Conn) error { - cn.Inited = true + cn.InitedAt = time.Now() if c.opt.Password == "" && c.opt.DB == 0 && @@ -123,8 +123,17 @@ func (c *baseClient) initConn(cn *pool.Conn) error { return nil } +// Do creates a Cmd from the args and processes the cmd. +func (c *baseClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + // WrapProcess wraps function that processes Redis commands. -func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { +func (c *baseClient) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { c.process = fn(c.process) } @@ -147,8 +156,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmd); err != nil { + err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) + if err != nil { c.releaseConn(cn, err) cmd.setErr(err) if internal.IsRetryableError(err, true) { @@ -157,8 +168,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - cn.SetReadTimeout(c.cmdTimeout(cmd)) - err = cmd.readReply(cn) + err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error { + return cmd.readReply(rd) + }) c.releaseConn(cn, err) if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { continue @@ -176,9 +188,8 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration { func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { - return *timeout + return readTimeout(*timeout) } - return c.opt.ReadTimeout } @@ -244,24 +255,27 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e break } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmds...); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) + if err != nil { setCmdsErr(cmds, err) return true, err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - return true, pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return pipelineReadCmds(rd, cmds) + }) + return true, err } -func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { +func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } @@ -270,47 +284,50 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { } func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { setCmdsErr(cmds, err) return true, err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - if err := c.txPipelineReadQueued(cn, cmds); err != nil { - setCmdsErr(cmds, err) - return false, err - } - - return false, pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := txPipelineReadQueued(rd, cmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return false, err } -func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { +func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error { multiExec := make([]Cmder, 0, len(cmds)+2) multiExec = append(multiExec, NewStatusCmd("MULTI")) multiExec = append(multiExec, cmds...) multiExec = append(multiExec, NewSliceCmd("EXEC")) - return writeCmd(cn, multiExec...) + return writeCmd(wr, multiExec...) } -func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { +func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + err := statusCmd.readReply(rd) + if err != nil { return err } for _ = range cmds { - err := statusCmd.readReply(cn) + err = statusCmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } } // Parse number of replies. - line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr @@ -424,7 +441,7 @@ func (c *Client) TxPipeline() Pipeliner { } func (c *Client) pubSub() *PubSub { - return &PubSub{ + pubsub := &PubSub{ opt: c.opt, newConn: func(channels []string) (*pool.Conn, error) { @@ -432,6 +449,8 @@ func (c *Client) pubSub() *PubSub { }, closeConn: c.connPool.CloseConn, } + pubsub.init() + return pubsub } // Subscribe subscribes the client to the specified channels. |