summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub.go')
-rw-r--r--vendor/github.com/go-redis/redis/pubsub.go290
1 files changed, 180 insertions, 110 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go
index b56728f3e..b08f34ad2 100644
--- a/vendor/github.com/go-redis/redis/pubsub.go
+++ b/vendor/github.com/go-redis/redis/pubsub.go
@@ -2,20 +2,20 @@ package redis
import (
"fmt"
- "net"
"sync"
"time"
"github.com/go-redis/redis/internal"
"github.com/go-redis/redis/internal/pool"
+ "github.com/go-redis/redis/internal/proto"
)
-// PubSub implements Pub/Sub commands as described in
-// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
-// multiple goroutines.
+// PubSub implements Pub/Sub commands bas described in
+// http://redis.io/topics/pubsub. Message receiving is NOT safe
+// for concurrent use by multiple goroutines.
//
-// PubSub automatically resubscribes to the channels and patterns
-// when Redis becomes unavailable.
+// PubSub automatically reconnects to Redis Server and resubscribes
+// to the channels in case of network errors.
type PubSub struct {
opt *Options
@@ -27,11 +27,17 @@ type PubSub struct {
channels map[string]struct{}
patterns map[string]struct{}
closed bool
+ exit chan struct{}
cmd *Cmd
chOnce sync.Once
ch chan *Message
+ ping chan struct{}
+}
+
+func (c *PubSub) init() {
+ c.exit = make(chan struct{})
}
func (c *PubSub) conn() (*pool.Conn, error) {
@@ -41,7 +47,7 @@ func (c *PubSub) conn() (*pool.Conn, error) {
return cn, err
}
-func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
+func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
if c.closed {
return nil, pool.ErrClosed
}
@@ -50,6 +56,9 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
return c.cn, nil
}
+ channels := mapKeys(c.channels)
+ channels = append(channels, newChannels...)
+
cn, err := c.newConn(channels)
if err != nil {
return nil, err
@@ -64,61 +73,81 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
return cn, nil
}
+func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
+ return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmd)
+ })
+}
+
func (c *PubSub) resubscribe(cn *pool.Conn) error {
var firstErr error
+
if len(c.channels) > 0 {
- channels := make([]string, len(c.channels))
- i := 0
- for channel := range c.channels {
- channels[i] = channel
- i++
- }
- if err := c._subscribe(cn, "subscribe", channels...); err != nil && firstErr == nil {
+ err := c._subscribe(cn, "subscribe", mapKeys(c.channels))
+ if err != nil && firstErr == nil {
firstErr = err
}
}
+
if len(c.patterns) > 0 {
- patterns := make([]string, len(c.patterns))
- i := 0
- for pattern := range c.patterns {
- patterns[i] = pattern
- i++
- }
- if err := c._subscribe(cn, "psubscribe", patterns...); err != nil && firstErr == nil {
+ err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
+ if err != nil && firstErr == nil {
firstErr = err
}
}
+
return firstErr
}
-func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
- args := make([]interface{}, 1+len(channels))
- args[0] = redisCmd
- for i, channel := range channels {
- args[1+i] = channel
+func mapKeys(m map[string]struct{}) []string {
+ s := make([]string, len(m))
+ i := 0
+ for k := range m {
+ s[i] = k
+ i++
}
- cmd := NewSliceCmd(args...)
+ return s
+}
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- return writeCmd(cn, cmd)
+func (c *PubSub) _subscribe(
+ cn *pool.Conn, redisCmd string, channels []string,
+) error {
+ args := make([]interface{}, 0, 1+len(channels))
+ args = append(args, redisCmd)
+ for _, channel := range channels {
+ args = append(args, channel)
+ }
+ cmd := NewSliceCmd(args...)
+ return c.writeCmd(cn, cmd)
}
-func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
+func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
c.mu.Lock()
- c._releaseConn(cn, err)
+ c._releaseConn(cn, err, allowTimeout)
c.mu.Unlock()
}
-func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
+func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
if c.cn != cn {
return
}
- if internal.IsBadConn(err, true) {
- _ = c.closeTheCn()
+ if internal.IsBadConn(err, allowTimeout) {
+ c._reconnect(err)
}
}
-func (c *PubSub) closeTheCn() error {
+func (c *PubSub) _reconnect(reason error) {
+ _ = c._closeTheCn(reason)
+ _, _ = c._conn(nil)
+}
+
+func (c *PubSub) _closeTheCn(reason error) error {
+ if c.cn == nil {
+ return nil
+ }
+ if !c.closed {
+ internal.Logf("redis: discarding bad PubSub connection: %s", reason)
+ }
err := c.closeConn(c.cn)
c.cn = nil
return err
@@ -132,25 +161,25 @@ func (c *PubSub) Close() error {
return pool.ErrClosed
}
c.closed = true
+ close(c.exit)
- if c.cn != nil {
- return c.closeTheCn()
- }
- return nil
+ err := c._closeTheCn(pool.ErrClosed)
+ return err
}
// Subscribe the client to the specified channels. It returns
// empty subscription if there are no channels.
func (c *PubSub) Subscribe(channels ...string) error {
c.mu.Lock()
+ defer c.mu.Unlock()
+
err := c.subscribe("subscribe", channels...)
if c.channels == nil {
c.channels = make(map[string]struct{})
}
- for _, channel := range channels {
- c.channels[channel] = struct{}{}
+ for _, s := range channels {
+ c.channels[s] = struct{}{}
}
- c.mu.Unlock()
return err
}
@@ -158,14 +187,15 @@ func (c *PubSub) Subscribe(channels ...string) error {
// empty subscription if there are no patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
c.mu.Lock()
+ defer c.mu.Unlock()
+
err := c.subscribe("psubscribe", patterns...)
if c.patterns == nil {
c.patterns = make(map[string]struct{})
}
- for _, pattern := range patterns {
- c.patterns[pattern] = struct{}{}
+ for _, s := range patterns {
+ c.patterns[s] = struct{}{}
}
- c.mu.Unlock()
return err
}
@@ -173,11 +203,12 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
c.mu.Lock()
- err := c.subscribe("unsubscribe", channels...)
+ defer c.mu.Unlock()
+
for _, channel := range channels {
delete(c.channels, channel)
}
- c.mu.Unlock()
+ err := c.subscribe("unsubscribe", channels...)
return err
}
@@ -185,11 +216,12 @@ func (c *PubSub) Unsubscribe(channels ...string) error {
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.mu.Lock()
- err := c.subscribe("punsubscribe", patterns...)
+ defer c.mu.Unlock()
+
for _, pattern := range patterns {
delete(c.patterns, pattern)
}
- c.mu.Unlock()
+ err := c.subscribe("punsubscribe", patterns...)
return err
}
@@ -199,8 +231,8 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
return err
}
- err = c._subscribe(cn, redisCmd, channels...)
- c._releaseConn(cn, err)
+ err = c._subscribe(cn, redisCmd, channels)
+ c._releaseConn(cn, err, false)
return err
}
@@ -216,9 +248,8 @@ func (c *PubSub) Ping(payload ...string) error {
return err
}
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- err = writeCmd(cn, cmd)
- c.releaseConn(cn, err)
+ err = c.writeCmd(cn, cmd)
+ c.releaseConn(cn, err, false)
return err
}
@@ -297,8 +328,8 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
}
// ReceiveTimeout acts like Receive but returns an error if message
-// is not received in time. This is low-level API and most clients
-// should use ReceiveMessage.
+// is not received in time. This is low-level API and in most cases
+// Channel should be used instead.
func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
if c.cmd == nil {
c.cmd = NewCmd()
@@ -309,9 +340,11 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
return nil, err
}
- cn.SetReadTimeout(timeout)
- err = c.cmd.readReply(cn)
- c.releaseConn(cn, err)
+ err = cn.WithReader(timeout, func(rd *proto.Reader) error {
+ return c.cmd.readReply(rd)
+ })
+
+ c.releaseConn(cn, err, timeout > 0)
if err != nil {
return nil, err
}
@@ -320,49 +353,23 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
}
// Receive returns a message as a Subscription, Message, Pong or error.
-// See PubSub example for details. This is low-level API and most clients
-// should use ReceiveMessage.
+// See PubSub example for details. This is low-level API and in most cases
+// Channel should be used instead.
func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0)
}
-// ReceiveMessage returns a Message or error ignoring Subscription or Pong
-// messages. It automatically reconnects to Redis Server and resubscribes
-// to channels in case of network errors.
+// ReceiveMessage returns a Message or error ignoring Subscription and Pong
+// messages. This is low-level API and in most cases Channel should be used
+// instead.
func (c *PubSub) ReceiveMessage() (*Message, error) {
- return c.receiveMessage(5 * time.Second)
-}
-
-func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
- var errNum uint
for {
- msgi, err := c.ReceiveTimeout(timeout)
+ msg, err := c.Receive()
if err != nil {
- if !internal.IsNetworkError(err) {
- return nil, err
- }
-
- errNum++
- if errNum < 3 {
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- err := c.Ping()
- if err != nil {
- internal.Logf("PubSub.Ping failed: %s", err)
- }
- }
- } else {
- // 3 consequent errors - connection is broken or
- // Redis Server is down.
- // Sleep to not exceed max number of open connections.
- time.Sleep(time.Second)
- }
- continue
+ return nil, err
}
- // Reset error number, because we received a message.
- errNum = 0
-
- switch msg := msgi.(type) {
+ switch msg := msg.(type) {
case *Subscription:
// Ignore.
case *Pong:
@@ -370,30 +377,93 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
case *Message:
return msg, nil
default:
- return nil, fmt.Errorf("redis: unknown message: %T", msgi)
+ err := fmt.Errorf("redis: unknown message: %T", msg)
+ return nil, err
}
}
}
// Channel returns a Go channel for concurrently receiving messages.
-// The channel is closed with PubSub. Receive or ReceiveMessage APIs
-// can not be used after channel is created.
+// It periodically sends Ping messages to test connection health.
+// The channel is closed with PubSub. Receive* APIs can not be used
+// after channel is created.
func (c *PubSub) Channel() <-chan *Message {
- c.chOnce.Do(func() {
- c.ch = make(chan *Message, 100)
- go func() {
- for {
- msg, err := c.ReceiveMessage()
- if err != nil {
- if err == pool.ErrClosed {
- break
- }
- continue
+ c.chOnce.Do(c.initChannel)
+ return c.ch
+}
+
+func (c *PubSub) initChannel() {
+ c.ch = make(chan *Message, 100)
+ c.ping = make(chan struct{}, 10)
+
+ go func() {
+ var errCount int
+ for {
+ msg, err := c.Receive()
+ if err != nil {
+ if err == pool.ErrClosed {
+ close(c.ch)
+ return
}
+ if errCount > 0 {
+ time.Sleep(c.retryBackoff(errCount))
+ }
+ errCount++
+ continue
+ }
+ errCount = 0
+
+ // Any message is as good as a ping.
+ select {
+ case c.ping <- struct{}{}:
+ default:
+ }
+
+ switch msg := msg.(type) {
+ case *Subscription:
+ // Ignore.
+ case *Pong:
+ // Ignore.
+ case *Message:
c.ch <- msg
+ default:
+ internal.Logf("redis: unknown message: %T", msg)
}
- close(c.ch)
- }()
- })
- return c.ch
+ }
+ }()
+
+ go func() {
+ const timeout = 5 * time.Second
+
+ timer := time.NewTimer(timeout)
+ timer.Stop()
+
+ healthy := true
+ var pingErr error
+ for {
+ timer.Reset(timeout)
+ select {
+ case <-c.ping:
+ healthy = true
+ if !timer.Stop() {
+ <-timer.C
+ }
+ case <-timer.C:
+ pingErr = c.Ping()
+ if healthy {
+ healthy = false
+ } else {
+ c.mu.Lock()
+ c._reconnect(pingErr)
+ c.mu.Unlock()
+ }
+ case <-c.exit:
+ return
+ }
+ }
+ }()
+}
+
+func (c *PubSub) retryBackoff(attempt int) time.Duration {
+ return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}