summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/pubsub_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/pubsub_test.go')
-rw-r--r--vendor/github.com/go-redis/redis/pubsub_test.go443
1 files changed, 0 insertions, 443 deletions
diff --git a/vendor/github.com/go-redis/redis/pubsub_test.go b/vendor/github.com/go-redis/redis/pubsub_test.go
deleted file mode 100644
index 6a85bd038..000000000
--- a/vendor/github.com/go-redis/redis/pubsub_test.go
+++ /dev/null
@@ -1,443 +0,0 @@
-package redis_test
-
-import (
- "io"
- "net"
- "sync"
- "time"
-
- "github.com/go-redis/redis"
-
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-)
-
-var _ = Describe("PubSub", func() {
- var client *redis.Client
-
- BeforeEach(func() {
- client = redis.NewClient(redisOptions())
- Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
- })
-
- AfterEach(func() {
- Expect(client.Close()).NotTo(HaveOccurred())
- })
-
- It("should support pattern matching", func() {
- pubsub := client.PSubscribe("mychannel*")
- defer pubsub.Close()
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- subscr := msgi.(*redis.Subscription)
- Expect(subscr.Kind).To(Equal("psubscribe"))
- Expect(subscr.Channel).To(Equal("mychannel*"))
- Expect(subscr.Count).To(Equal(1))
- }
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err.(net.Error).Timeout()).To(Equal(true))
- Expect(msgi).To(BeNil())
- }
-
- n, err := client.Publish("mychannel1", "hello").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(int64(1)))
-
- Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- subscr := msgi.(*redis.Message)
- Expect(subscr.Channel).To(Equal("mychannel1"))
- Expect(subscr.Pattern).To(Equal("mychannel*"))
- Expect(subscr.Payload).To(Equal("hello"))
- }
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- subscr := msgi.(*redis.Subscription)
- Expect(subscr.Kind).To(Equal("punsubscribe"))
- Expect(subscr.Channel).To(Equal("mychannel*"))
- Expect(subscr.Count).To(Equal(0))
- }
-
- stats := client.PoolStats()
- Expect(stats.Misses).To(Equal(uint32(2)))
- })
-
- It("should pub/sub channels", func() {
- channels, err := client.PubSubChannels("mychannel*").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(channels).To(BeEmpty())
-
- pubsub := client.Subscribe("mychannel", "mychannel2")
- defer pubsub.Close()
-
- channels, err = client.PubSubChannels("mychannel*").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
-
- channels, err = client.PubSubChannels("").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(channels).To(BeEmpty())
-
- channels, err = client.PubSubChannels("*").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(len(channels)).To(BeNumerically(">=", 2))
- })
-
- It("should return the numbers of subscribers", func() {
- pubsub := client.Subscribe("mychannel", "mychannel2")
- defer pubsub.Close()
-
- channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(channels).To(Equal(map[string]int64{
- "mychannel": 1,
- "mychannel2": 1,
- "mychannel3": 0,
- }))
- })
-
- It("should return the numbers of subscribers by pattern", func() {
- num, err := client.PubSubNumPat().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(num).To(Equal(int64(0)))
-
- pubsub := client.PSubscribe("*")
- defer pubsub.Close()
-
- num, err = client.PubSubNumPat().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(num).To(Equal(int64(1)))
- })
-
- It("should pub/sub", func() {
- pubsub := client.Subscribe("mychannel", "mychannel2")
- defer pubsub.Close()
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- subscr := msgi.(*redis.Subscription)
- Expect(subscr.Kind).To(Equal("subscribe"))
- Expect(subscr.Channel).To(Equal("mychannel"))
- Expect(subscr.Count).To(Equal(1))
- }
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- subscr := msgi.(*redis.Subscription)
- Expect(subscr.Kind).To(Equal("subscribe"))
- Expect(subscr.Channel).To(Equal("mychannel2"))
- Expect(subscr.Count).To(Equal(2))
- }
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err.(net.Error).Timeout()).To(Equal(true))
- Expect(msgi).NotTo(HaveOccurred())
- }
-
- n, err := client.Publish("mychannel", "hello").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(int64(1)))
-
- n, err = client.Publish("mychannel2", "hello2").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(int64(1)))
-
- Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- msg := msgi.(*redis.Message)
- Expect(msg.Channel).To(Equal("mychannel"))
- Expect(msg.Payload).To(Equal("hello"))
- }
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- msg := msgi.(*redis.Message)
- Expect(msg.Channel).To(Equal("mychannel2"))
- Expect(msg.Payload).To(Equal("hello2"))
- }
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- subscr := msgi.(*redis.Subscription)
- Expect(subscr.Kind).To(Equal("unsubscribe"))
- Expect(subscr.Channel).To(Equal("mychannel"))
- Expect(subscr.Count).To(Equal(1))
- }
-
- {
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- subscr := msgi.(*redis.Subscription)
- Expect(subscr.Kind).To(Equal("unsubscribe"))
- Expect(subscr.Channel).To(Equal("mychannel2"))
- Expect(subscr.Count).To(Equal(0))
- }
-
- stats := client.PoolStats()
- Expect(stats.Misses).To(Equal(uint32(2)))
- })
-
- It("should ping/pong", func() {
- pubsub := client.Subscribe("mychannel")
- defer pubsub.Close()
-
- _, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
-
- err = pubsub.Ping("")
- Expect(err).NotTo(HaveOccurred())
-
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- pong := msgi.(*redis.Pong)
- Expect(pong.Payload).To(Equal(""))
- })
-
- It("should ping/pong with payload", func() {
- pubsub := client.Subscribe("mychannel")
- defer pubsub.Close()
-
- _, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
-
- err = pubsub.Ping("hello")
- Expect(err).NotTo(HaveOccurred())
-
- msgi, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- pong := msgi.(*redis.Pong)
- Expect(pong.Payload).To(Equal("hello"))
- })
-
- It("should multi-ReceiveMessage", func() {
- pubsub := client.Subscribe("mychannel")
- defer pubsub.Close()
-
- subscr, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- Expect(subscr).To(Equal(&redis.Subscription{
- Kind: "subscribe",
- Channel: "mychannel",
- Count: 1,
- }))
-
- err = client.Publish("mychannel", "hello").Err()
- Expect(err).NotTo(HaveOccurred())
-
- err = client.Publish("mychannel", "world").Err()
- Expect(err).NotTo(HaveOccurred())
-
- msg, err := pubsub.ReceiveMessage()
- Expect(err).NotTo(HaveOccurred())
- Expect(msg.Channel).To(Equal("mychannel"))
- Expect(msg.Payload).To(Equal("hello"))
-
- msg, err = pubsub.ReceiveMessage()
- Expect(err).NotTo(HaveOccurred())
- Expect(msg.Channel).To(Equal("mychannel"))
- Expect(msg.Payload).To(Equal("world"))
- })
-
- It("should ReceiveMessage after timeout", func() {
- timeout := 100 * time.Millisecond
-
- pubsub := client.Subscribe("mychannel")
- defer pubsub.Close()
-
- subscr, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- Expect(subscr).To(Equal(&redis.Subscription{
- Kind: "subscribe",
- Channel: "mychannel",
- Count: 1,
- }))
-
- done := make(chan bool, 1)
- go func() {
- defer GinkgoRecover()
- defer func() {
- done <- true
- }()
-
- time.Sleep(timeout + 100*time.Millisecond)
- n, err := client.Publish("mychannel", "hello").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(int64(1)))
- }()
-
- msg, err := pubsub.ReceiveMessageTimeout(timeout)
- Expect(err).NotTo(HaveOccurred())
- Expect(msg.Channel).To(Equal("mychannel"))
- Expect(msg.Payload).To(Equal("hello"))
-
- Eventually(done).Should(Receive())
-
- stats := client.PoolStats()
- Expect(stats.Hits).To(Equal(uint32(1)))
- Expect(stats.Misses).To(Equal(uint32(1)))
- })
-
- It("returns an error when subscribe fails", func() {
- pubsub := client.Subscribe()
- defer pubsub.Close()
-
- pubsub.SetNetConn(&badConn{
- readErr: io.EOF,
- writeErr: io.EOF,
- })
-
- err := pubsub.Subscribe("mychannel")
- Expect(err).To(MatchError("EOF"))
-
- err = pubsub.Subscribe("mychannel")
- Expect(err).NotTo(HaveOccurred())
- })
-
- expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
- pubsub.SetNetConn(&badConn{
- readErr: io.EOF,
- writeErr: io.EOF,
- })
-
- done := make(chan bool, 1)
- go func() {
- defer GinkgoRecover()
- defer func() {
- done <- true
- }()
-
- time.Sleep(100 * time.Millisecond)
- err := client.Publish("mychannel", "hello").Err()
- Expect(err).NotTo(HaveOccurred())
- }()
-
- msg, err := pubsub.ReceiveMessage()
- Expect(err).NotTo(HaveOccurred())
- Expect(msg.Channel).To(Equal("mychannel"))
- Expect(msg.Payload).To(Equal("hello"))
-
- Eventually(done).Should(Receive())
- }
-
- It("Subscribe should reconnect on ReceiveMessage error", func() {
- pubsub := client.Subscribe("mychannel")
- defer pubsub.Close()
-
- subscr, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- Expect(subscr).To(Equal(&redis.Subscription{
- Kind: "subscribe",
- Channel: "mychannel",
- Count: 1,
- }))
-
- expectReceiveMessageOnError(pubsub)
- })
-
- It("PSubscribe should reconnect on ReceiveMessage error", func() {
- pubsub := client.PSubscribe("mychannel")
- defer pubsub.Close()
-
- subscr, err := pubsub.ReceiveTimeout(time.Second)
- Expect(err).NotTo(HaveOccurred())
- Expect(subscr).To(Equal(&redis.Subscription{
- Kind: "psubscribe",
- Channel: "mychannel",
- Count: 1,
- }))
-
- expectReceiveMessageOnError(pubsub)
- })
-
- It("should return on Close", func() {
- pubsub := client.Subscribe("mychannel")
- defer pubsub.Close()
-
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer GinkgoRecover()
-
- wg.Done()
- defer wg.Done()
-
- _, err := pubsub.ReceiveMessage()
- Expect(err).To(HaveOccurred())
- Expect(err).To(SatisfyAny(
- MatchError("redis: client is closed"),
- MatchError("use of closed network connection"), // Go 1.4
- ))
- }()
-
- wg.Wait()
- wg.Add(1)
-
- Expect(pubsub.Close()).NotTo(HaveOccurred())
-
- wg.Wait()
- })
-
- It("should ReceiveMessage without a subscription", func() {
- timeout := 100 * time.Millisecond
-
- pubsub := client.Subscribe()
- defer pubsub.Close()
-
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer GinkgoRecover()
- defer wg.Done()
-
- time.Sleep(2 * timeout)
-
- err := pubsub.Subscribe("mychannel")
- Expect(err).NotTo(HaveOccurred())
-
- time.Sleep(timeout)
-
- err = client.Publish("mychannel", "hello").Err()
- Expect(err).NotTo(HaveOccurred())
- }()
-
- msg, err := pubsub.ReceiveMessageTimeout(timeout)
- Expect(err).NotTo(HaveOccurred())
- Expect(msg.Channel).To(Equal("mychannel"))
- Expect(msg.Payload).To(Equal("hello"))
-
- wg.Wait()
- })
-
- It("handles big message payload", func() {
- pubsub := client.Subscribe("mychannel")
- defer pubsub.Close()
-
- ch := pubsub.Channel()
-
- bigVal := bigVal()
- err := client.Publish("mychannel", bigVal).Err()
- Expect(err).NotTo(HaveOccurred())
-
- var msg *redis.Message
- Eventually(ch).Should(Receive(&msg))
- Expect(msg.Channel).To(Equal("mychannel"))
- Expect(msg.Payload).To(Equal(string(bigVal)))
- })
-})