summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/pubsub.go
diff options
context:
space:
mode:
authorChristopher Speller <crspeller@gmail.com>2017-08-17 17:19:06 -0700
committerGitHub <noreply@github.com>2017-08-17 17:19:06 -0700
commit96eab1202717e073782ec399a4e0820cae15b1bb (patch)
tree011012982be971c7e9ef91466f026bc0956ac9a2 /vendor/github.com/go-redis/redis/pubsub.go
parent2c895ee66eed626721135acfcc48254c6e3f3b29 (diff)
downloadchat-96eab1202717e073782ec399a4e0820cae15b1bb.tar.gz
chat-96eab1202717e073782ec399a4e0820cae15b1bb.tar.bz2
chat-96eab1202717e073782ec399a4e0820cae15b1bb.zip
Updating server dependancies. (#7246)
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub.go')
-rw-r--r--vendor/github.com/go-redis/redis/pubsub.go54
1 files changed, 25 insertions, 29 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go
index 4872b4e88..4a5c65f57 100644
--- a/vendor/github.com/go-redis/redis/pubsub.go
+++ b/vendor/github.com/go-redis/redis/pubsub.go
@@ -17,7 +17,10 @@ import (
// PubSub automatically resubscribes to the channels and patterns
// when Redis becomes unavailable.
type PubSub struct {
- base baseClient
+ opt *Options
+
+ newConn func([]string) (*pool.Conn, error)
+ closeConn func(*pool.Conn) error
mu sync.Mutex
cn *pool.Conn
@@ -30,12 +33,12 @@ type PubSub struct {
func (c *PubSub) conn() (*pool.Conn, error) {
c.mu.Lock()
- cn, err := c._conn()
+ cn, err := c._conn(nil)
c.mu.Unlock()
return cn, err
}
-func (c *PubSub) _conn() (*pool.Conn, error) {
+func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
if c.closed {
return nil, pool.ErrClosed
}
@@ -44,20 +47,13 @@ func (c *PubSub) _conn() (*pool.Conn, error) {
return c.cn, nil
}
- cn, err := c.base.connPool.NewConn()
+ cn, err := c.newConn(channels)
if err != nil {
return nil, err
}
- if !cn.Inited {
- if err := c.base.initConn(cn); err != nil {
- _ = c.base.connPool.CloseConn(cn)
- return nil, err
- }
- }
-
if err := c.resubscribe(cn); err != nil {
- _ = c.base.connPool.CloseConn(cn)
+ _ = c.closeConn(cn)
return nil, err
}
@@ -88,24 +84,24 @@ func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string)
}
cmd := NewSliceCmd(args...)
- cn.SetWriteTimeout(c.base.opt.WriteTimeout)
+ cn.SetWriteTimeout(c.opt.WriteTimeout)
return writeCmd(cn, cmd)
}
-func (c *PubSub) putConn(cn *pool.Conn, err error) {
- if !internal.IsBadConn(err, true) {
- return
- }
-
+func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
c.mu.Lock()
- if c.cn == cn {
- _ = c.closeConn()
- }
+ c._releaseConn(cn, err)
c.mu.Unlock()
}
-func (c *PubSub) closeConn() error {
- err := c.base.connPool.CloseConn(c.cn)
+func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
+ if internal.IsBadConn(err, true) && c.cn == cn {
+ _ = c.closeTheCn()
+ }
+}
+
+func (c *PubSub) closeTheCn() error {
+ err := c.closeConn(c.cn)
c.cn = nil
return err
}
@@ -120,7 +116,7 @@ func (c *PubSub) Close() error {
c.closed = true
if c.cn != nil {
- return c.closeConn()
+ return c.closeTheCn()
}
return nil
}
@@ -166,13 +162,13 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error {
}
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
- cn, err := c._conn()
+ cn, err := c._conn(channels)
if err != nil {
return err
}
err = c._subscribe(cn, redisCmd, channels...)
- c.putConn(cn, err)
+ c._releaseConn(cn, err)
return err
}
@@ -188,9 +184,9 @@ func (c *PubSub) Ping(payload ...string) error {
return err
}
- cn.SetWriteTimeout(c.base.opt.WriteTimeout)
+ cn.SetWriteTimeout(c.opt.WriteTimeout)
err = writeCmd(cn, cmd)
- c.putConn(cn, err)
+ c.releaseConn(cn, err)
return err
}
@@ -283,7 +279,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
cn.SetReadTimeout(timeout)
err = c.cmd.readReply(cn)
- c.putConn(cn, err)
+ c.releaseConn(cn, err)
if err != nil {
return nil, err
}