summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub.go')
-rw-r--r--vendor/github.com/go-redis/redis/pubsub.go396
1 files changed, 396 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go
new file mode 100644
index 000000000..4872b4e88
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/pubsub.go
@@ -0,0 +1,396 @@
+package redis
+
+import (
+ "fmt"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/go-redis/redis/internal"
+ "github.com/go-redis/redis/internal/pool"
+)
+
+// PubSub implements Pub/Sub commands as described in
+// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
+// multiple goroutines.
+//
+// PubSub automatically resubscribes to the channels and patterns
+// when Redis becomes unavailable.
+type PubSub struct {
+ base baseClient
+
+ mu sync.Mutex
+ cn *pool.Conn
+ channels []string
+ patterns []string
+ closed bool
+
+ cmd *Cmd
+}
+
+func (c *PubSub) conn() (*pool.Conn, error) {
+ c.mu.Lock()
+ cn, err := c._conn()
+ c.mu.Unlock()
+ return cn, err
+}
+
+func (c *PubSub) _conn() (*pool.Conn, error) {
+ if c.closed {
+ return nil, pool.ErrClosed
+ }
+
+ if c.cn != nil {
+ return c.cn, nil
+ }
+
+ cn, err := c.base.connPool.NewConn()
+ if err != nil {
+ return nil, err
+ }
+
+ if !cn.Inited {
+ if err := c.base.initConn(cn); err != nil {
+ _ = c.base.connPool.CloseConn(cn)
+ return nil, err
+ }
+ }
+
+ if err := c.resubscribe(cn); err != nil {
+ _ = c.base.connPool.CloseConn(cn)
+ return nil, err
+ }
+
+ c.cn = cn
+ return cn, nil
+}
+
+func (c *PubSub) resubscribe(cn *pool.Conn) error {
+ var firstErr error
+ if len(c.channels) > 0 {
+ if err := c._subscribe(cn, "subscribe", c.channels...); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ if len(c.patterns) > 0 {
+ if err := c._subscribe(cn, "psubscribe", c.patterns...); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ return firstErr
+}
+
+func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
+ args := make([]interface{}, 1+len(channels))
+ args[0] = redisCmd
+ for i, channel := range channels {
+ args[1+i] = channel
+ }
+ cmd := NewSliceCmd(args...)
+
+ cn.SetWriteTimeout(c.base.opt.WriteTimeout)
+ return writeCmd(cn, cmd)
+}
+
+func (c *PubSub) putConn(cn *pool.Conn, err error) {
+ if !internal.IsBadConn(err, true) {
+ return
+ }
+
+ c.mu.Lock()
+ if c.cn == cn {
+ _ = c.closeConn()
+ }
+ c.mu.Unlock()
+}
+
+func (c *PubSub) closeConn() error {
+ err := c.base.connPool.CloseConn(c.cn)
+ c.cn = nil
+ return err
+}
+
+func (c *PubSub) Close() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.closed {
+ return pool.ErrClosed
+ }
+ c.closed = true
+
+ if c.cn != nil {
+ return c.closeConn()
+ }
+ return nil
+}
+
+// Subscribes the client to the specified channels. It returns
+// empty subscription if there are no channels.
+func (c *PubSub) Subscribe(channels ...string) error {
+ c.mu.Lock()
+ err := c.subscribe("subscribe", channels...)
+ c.channels = appendIfNotExists(c.channels, channels...)
+ c.mu.Unlock()
+ return err
+}
+
+// Subscribes the client to the given patterns. It returns
+// empty subscription if there are no patterns.
+func (c *PubSub) PSubscribe(patterns ...string) error {
+ c.mu.Lock()
+ err := c.subscribe("psubscribe", patterns...)
+ c.patterns = appendIfNotExists(c.patterns, patterns...)
+ c.mu.Unlock()
+ return err
+}
+
+// Unsubscribes the client from the given channels, or from all of
+// them if none is given.
+func (c *PubSub) Unsubscribe(channels ...string) error {
+ c.mu.Lock()
+ err := c.subscribe("unsubscribe", channels...)
+ c.channels = remove(c.channels, channels...)
+ c.mu.Unlock()
+ return err
+}
+
+// Unsubscribes the client from the given patterns, or from all of
+// them if none is given.
+func (c *PubSub) PUnsubscribe(patterns ...string) error {
+ c.mu.Lock()
+ err := c.subscribe("punsubscribe", patterns...)
+ c.patterns = remove(c.patterns, patterns...)
+ c.mu.Unlock()
+ return err
+}
+
+func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
+ cn, err := c._conn()
+ if err != nil {
+ return err
+ }
+
+ err = c._subscribe(cn, redisCmd, channels...)
+ c.putConn(cn, err)
+ return err
+}
+
+func (c *PubSub) Ping(payload ...string) error {
+ args := []interface{}{"ping"}
+ if len(payload) == 1 {
+ args = append(args, payload[0])
+ }
+ cmd := NewCmd(args...)
+
+ cn, err := c.conn()
+ if err != nil {
+ return err
+ }
+
+ cn.SetWriteTimeout(c.base.opt.WriteTimeout)
+ err = writeCmd(cn, cmd)
+ c.putConn(cn, err)
+ return err
+}
+
+// Message received after a successful subscription to channel.
+type Subscription struct {
+ // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
+ Kind string
+ // Channel name we have subscribed to.
+ Channel string
+ // Number of channels we are currently subscribed to.
+ Count int
+}
+
+func (m *Subscription) String() string {
+ return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
+}
+
+// Message received as result of a PUBLISH command issued by another client.
+type Message struct {
+ Channel string
+ Pattern string
+ Payload string
+}
+
+func (m *Message) String() string {
+ return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
+}
+
+// Pong received as result of a PING command issued by another client.
+type Pong struct {
+ Payload string
+}
+
+func (p *Pong) String() string {
+ if p.Payload != "" {
+ return fmt.Sprintf("Pong<%s>", p.Payload)
+ }
+ return "Pong"
+}
+
+func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
+ switch reply := reply.(type) {
+ case string:
+ return &Pong{
+ Payload: reply,
+ }, nil
+ case []interface{}:
+ switch kind := reply[0].(string); kind {
+ case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
+ return &Subscription{
+ Kind: kind,
+ Channel: reply[1].(string),
+ Count: int(reply[2].(int64)),
+ }, nil
+ case "message":
+ return &Message{
+ Channel: reply[1].(string),
+ Payload: reply[2].(string),
+ }, nil
+ case "pmessage":
+ return &Message{
+ Pattern: reply[1].(string),
+ Channel: reply[2].(string),
+ Payload: reply[3].(string),
+ }, nil
+ case "pong":
+ return &Pong{
+ Payload: reply[1].(string),
+ }, nil
+ default:
+ return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
+ }
+ default:
+ return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
+ }
+}
+
+// ReceiveTimeout acts like Receive but returns an error if message
+// is not received in time. This is low-level API and most clients
+// should use ReceiveMessage.
+func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
+ if c.cmd == nil {
+ c.cmd = NewCmd()
+ }
+
+ cn, err := c.conn()
+ if err != nil {
+ return nil, err
+ }
+
+ cn.SetReadTimeout(timeout)
+ err = c.cmd.readReply(cn)
+ c.putConn(cn, err)
+ if err != nil {
+ return nil, err
+ }
+
+ return c.newMessage(c.cmd.Val())
+}
+
+// Receive returns a message as a Subscription, Message, Pong or error.
+// See PubSub example for details. This is low-level API and most clients
+// should use ReceiveMessage.
+func (c *PubSub) Receive() (interface{}, error) {
+ return c.ReceiveTimeout(0)
+}
+
+// ReceiveMessage returns a Message or error ignoring Subscription or Pong
+// messages. It automatically reconnects to Redis Server and resubscribes
+// to channels in case of network errors.
+func (c *PubSub) ReceiveMessage() (*Message, error) {
+ return c.receiveMessage(5 * time.Second)
+}
+
+func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
+ var errNum uint
+ for {
+ msgi, err := c.ReceiveTimeout(timeout)
+ if err != nil {
+ if !internal.IsNetworkError(err) {
+ return nil, err
+ }
+
+ errNum++
+ if errNum < 3 {
+ if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+ err := c.Ping()
+ if err != nil {
+ internal.Logf("PubSub.Ping failed: %s", err)
+ }
+ }
+ } else {
+ // 3 consequent errors - connection is broken or
+ // Redis Server is down.
+ // Sleep to not exceed max number of open connections.
+ time.Sleep(time.Second)
+ }
+ continue
+ }
+
+ // Reset error number, because we received a message.
+ errNum = 0
+
+ switch msg := msgi.(type) {
+ case *Subscription:
+ // Ignore.
+ case *Pong:
+ // Ignore.
+ case *Message:
+ return msg, nil
+ default:
+ return nil, fmt.Errorf("redis: unknown message: %T", msgi)
+ }
+ }
+}
+
+// Channel returns a channel for concurrently receiving messages.
+// The channel is closed with PubSub.
+func (c *PubSub) Channel() <-chan *Message {
+ ch := make(chan *Message, 100)
+ go func() {
+ for {
+ msg, err := c.ReceiveMessage()
+ if err != nil {
+ if err == pool.ErrClosed {
+ break
+ }
+ continue
+ }
+ ch <- msg
+ }
+ close(ch)
+ }()
+ return ch
+}
+
+func appendIfNotExists(ss []string, es ...string) []string {
+loop:
+ for _, e := range es {
+ for _, s := range ss {
+ if s == e {
+ continue loop
+ }
+ }
+ ss = append(ss, e)
+ }
+ return ss
+}
+
+func remove(ss []string, es ...string) []string {
+ if len(es) == 0 {
+ return ss[:0]
+ }
+ for _, e := range es {
+ for i, s := range ss {
+ if s == e {
+ ss = append(ss[:i], ss[i+1:]...)
+ break
+ }
+ }
+ }
+ return ss
+}