From 09b49c26ddfdb20ced61e7dfd4192e750ce40449 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Mon, 31 Jul 2017 08:15:23 -0700 Subject: PLT-5308 Caching layer part 2 (#6973) * Adding Reaction store cache layer example * Implementing reaction store in new caching system. * Redis for reaction store * Adding redis library * Adding invalidation for DeleteAllWithEmojiName and other minor enhancements --- vendor/github.com/go-redis/redis/pubsub_test.go | 406 ++++++++++++++++++++++++ 1 file changed, 406 insertions(+) create mode 100644 vendor/github.com/go-redis/redis/pubsub_test.go (limited to 'vendor/github.com/go-redis/redis/pubsub_test.go') diff --git a/vendor/github.com/go-redis/redis/pubsub_test.go b/vendor/github.com/go-redis/redis/pubsub_test.go new file mode 100644 index 000000000..e8589f461 --- /dev/null +++ b/vendor/github.com/go-redis/redis/pubsub_test.go @@ -0,0 +1,406 @@ +package redis_test + +import ( + "io" + "net" + "sync" + "time" + + "github.com/go-redis/redis" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("PubSub", func() { + var client *redis.Client + + BeforeEach(func() { + client = redis.NewClient(redisOptions()) + Expect(client.FlushDB().Err()).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should support pattern matching", func() { + pubsub := client.PSubscribe("mychannel*") + defer pubsub.Close() + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Subscription) + Expect(subscr.Kind).To(Equal("psubscribe")) + Expect(subscr.Channel).To(Equal("mychannel*")) + Expect(subscr.Count).To(Equal(1)) + } + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err.(net.Error).Timeout()).To(Equal(true)) + Expect(msgi).To(BeNil()) + } + + n, err := client.Publish("mychannel1", "hello").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred()) + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Message) + Expect(subscr.Channel).To(Equal("mychannel1")) + Expect(subscr.Pattern).To(Equal("mychannel*")) + Expect(subscr.Payload).To(Equal("hello")) + } + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Subscription) + Expect(subscr.Kind).To(Equal("punsubscribe")) + Expect(subscr.Channel).To(Equal("mychannel*")) + Expect(subscr.Count).To(Equal(0)) + } + + stats := client.PoolStats() + Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) + }) + + It("should pub/sub channels", func() { + channels, err := client.PubSubChannels("mychannel*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(channels).To(BeEmpty()) + + pubsub := client.Subscribe("mychannel", "mychannel2") + defer pubsub.Close() + + channels, err = client.PubSubChannels("mychannel*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"})) + + channels, err = client.PubSubChannels("").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(channels).To(BeEmpty()) + + channels, err = client.PubSubChannels("*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(channels)).To(BeNumerically(">=", 2)) + }) + + It("should return the numbers of subscribers", func() { + pubsub := client.Subscribe("mychannel", "mychannel2") + defer pubsub.Close() + + channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(channels).To(Equal(map[string]int64{ + "mychannel": 1, + "mychannel2": 1, + "mychannel3": 0, + })) + }) + + It("should return the numbers of subscribers by pattern", func() { + num, err := client.PubSubNumPat().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(num).To(Equal(int64(0))) + + pubsub := client.PSubscribe("*") + defer pubsub.Close() + + num, err = client.PubSubNumPat().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(num).To(Equal(int64(1))) + }) + + It("should pub/sub", func() { + pubsub := client.Subscribe("mychannel", "mychannel2") + defer pubsub.Close() + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Subscription) + Expect(subscr.Kind).To(Equal("subscribe")) + Expect(subscr.Channel).To(Equal("mychannel")) + Expect(subscr.Count).To(Equal(1)) + } + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Subscription) + Expect(subscr.Kind).To(Equal("subscribe")) + Expect(subscr.Channel).To(Equal("mychannel2")) + Expect(subscr.Count).To(Equal(2)) + } + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err.(net.Error).Timeout()).To(Equal(true)) + Expect(msgi).NotTo(HaveOccurred()) + } + + n, err := client.Publish("mychannel", "hello").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + n, err = client.Publish("mychannel2", "hello2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Message) + Expect(subscr.Channel).To(Equal("mychannel")) + Expect(subscr.Payload).To(Equal("hello")) + } + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + msg := msgi.(*redis.Message) + Expect(msg.Channel).To(Equal("mychannel2")) + Expect(msg.Payload).To(Equal("hello2")) + } + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Subscription) + Expect(subscr.Kind).To(Equal("unsubscribe")) + Expect(subscr.Channel).To(Equal("mychannel")) + Expect(subscr.Count).To(Equal(1)) + } + + { + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + subscr := msgi.(*redis.Subscription) + Expect(subscr.Kind).To(Equal("unsubscribe")) + Expect(subscr.Channel).To(Equal("mychannel2")) + Expect(subscr.Count).To(Equal(0)) + } + + stats := client.PoolStats() + Expect(stats.Requests - stats.Hits).To(Equal(uint32(2))) + }) + + It("should ping/pong", func() { + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + _, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + + err = pubsub.Ping("") + Expect(err).NotTo(HaveOccurred()) + + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + pong := msgi.(*redis.Pong) + Expect(pong.Payload).To(Equal("")) + }) + + It("should ping/pong with payload", func() { + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + _, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + + err = pubsub.Ping("hello") + Expect(err).NotTo(HaveOccurred()) + + msgi, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + pong := msgi.(*redis.Pong) + Expect(pong.Payload).To(Equal("hello")) + }) + + It("should multi-ReceiveMessage", func() { + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "subscribe", + Channel: "mychannel", + Count: 1, + })) + + err = client.Publish("mychannel", "hello").Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.Publish("mychannel", "world").Err() + Expect(err).NotTo(HaveOccurred()) + + msg, err := pubsub.ReceiveMessage() + Expect(err).NotTo(HaveOccurred()) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal("hello")) + + msg, err = pubsub.ReceiveMessage() + Expect(err).NotTo(HaveOccurred()) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal("world")) + }) + + It("should ReceiveMessage after timeout", func() { + timeout := 100 * time.Millisecond + + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "subscribe", + Channel: "mychannel", + Count: 1, + })) + + done := make(chan bool, 1) + go func() { + defer GinkgoRecover() + defer func() { + done <- true + }() + + time.Sleep(timeout + 100*time.Millisecond) + n, err := client.Publish("mychannel", "hello").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + }() + + msg, err := pubsub.ReceiveMessageTimeout(timeout) + Expect(err).NotTo(HaveOccurred()) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal("hello")) + + Eventually(done).Should(Receive()) + + stats := client.PoolStats() + Expect(stats.Requests).To(Equal(uint32(2))) + Expect(stats.Hits).To(Equal(uint32(1))) + }) + + expectReceiveMessageOnError := func(pubsub *redis.PubSub) { + pubsub.SetNetConn(&badConn{ + readErr: io.EOF, + writeErr: io.EOF, + }) + + done := make(chan bool, 1) + go func() { + defer GinkgoRecover() + defer func() { + done <- true + }() + + time.Sleep(100 * time.Millisecond) + err := client.Publish("mychannel", "hello").Err() + Expect(err).NotTo(HaveOccurred()) + }() + + msg, err := pubsub.ReceiveMessage() + Expect(err).NotTo(HaveOccurred()) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal("hello")) + + Eventually(done).Should(Receive()) + } + + It("Subscribe should reconnect on ReceiveMessage error", func() { + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "subscribe", + Channel: "mychannel", + Count: 1, + })) + + expectReceiveMessageOnError(pubsub) + }) + + It("PSubscribe should reconnect on ReceiveMessage error", func() { + pubsub := client.PSubscribe("mychannel") + defer pubsub.Close() + + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "psubscribe", + Channel: "mychannel", + Count: 1, + })) + + expectReceiveMessageOnError(pubsub) + }) + + It("should return on Close", func() { + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer GinkgoRecover() + + wg.Done() + defer wg.Done() + + _, err := pubsub.ReceiveMessage() + Expect(err).To(HaveOccurred()) + Expect(err).To(SatisfyAny( + MatchError("redis: client is closed"), + MatchError("use of closed network connection"), // Go 1.4 + )) + }() + + wg.Wait() + wg.Add(1) + + Expect(pubsub.Close()).NotTo(HaveOccurred()) + + wg.Wait() + }) + + It("should ReceiveMessage without a subscription", func() { + timeout := 100 * time.Millisecond + + pubsub := client.Subscribe() + defer pubsub.Close() + + go func() { + defer GinkgoRecover() + + time.Sleep(2 * timeout) + + err := pubsub.Subscribe("mychannel") + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(timeout) + + err = client.Publish("mychannel", "hello").Err() + Expect(err).NotTo(HaveOccurred()) + }() + + msg, err := pubsub.ReceiveMessageTimeout(timeout) + Expect(err).NotTo(HaveOccurred()) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal("hello")) + }) +}) -- cgit v1.2.3-1-g7c22