summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub.go')
-rw-r--r--vendor/github.com/go-redis/redis/pubsub.go36
1 files changed, 21 insertions, 15 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go
index e754a16f2..01f8a61aa 100644
--- a/vendor/github.com/go-redis/redis/pubsub.go
+++ b/vendor/github.com/go-redis/redis/pubsub.go
@@ -29,6 +29,9 @@ type PubSub struct {
closed bool
cmd *Cmd
+
+ chOnce sync.Once
+ ch chan *Message
}
func (c *PubSub) conn() (*pool.Conn, error) {
@@ -346,24 +349,27 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
}
}
-// Channel returns a channel for concurrently receiving messages.
-// The channel is closed with PubSub.
+// Channel returns a Go channel for concurrently receiving messages.
+// The channel is closed with PubSub. Receive or ReceiveMessage APIs
+// can not be used after channel is created.
func (c *PubSub) Channel() <-chan *Message {
- ch := make(chan *Message, 100)
- go func() {
- for {
- msg, err := c.ReceiveMessage()
- if err != nil {
- if err == pool.ErrClosed {
- break
+ c.chOnce.Do(func() {
+ c.ch = make(chan *Message, 100)
+ go func() {
+ for {
+ msg, err := c.ReceiveMessage()
+ if err != nil {
+ if err == pool.ErrClosed {
+ break
+ }
+ continue
}
- continue
+ c.ch <- msg
}
- ch <- msg
- }
- close(ch)
- }()
- return ch
+ close(c.ch)
+ }()
+ })
+ return c.ch
}
func appendIfNotExists(ss []string, es ...string) []string {