diff options
author | Christopher Speller <crspeller@gmail.com> | 2018-08-28 10:05:26 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-28 10:05:26 -0700 |
commit | 61e27beabc9804fdcf59ed9df2180802175a4f70 (patch) | |
tree | 52c86f5cdbd4e13d05b8f9dddad1a01b88e26cab /vendor/github.com/go-redis/redis/pubsub.go | |
parent | 347ee1d205c95f5fd766e206cc65bfb9782a2623 (diff) | |
download | chat-61e27beabc9804fdcf59ed9df2180802175a4f70.tar.gz chat-61e27beabc9804fdcf59ed9df2180802175a4f70.tar.bz2 chat-61e27beabc9804fdcf59ed9df2180802175a4f70.zip |
Updating dependancies. (#9303)
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/pubsub.go | 290 |
1 files changed, 180 insertions, 110 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index b56728f3e..b08f34ad2 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -2,20 +2,20 @@ package redis import ( "fmt" - "net" "sync" "time" "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" ) -// PubSub implements Pub/Sub commands as described in -// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by -// multiple goroutines. +// PubSub implements Pub/Sub commands bas described in +// http://redis.io/topics/pubsub. Message receiving is NOT safe +// for concurrent use by multiple goroutines. // -// PubSub automatically resubscribes to the channels and patterns -// when Redis becomes unavailable. +// PubSub automatically reconnects to Redis Server and resubscribes +// to the channels in case of network errors. type PubSub struct { opt *Options @@ -27,11 +27,17 @@ type PubSub struct { channels map[string]struct{} patterns map[string]struct{} closed bool + exit chan struct{} cmd *Cmd chOnce sync.Once ch chan *Message + ping chan struct{} +} + +func (c *PubSub) init() { + c.exit = make(chan struct{}) } func (c *PubSub) conn() (*pool.Conn, error) { @@ -41,7 +47,7 @@ func (c *PubSub) conn() (*pool.Conn, error) { return cn, err } -func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { +func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { if c.closed { return nil, pool.ErrClosed } @@ -50,6 +56,9 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { return c.cn, nil } + channels := mapKeys(c.channels) + channels = append(channels, newChannels...) + cn, err := c.newConn(channels) if err != nil { return nil, err @@ -64,61 +73,81 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { return cn, nil } +func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error { + return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) +} + func (c *PubSub) resubscribe(cn *pool.Conn) error { var firstErr error + if len(c.channels) > 0 { - channels := make([]string, len(c.channels)) - i := 0 - for channel := range c.channels { - channels[i] = channel - i++ - } - if err := c._subscribe(cn, "subscribe", channels...); err != nil && firstErr == nil { + err := c._subscribe(cn, "subscribe", mapKeys(c.channels)) + if err != nil && firstErr == nil { firstErr = err } } + if len(c.patterns) > 0 { - patterns := make([]string, len(c.patterns)) - i := 0 - for pattern := range c.patterns { - patterns[i] = pattern - i++ - } - if err := c._subscribe(cn, "psubscribe", patterns...); err != nil && firstErr == nil { + err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns)) + if err != nil && firstErr == nil { firstErr = err } } + return firstErr } -func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error { - args := make([]interface{}, 1+len(channels)) - args[0] = redisCmd - for i, channel := range channels { - args[1+i] = channel +func mapKeys(m map[string]struct{}) []string { + s := make([]string, len(m)) + i := 0 + for k := range m { + s[i] = k + i++ } - cmd := NewSliceCmd(args...) + return s +} - cn.SetWriteTimeout(c.opt.WriteTimeout) - return writeCmd(cn, cmd) +func (c *PubSub) _subscribe( + cn *pool.Conn, redisCmd string, channels []string, +) error { + args := make([]interface{}, 0, 1+len(channels)) + args = append(args, redisCmd) + for _, channel := range channels { + args = append(args, channel) + } + cmd := NewSliceCmd(args...) + return c.writeCmd(cn, cmd) } -func (c *PubSub) releaseConn(cn *pool.Conn, err error) { +func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) { c.mu.Lock() - c._releaseConn(cn, err) + c._releaseConn(cn, err, allowTimeout) c.mu.Unlock() } -func (c *PubSub) _releaseConn(cn *pool.Conn, err error) { +func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) { if c.cn != cn { return } - if internal.IsBadConn(err, true) { - _ = c.closeTheCn() + if internal.IsBadConn(err, allowTimeout) { + c._reconnect(err) } } -func (c *PubSub) closeTheCn() error { +func (c *PubSub) _reconnect(reason error) { + _ = c._closeTheCn(reason) + _, _ = c._conn(nil) +} + +func (c *PubSub) _closeTheCn(reason error) error { + if c.cn == nil { + return nil + } + if !c.closed { + internal.Logf("redis: discarding bad PubSub connection: %s", reason) + } err := c.closeConn(c.cn) c.cn = nil return err @@ -132,25 +161,25 @@ func (c *PubSub) Close() error { return pool.ErrClosed } c.closed = true + close(c.exit) - if c.cn != nil { - return c.closeTheCn() - } - return nil + err := c._closeTheCn(pool.ErrClosed) + return err } // Subscribe the client to the specified channels. It returns // empty subscription if there are no channels. func (c *PubSub) Subscribe(channels ...string) error { c.mu.Lock() + defer c.mu.Unlock() + err := c.subscribe("subscribe", channels...) if c.channels == nil { c.channels = make(map[string]struct{}) } - for _, channel := range channels { - c.channels[channel] = struct{}{} + for _, s := range channels { + c.channels[s] = struct{}{} } - c.mu.Unlock() return err } @@ -158,14 +187,15 @@ func (c *PubSub) Subscribe(channels ...string) error { // empty subscription if there are no patterns. func (c *PubSub) PSubscribe(patterns ...string) error { c.mu.Lock() + defer c.mu.Unlock() + err := c.subscribe("psubscribe", patterns...) if c.patterns == nil { c.patterns = make(map[string]struct{}) } - for _, pattern := range patterns { - c.patterns[pattern] = struct{}{} + for _, s := range patterns { + c.patterns[s] = struct{}{} } - c.mu.Unlock() return err } @@ -173,11 +203,12 @@ func (c *PubSub) PSubscribe(patterns ...string) error { // them if none is given. func (c *PubSub) Unsubscribe(channels ...string) error { c.mu.Lock() - err := c.subscribe("unsubscribe", channels...) + defer c.mu.Unlock() + for _, channel := range channels { delete(c.channels, channel) } - c.mu.Unlock() + err := c.subscribe("unsubscribe", channels...) return err } @@ -185,11 +216,12 @@ func (c *PubSub) Unsubscribe(channels ...string) error { // them if none is given. func (c *PubSub) PUnsubscribe(patterns ...string) error { c.mu.Lock() - err := c.subscribe("punsubscribe", patterns...) + defer c.mu.Unlock() + for _, pattern := range patterns { delete(c.patterns, pattern) } - c.mu.Unlock() + err := c.subscribe("punsubscribe", patterns...) return err } @@ -199,8 +231,8 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error { return err } - err = c._subscribe(cn, redisCmd, channels...) - c._releaseConn(cn, err) + err = c._subscribe(cn, redisCmd, channels) + c._releaseConn(cn, err, false) return err } @@ -216,9 +248,8 @@ func (c *PubSub) Ping(payload ...string) error { return err } - cn.SetWriteTimeout(c.opt.WriteTimeout) - err = writeCmd(cn, cmd) - c.releaseConn(cn, err) + err = c.writeCmd(cn, cmd) + c.releaseConn(cn, err, false) return err } @@ -297,8 +328,8 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { } // ReceiveTimeout acts like Receive but returns an error if message -// is not received in time. This is low-level API and most clients -// should use ReceiveMessage. +// is not received in time. This is low-level API and in most cases +// Channel should be used instead. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { if c.cmd == nil { c.cmd = NewCmd() @@ -309,9 +340,11 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { return nil, err } - cn.SetReadTimeout(timeout) - err = c.cmd.readReply(cn) - c.releaseConn(cn, err) + err = cn.WithReader(timeout, func(rd *proto.Reader) error { + return c.cmd.readReply(rd) + }) + + c.releaseConn(cn, err, timeout > 0) if err != nil { return nil, err } @@ -320,49 +353,23 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { } // Receive returns a message as a Subscription, Message, Pong or error. -// See PubSub example for details. This is low-level API and most clients -// should use ReceiveMessage. +// See PubSub example for details. This is low-level API and in most cases +// Channel should be used instead. func (c *PubSub) Receive() (interface{}, error) { return c.ReceiveTimeout(0) } -// ReceiveMessage returns a Message or error ignoring Subscription or Pong -// messages. It automatically reconnects to Redis Server and resubscribes -// to channels in case of network errors. +// ReceiveMessage returns a Message or error ignoring Subscription and Pong +// messages. This is low-level API and in most cases Channel should be used +// instead. func (c *PubSub) ReceiveMessage() (*Message, error) { - return c.receiveMessage(5 * time.Second) -} - -func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) { - var errNum uint for { - msgi, err := c.ReceiveTimeout(timeout) + msg, err := c.Receive() if err != nil { - if !internal.IsNetworkError(err) { - return nil, err - } - - errNum++ - if errNum < 3 { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - err := c.Ping() - if err != nil { - internal.Logf("PubSub.Ping failed: %s", err) - } - } - } else { - // 3 consequent errors - connection is broken or - // Redis Server is down. - // Sleep to not exceed max number of open connections. - time.Sleep(time.Second) - } - continue + return nil, err } - // Reset error number, because we received a message. - errNum = 0 - - switch msg := msgi.(type) { + switch msg := msg.(type) { case *Subscription: // Ignore. case *Pong: @@ -370,30 +377,93 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) { case *Message: return msg, nil default: - return nil, fmt.Errorf("redis: unknown message: %T", msgi) + err := fmt.Errorf("redis: unknown message: %T", msg) + return nil, err } } } // Channel returns a Go channel for concurrently receiving messages. -// The channel is closed with PubSub. Receive or ReceiveMessage APIs -// can not be used after channel is created. +// It periodically sends Ping messages to test connection health. +// The channel is closed with PubSub. Receive* APIs can not be used +// after channel is created. func (c *PubSub) Channel() <-chan *Message { - c.chOnce.Do(func() { - c.ch = make(chan *Message, 100) - go func() { - for { - msg, err := c.ReceiveMessage() - if err != nil { - if err == pool.ErrClosed { - break - } - continue + c.chOnce.Do(c.initChannel) + return c.ch +} + +func (c *PubSub) initChannel() { + c.ch = make(chan *Message, 100) + c.ping = make(chan struct{}, 10) + + go func() { + var errCount int + for { + msg, err := c.Receive() + if err != nil { + if err == pool.ErrClosed { + close(c.ch) + return } + if errCount > 0 { + time.Sleep(c.retryBackoff(errCount)) + } + errCount++ + continue + } + errCount = 0 + + // Any message is as good as a ping. + select { + case c.ping <- struct{}{}: + default: + } + + switch msg := msg.(type) { + case *Subscription: + // Ignore. + case *Pong: + // Ignore. + case *Message: c.ch <- msg + default: + internal.Logf("redis: unknown message: %T", msg) } - close(c.ch) - }() - }) - return c.ch + } + }() + + go func() { + const timeout = 5 * time.Second + + timer := time.NewTimer(timeout) + timer.Stop() + + healthy := true + var pingErr error + for { + timer.Reset(timeout) + select { + case <-c.ping: + healthy = true + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + pingErr = c.Ping() + if healthy { + healthy = false + } else { + c.mu.Lock() + c._reconnect(pingErr) + c.mu.Unlock() + } + case <-c.exit: + return + } + } + }() +} + +func (c *PubSub) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) } |