diff options
author | Christopher Speller <crspeller@gmail.com> | 2017-08-17 17:19:06 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-17 17:19:06 -0700 |
commit | 96eab1202717e073782ec399a4e0820cae15b1bb (patch) | |
tree | 011012982be971c7e9ef91466f026bc0956ac9a2 /vendor/github.com/go-redis/redis/pubsub.go | |
parent | 2c895ee66eed626721135acfcc48254c6e3f3b29 (diff) | |
download | chat-96eab1202717e073782ec399a4e0820cae15b1bb.tar.gz chat-96eab1202717e073782ec399a4e0820cae15b1bb.tar.bz2 chat-96eab1202717e073782ec399a4e0820cae15b1bb.zip |
Updating server dependancies. (#7246)
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/pubsub.go | 54 |
1 files changed, 25 insertions, 29 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index 4872b4e88..4a5c65f57 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -17,7 +17,10 @@ import ( // PubSub automatically resubscribes to the channels and patterns // when Redis becomes unavailable. type PubSub struct { - base baseClient + opt *Options + + newConn func([]string) (*pool.Conn, error) + closeConn func(*pool.Conn) error mu sync.Mutex cn *pool.Conn @@ -30,12 +33,12 @@ type PubSub struct { func (c *PubSub) conn() (*pool.Conn, error) { c.mu.Lock() - cn, err := c._conn() + cn, err := c._conn(nil) c.mu.Unlock() return cn, err } -func (c *PubSub) _conn() (*pool.Conn, error) { +func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { if c.closed { return nil, pool.ErrClosed } @@ -44,20 +47,13 @@ func (c *PubSub) _conn() (*pool.Conn, error) { return c.cn, nil } - cn, err := c.base.connPool.NewConn() + cn, err := c.newConn(channels) if err != nil { return nil, err } - if !cn.Inited { - if err := c.base.initConn(cn); err != nil { - _ = c.base.connPool.CloseConn(cn) - return nil, err - } - } - if err := c.resubscribe(cn); err != nil { - _ = c.base.connPool.CloseConn(cn) + _ = c.closeConn(cn) return nil, err } @@ -88,24 +84,24 @@ func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) } cmd := NewSliceCmd(args...) - cn.SetWriteTimeout(c.base.opt.WriteTimeout) + cn.SetWriteTimeout(c.opt.WriteTimeout) return writeCmd(cn, cmd) } -func (c *PubSub) putConn(cn *pool.Conn, err error) { - if !internal.IsBadConn(err, true) { - return - } - +func (c *PubSub) releaseConn(cn *pool.Conn, err error) { c.mu.Lock() - if c.cn == cn { - _ = c.closeConn() - } + c._releaseConn(cn, err) c.mu.Unlock() } -func (c *PubSub) closeConn() error { - err := c.base.connPool.CloseConn(c.cn) +func (c *PubSub) _releaseConn(cn *pool.Conn, err error) { + if internal.IsBadConn(err, true) && c.cn == cn { + _ = c.closeTheCn() + } +} + +func (c *PubSub) closeTheCn() error { + err := c.closeConn(c.cn) c.cn = nil return err } @@ -120,7 +116,7 @@ func (c *PubSub) Close() error { c.closed = true if c.cn != nil { - return c.closeConn() + return c.closeTheCn() } return nil } @@ -166,13 +162,13 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error { } func (c *PubSub) subscribe(redisCmd string, channels ...string) error { - cn, err := c._conn() + cn, err := c._conn(channels) if err != nil { return err } err = c._subscribe(cn, redisCmd, channels...) - c.putConn(cn, err) + c._releaseConn(cn, err) return err } @@ -188,9 +184,9 @@ func (c *PubSub) Ping(payload ...string) error { return err } - cn.SetWriteTimeout(c.base.opt.WriteTimeout) + cn.SetWriteTimeout(c.opt.WriteTimeout) err = writeCmd(cn, cmd) - c.putConn(cn, err) + c.releaseConn(cn, err) return err } @@ -283,7 +279,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { cn.SetReadTimeout(timeout) err = c.cmd.readReply(cn) - c.putConn(cn, err) + c.releaseConn(cn, err) if err != nil { return nil, err } |