Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster_test.go')
 vendor/github.com/go-redis/redis/cluster_test.go | 298
 1 file changed, 179 insertions(+), 119 deletions(-)
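
Most of this diff replaces one-shot assertions with Gomega Eventually polls so the cluster tests tolerate replication lag and MOVED redirects while slots settle. Below is a minimal sketch of that pattern under the go-redis v6 API vendored here; the standalone test wrapper, the key name "A", and the 5-second timeout are illustrative assumptions (127.0.0.1:8220 is one of the node addresses the suite's fixtures use, but any reachable cluster node works).

package redis_test

import (
	"testing"
	"time"

	"github.com/go-redis/redis"
	. "github.com/onsi/gomega"
)

// TestEventuallyPattern shows the polling style this diff adopts: write a key,
// then poll with Eventually instead of reading once, so transient redirects or
// replication lag do not fail the test.
func TestEventuallyPattern(t *testing.T) {
	RegisterTestingT(t)

	// Connect to one node of the local test cluster (address assumed from the
	// suite's fixtures).
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:8220"},
	})
	defer client.Close()

	Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())

	// Poll for up to 5 seconds until the written value becomes visible.
	Eventually(func() string {
		return client.Get("A").Val()
	}, 5*time.Second).Should(Equal("VALUE"))
}
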
diff --git a/vendor/github.com/go-redis/redis/cluster_test.go b/vendor/github.com/go-redis/redis/cluster_test.go
index 3a69255a4..324bd1ce1 100644
--- a/vendor/github.com/go-redis/redis/cluster_test.go
+++ b/vendor/github.com/go-redis/redis/cluster_test.go
@@ -75,7 +75,7 @@ func startCluster(scenario *clusterScenario) error {
scenario.nodeIds[pos] = info[:40]
}
- // Meet cluster nodes
+ // Meet cluster nodes.
for _, client := range scenario.clients {
err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
if err != nil {
@@ -83,7 +83,7 @@ func startCluster(scenario *clusterScenario) error {
}
}
- // Bootstrap masters
+ // Bootstrap masters.
slots := []int{0, 5000, 10000, 16384}
for pos, master := range scenario.masters() {
err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
@@ -92,7 +92,7 @@ func startCluster(scenario *clusterScenario) error {
}
}
- // Bootstrap slaves
+ // Bootstrap slaves.
for idx, slave := range scenario.slaves() {
masterId := scenario.nodeIds[idx]
@@ -115,7 +115,7 @@ func startCluster(scenario *clusterScenario) error {
}
}
- // Wait until all nodes have consistent info
+ // Wait until all nodes have consistent info.
for _, client := range scenario.clients {
err := eventually(func() error {
res, err := client.ClusterSlots().Result()
@@ -189,62 +189,6 @@ var _ = Describe("ClusterClient", func() {
var client *redis.ClusterClient
assertClusterClient := func() {
- It("should CLUSTER SLOTS", func() {
- res, err := client.ClusterSlots().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(res).To(HaveLen(3))
-
- wanted := []redis.ClusterSlot{
- {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
- {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
- {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
- }
- Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
- })
-
- It("should CLUSTER NODES", func() {
- res, err := client.ClusterNodes().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(len(res)).To(BeNumerically(">", 400))
- })
-
- It("should CLUSTER INFO", func() {
- res, err := client.ClusterInfo().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
- })
-
- It("should CLUSTER KEYSLOT", func() {
- hashSlot, err := client.ClusterKeySlot("somekey").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
- })
-
- It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
- n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(int64(0)))
- })
-
- It("should CLUSTER COUNTKEYSINSLOT", func() {
- n, err := client.ClusterCountKeysInSlot(10).Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(int64(0)))
- })
-
- It("should CLUSTER SAVECONFIG", func() {
- res, err := client.ClusterSaveConfig().Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(res).To(Equal("OK"))
- })
-
- It("should CLUSTER SLAVES", func() {
- nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
- Expect(nodesList).Should(HaveLen(1))
- })
-
It("should GET/SET/DEL", func() {
val, err := client.Get("A").Result()
Expect(err).To(Equal(redis.Nil))
@@ -254,55 +198,24 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("OK"))
- val, err = client.Get("A").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(val).To(Equal("VALUE"))
+ Eventually(func() string {
+ return client.Get("A").Val()
+ }).Should(Equal("VALUE"))
cnt, err := client.Del("A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(cnt).To(Equal(int64(1)))
})
- It("returns pool stats", func() {
- Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
- })
-
- It("removes idle connections", func() {
- stats := client.PoolStats()
- Expect(stats.TotalConns).NotTo(BeZero())
- Expect(stats.FreeConns).NotTo(BeZero())
-
- time.Sleep(2 * time.Second)
-
- stats = client.PoolStats()
- Expect(stats.TotalConns).To(BeZero())
- Expect(stats.FreeConns).To(BeZero())
- })
-
It("follows redirects", func() {
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
slot := hashtag.Slot("A")
- Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
+ client.SwapSlotNodes(slot)
- val, err := client.Get("A").Result()
- Expect(err).NotTo(HaveOccurred())
- Expect(val).To(Equal("VALUE"))
- })
-
- It("returns an error when there are no attempts left", func() {
- opt := redisClusterOptions()
- opt.MaxRedirects = -1
- client := cluster.clusterClient(opt)
-
- slot := hashtag.Slot("A")
- Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
-
- err := client.Get("A").Err()
- Expect(err).To(HaveOccurred())
- Expect(err.Error()).To(ContainSubstring("MOVED"))
-
- Expect(client.Close()).NotTo(HaveOccurred())
+ Eventually(func() string {
+ return client.Get("A").Val()
+ }).Should(Equal("VALUE"))
})
It("distributes keys", func() {
@@ -311,9 +224,14 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
}
- wanted := []string{"keys=31", "keys=29", "keys=40"}
- for i, master := range cluster.masters() {
- Expect(master.Info().Val()).To(ContainSubstring(wanted[i]))
+ for _, master := range cluster.masters() {
+ Eventually(func() string {
+ return master.Info("keyspace").Val()
+ }, 5*time.Second).Should(Or(
+ ContainSubstring("keys=31"),
+ ContainSubstring("keys=29"),
+ ContainSubstring("keys=40"),
+ ))
}
})
@@ -330,9 +248,14 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
}
- wanted := []string{"keys=31", "keys=29", "keys=40"}
- for i, master := range cluster.masters() {
- Expect(master.Info().Val()).To(ContainSubstring(wanted[i]))
+ for _, master := range cluster.masters() {
+ Eventually(func() string {
+ return master.Info("keyspace").Val()
+ }, 5*time.Second).Should(Or(
+ ContainSubstring("keys=31"),
+ ContainSubstring("keys=29"),
+ ContainSubstring("keys=40"),
+ ))
}
})
@@ -419,7 +342,8 @@ var _ = Describe("ClusterClient", func() {
Expect(get.Val()).To(Equal(key + "_value"))
ttl := cmds[(i*2)+1].(*redis.DurationCmd)
- Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second))
+ dur := time.Duration(i+1) * time.Hour
+ Expect(ttl.Val()).To(BeNumerically("~", dur, 5*time.Second))
}
})
@@ -447,7 +371,7 @@ var _ = Describe("ClusterClient", func() {
})
}
- Describe("Pipeline", func() {
+ Describe("with Pipeline", func() {
BeforeEach(func() {
pipe = client.Pipeline().(*redis.Pipeline)
})
@@ -459,7 +383,7 @@ var _ = Describe("ClusterClient", func() {
assertPipeline()
})
- Describe("TxPipeline", func() {
+ Describe("with TxPipeline", func() {
BeforeEach(func() {
pipe = client.TxPipeline().(*redis.Pipeline)
})
@@ -472,6 +396,76 @@ var _ = Describe("ClusterClient", func() {
})
})
+ It("supports PubSub", func() {
+ pubsub := client.Subscribe("mychannel")
+ defer pubsub.Close()
+
+ Eventually(func() error {
+ _, err := client.Publish("mychannel", "hello").Result()
+ if err != nil {
+ return err
+ }
+
+ msg, err := pubsub.ReceiveTimeout(time.Second)
+ if err != nil {
+ return err
+ }
+
+ _, ok := msg.(*redis.Message)
+ if !ok {
+ return fmt.Errorf("got %T, wanted *redis.Message", msg)
+ }
+
+ return nil
+ }, 30*time.Second).ShouldNot(HaveOccurred())
+ })
+ }
+
+ Describe("ClusterClient", func() {
+ BeforeEach(func() {
+ opt = redisClusterOptions()
+ client = cluster.clusterClient(opt)
+
+ _ = client.ForEachMaster(func(master *redis.Client) error {
+ return master.FlushDB().Err()
+ })
+ })
+
+ AfterEach(func() {
+ Expect(client.Close()).NotTo(HaveOccurred())
+ })
+
+ It("returns pool stats", func() {
+ Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
+ })
+
+ It("removes idle connections", func() {
+ stats := client.PoolStats()
+ Expect(stats.TotalConns).NotTo(BeZero())
+ Expect(stats.FreeConns).NotTo(BeZero())
+
+ time.Sleep(2 * time.Second)
+
+ stats = client.PoolStats()
+ Expect(stats.TotalConns).To(BeZero())
+ Expect(stats.FreeConns).To(BeZero())
+ })
+
+ It("returns an error when there are no attempts left", func() {
+ opt := redisClusterOptions()
+ opt.MaxRedirects = -1
+ client := cluster.clusterClient(opt)
+
+ slot := hashtag.Slot("A")
+ client.SwapSlotNodes(slot)
+
+ err := client.Get("A").Err()
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("MOVED"))
+
+ Expect(client.Close()).NotTo(HaveOccurred())
+ })
+
It("calls fn for every master node", func() {
for i := 0; i < 10; i++ {
Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
@@ -483,14 +477,72 @@ var _ = Describe("ClusterClient", func() {
Expect(err).NotTo(HaveOccurred())
for _, client := range cluster.masters() {
- keys, err := client.Keys("*").Result()
+ size, err := client.DBSize().Result()
Expect(err).NotTo(HaveOccurred())
- Expect(keys).To(HaveLen(0))
+ Expect(size).To(Equal(int64(0)))
}
})
- }
- Describe("default ClusterClient", func() {
+ It("should CLUSTER SLOTS", func() {
+ res, err := client.ClusterSlots().Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(res).To(HaveLen(3))
+
+ wanted := []redis.ClusterSlot{
+ {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
+ {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
+ {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
+ }
+ Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
+ })
+
+ It("should CLUSTER NODES", func() {
+ res, err := client.ClusterNodes().Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(len(res)).To(BeNumerically(">", 400))
+ })
+
+ It("should CLUSTER INFO", func() {
+ res, err := client.ClusterInfo().Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
+ })
+
+ It("should CLUSTER KEYSLOT", func() {
+ hashSlot, err := client.ClusterKeySlot("somekey").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
+ })
+
+ It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
+ n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(int64(0)))
+ })
+
+ It("should CLUSTER COUNTKEYSINSLOT", func() {
+ n, err := client.ClusterCountKeysInSlot(10).Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(int64(0)))
+ })
+
+ It("should CLUSTER SAVECONFIG", func() {
+ res, err := client.ClusterSaveConfig().Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(res).To(Equal("OK"))
+ })
+
+ It("should CLUSTER SLAVES", func() {
+ nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
+ Expect(nodesList).Should(HaveLen(1))
+ })
+
+ assertClusterClient()
+ })
+
+ Describe("ClusterClient failover", func() {
BeforeEach(func() {
opt = redisClusterOptions()
client = cluster.clusterClient(opt)
@@ -498,6 +550,13 @@ var _ = Describe("ClusterClient", func() {
_ = client.ForEachMaster(func(master *redis.Client) error {
return master.FlushDB().Err()
})
+
+ _ = client.ForEachSlave(func(slave *redis.Client) error {
+ Eventually(func() int64 {
+ return client.DBSize().Val()
+ }, 30*time.Second).Should(Equal(int64(0)))
+ return slave.ClusterFailover().Err()
+ })
})
AfterEach(func() {
@@ -645,14 +704,14 @@ var _ = Describe("ClusterClient timeout", func() {
testTimeout()
})
- Context("network timeout", func() {
+ Context("ClientPause timeout", func() {
const pause = time.Second
BeforeEach(func() {
opt := redisClusterOptions()
- opt.ReadTimeout = 100 * time.Millisecond
- opt.WriteTimeout = 100 * time.Millisecond
- opt.MaxRedirects = 1
+ opt.ReadTimeout = pause / 10
+ opt.WriteTimeout = pause / 10
+ opt.MaxRedirects = -1
client = cluster.clusterClient(opt)
err := client.ForEachNode(func(client *redis.Client) error {
@@ -662,11 +721,12 @@ var _ = Describe("ClusterClient timeout", func() {
})
AfterEach(func() {
- Eventually(func() error {
- return client.ForEachNode(func(client *redis.Client) error {
+ client.ForEachNode(func(client *redis.Client) error {
+ Eventually(func() error {
return client.Ping().Err()
- })
- }, 2*pause).ShouldNot(HaveOccurred())
+ }, 2*pause).ShouldNot(HaveOccurred())
+ return nil
+ })
})
testTimeout()