Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--  vendor/github.com/go-redis/redis/cluster.go  |  325
1 file changed, 195 insertions(+), 130 deletions(-)
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index 0c58c8532..6b05f1a56 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -8,7 +8,8 @@ import (
"math"
"math/rand"
"net"
- "strings"
+ "runtime"
+ "sort"
"sync"
"sync/atomic"
"time"
@@ -30,7 +31,7 @@ type ClusterOptions struct {
// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
- // Default is 8.
+ // Default is 8 retries.
MaxRedirects int
// Enables read-only commands on slave nodes.
@@ -39,16 +40,25 @@ type ClusterOptions struct {
// It automatically enables ReadOnly.
RouteByLatency bool
// Allows routing read-only commands to the random master or slave node.
+ // It automatically enables ReadOnly.
RouteRandomly bool
+ // Optional function that returns cluster slots information.
+ // It is useful for manually creating a cluster of standalone Redis servers
+ // and load-balancing read/write operations between masters and slaves.
+ // It can use a service like ZooKeeper to maintain configuration information
+ // and ClusterClient.ReloadState to manually trigger state reloading.
+ ClusterSlots func() ([]ClusterSlot, error)
+
// Following options are copied from Options struct.
OnConnect func(*Conn) error
+ Password string
+
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
- Password string
DialTimeout time.Duration
ReadTimeout time.Duration
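
A minimal sketch of how the new ClusterSlots option can be used; the addresses, function name, and package wrapper are illustrative, not part of this change:

package main

import "github.com/go-redis/redis"

// newManualCluster maps the full 0-16383 slot range onto two standalone
// servers (placeholder addresses); RouteRandomly then spreads read-only
// commands across the master and the slave.
func newManualCluster() *redis.ClusterClient {
	return redis.NewClusterClient(&redis.ClusterOptions{
		ClusterSlots: func() ([]redis.ClusterSlot, error) {
			return []redis.ClusterSlot{{
				Start: 0,
				End:   16383,
				Nodes: []redis.ClusterNode{
					{Addr: "10.0.0.1:6379"}, // master (placeholder address)
					{Addr: "10.0.0.2:6379"}, // slave (placeholder address)
				},
			}}, nil
		},
		RouteRandomly: true,
	})
}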
@@ -56,6 +66,8 @@ type ClusterOptions struct {
// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
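
A configuration sketch for the new per-node pool knobs; the values and seed address are illustrative:

package main

import (
	"time"

	"github.com/go-redis/redis"
)

// newTunedCluster keeps a floor of warm connections per node (MinIdleConns)
// and recycles connections after a fixed age (MaxConnAge).
func newTunedCluster() *redis.ClusterClient {
	return redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:        []string{"10.0.0.1:6379"}, // placeholder seed address
		PoolSize:     40,                        // applies per cluster node
		MinIdleConns: 10,
		MaxConnAge:   30 * time.Minute,
	})
}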
@@ -70,10 +82,14 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8
}
- if opt.RouteByLatency {
+ if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true
}
+ if opt.PoolSize == 0 {
+ opt.PoolSize = 5 * runtime.NumCPU()
+ }
+
switch opt.ReadTimeout {
case -1:
opt.ReadTimeout = 0
@@ -117,10 +133,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
- PoolSize: opt.PoolSize,
- PoolTimeout: opt.PoolTimeout,
- IdleTimeout: opt.IdleTimeout,
-
+ PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
+ PoolTimeout: opt.PoolTimeout,
+ IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck,
TLSConfig: opt.TLSConfig,
@@ -160,10 +177,6 @@ func (n *clusterNode) Close() error {
return n.Client.Close()
}
-func (n *clusterNode) Test() error {
- return n.Client.ClusterInfo().Err()
-}
-
func (n *clusterNode) updateLatency() {
const probes = 10
@@ -330,7 +343,7 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr)
- return node, node.Test()
+ return node, nil
})
c.mu.Lock()
@@ -383,12 +396,31 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
//------------------------------------------------------------------------------
+type clusterSlot struct {
+ start, end int
+ nodes []*clusterNode
+}
+
+type clusterSlotSlice []*clusterSlot
+
+func (p clusterSlotSlice) Len() int {
+ return len(p)
+}
+
+func (p clusterSlotSlice) Less(i, j int) bool {
+ return p[i].start < p[j].start
+}
+
+func (p clusterSlotSlice) Swap(i, j int) {
+ p[i], p[j] = p[j], p[i]
+}
+
type clusterState struct {
nodes *clusterNodes
Masters []*clusterNode
Slaves []*clusterNode
- slots [][]*clusterNode
+ slots []*clusterSlot
generation uint32
createdAt time.Time
@@ -400,19 +432,21 @@ func newClusterState(
c := clusterState{
nodes: nodes,
- slots: make([][]*clusterNode, hashtag.SlotNumber),
+ slots: make([]*clusterSlot, 0, len(slots)),
generation: nodes.NextGeneration(),
createdAt: time.Now(),
}
- isLoopbackOrigin := isLoopbackAddr(origin)
+ originHost, _, _ := net.SplitHostPort(origin)
+ isLoopbackOrigin := isLoopback(originHost)
+
for _, slot := range slots {
var nodes []*clusterNode
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
- if !isLoopbackOrigin && useOriginAddr(origin, addr) {
- addr = origin
+ if !isLoopbackOrigin {
+ addr = replaceLoopbackHost(addr, originHost)
}
node, err := c.nodes.GetOrCreate(addr)
@@ -430,11 +464,15 @@ func newClusterState(
}
}
- for i := slot.Start; i <= slot.End; i++ {
- c.slots[i] = nodes
- }
+ c.slots = append(c.slots, &clusterSlot{
+ start: slot.Start,
+ end: slot.End,
+ nodes: nodes,
+ })
}
+ sort.Sort(clusterSlotSlice(c.slots))
+
time.AfterFunc(time.Minute, func() {
nodes.GC(c.generation)
})
@@ -442,6 +480,33 @@ func newClusterState(
return &c, nil
}
+func replaceLoopbackHost(nodeAddr, originHost string) string {
+ nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
+ if err != nil {
+ return nodeAddr
+ }
+
+ nodeIP := net.ParseIP(nodeHost)
+ if nodeIP == nil {
+ return nodeAddr
+ }
+
+ if !nodeIP.IsLoopback() {
+ return nodeAddr
+ }
+
+ // Use the non-loopback origin host together with the node's port.
+ return net.JoinHostPort(originHost, nodePort)
+}
+
+func isLoopback(host string) bool {
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return true
+ }
+ return ip.IsLoopback()
+}
+
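
The net effect of the two helpers above, expressed as a hypothetical in-package test sketch (addresses are illustrative): loopback node addresses are rewritten to the origin host, everything else passes through unchanged.

package redis

import "testing"

// TestReplaceLoopbackHost is a behavior sketch, not shipped code.
func TestReplaceLoopbackHost(t *testing.T) {
	if got := replaceLoopbackHost("127.0.0.1:7001", "10.0.0.5"); got != "10.0.0.5:7001" {
		t.Fatalf("loopback host not replaced: %q", got)
	}
	if got := replaceLoopbackHost("192.168.1.2:7001", "10.0.0.5"); got != "192.168.1.2:7001" {
		t.Fatalf("non-loopback address changed: %q", got)
	}
	if got := replaceLoopbackHost("not-an-addr", "10.0.0.5"); got != "not-an-addr" {
		t.Fatalf("unparsable address changed: %q", got)
	}
}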
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) > 0 {
@@ -502,32 +567,24 @@ func (c *clusterState) slotRandomNode(slot int) *clusterNode {
}
func (c *clusterState) slotNodes(slot int) []*clusterNode {
- if slot >= 0 && slot < len(c.slots) {
- return c.slots[slot]
+ i := sort.Search(len(c.slots), func(i int) bool {
+ return c.slots[i].end >= slot
+ })
+ if i >= len(c.slots) {
+ return nil
+ }
+ x := c.slots[i]
+ if slot >= x.start && slot <= x.end {
+ return x.nodes
}
return nil
}
func (c *clusterState) IsConsistent() bool {
- if len(c.Masters) > len(c.Slaves) {
- return false
- }
-
- for _, master := range c.Masters {
- s := master.Client.Info("replication").Val()
- if !strings.Contains(s, "role:master") {
- return false
- }
- }
-
- for _, slave := range c.Slaves {
- s := slave.Client.Info("replication").Val()
- if !strings.Contains(s, "role:slave") {
- return false
- }
+ if c.nodes.opt.ClusterSlots != nil {
+ return true
}
-
- return true
+ return len(c.Masters) <= len(c.Slaves)
}
//------------------------------------------------------------------------------
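
The dense 16384-entry slot table is replaced by a sorted slice of ranges; the lookup idea, reduced to a standalone sketch with illustrative names and values:

package main

import (
	"fmt"
	"sort"
)

// slotRange mirrors the new clusterSlot lookup: ranges are kept sorted by
// start, sort.Search finds the first range whose end is >= the wanted slot,
// and a bounds check confirms the hit.
type slotRange struct{ start, end int }

func findRange(ranges []slotRange, slot int) (slotRange, bool) {
	i := sort.Search(len(ranges), func(i int) bool { return ranges[i].end >= slot })
	if i < len(ranges) && slot >= ranges[i].start && slot <= ranges[i].end {
		return ranges[i], true
	}
	return slotRange{}, false
}

func main() {
	ranges := []slotRange{{0, 5460}, {5461, 10922}, {10923, 16383}}
	fmt.Println(findRange(ranges, 7000)) // {5461 10922} true
}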
@@ -555,7 +612,7 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) {
return nil, err
}
if !state.IsConsistent() {
- c.LazyReload()
+ time.AfterFunc(time.Second, c.LazyReload)
}
return state, nil
}
@@ -614,6 +671,14 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
return nil, errors.New("redis: cluster has no state")
}
+func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
+ state, err := c.Reload()
+ if err == nil {
+ return state, nil
+ }
+ return c.Get()
+}
+
//------------------------------------------------------------------------------
// ClusterClient is a Redis Cluster client representing a pool of zero
@@ -653,6 +718,8 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.init()
_, _ = c.state.Reload()
+ _, _ = c.cmdsInfoCache.Get()
+
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@@ -660,6 +727,13 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
+// ReloadState reloads cluster state. It calls ClusterSlots func
+// to get cluster slots information.
+func (c *ClusterClient) ReloadState() error {
+ _, err := c.state.Reload()
+ return err
+}
+
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
}
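
A sketch of driving the new ReloadState method from a timer; in practice a ZooKeeper watch (or similar) would trigger the reload when the manually maintained slot map changes. The function name and interval are illustrative.

package main

import (
	"log"
	"time"

	"github.com/go-redis/redis"
)

// reloadPeriodically refreshes manually provided cluster slots on a timer.
func reloadPeriodically(c *redis.ClusterClient, every time.Duration) {
	for range time.Tick(every) {
		if err := c.ReloadState(); err != nil {
			log.Println("redis: cluster state reload failed:", err)
		}
	}
}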
@@ -818,6 +892,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
}
if internal.IsRetryableError(err, true) {
+ c.state.LazyReload()
continue
}
@@ -853,6 +928,13 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
+// Do creates a Cmd from the args and processes the cmd.
+func (c *ClusterClient) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
func (c *ClusterClient) WrapProcess(
fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
@@ -904,12 +986,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
}
if internal.IsRetryableError(err, true) {
- // Firstly retry the same node.
+ c.state.LazyReload()
+
+ // First retry the same node.
if attempt == 0 {
continue
}
- // Secondly try random node.
+ // Second try random node.
node, err = c.nodes.Random()
if err != nil {
break
@@ -944,12 +1028,9 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
- state, err := c.state.Reload()
+ state, err := c.state.ReloadOrGet()
if err != nil {
- state, err = c.state.Get()
- if err != nil {
- return err
- }
+ return err
}
var wg sync.WaitGroup
@@ -980,12 +1061,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
- state, err := c.state.Reload()
+ state, err := c.state.ReloadOrGet()
if err != nil {
- state, err = c.state.Get()
- if err != nil {
- return err
- }
+ return err
}
var wg sync.WaitGroup
@@ -1016,12 +1094,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
- state, err := c.state.Reload()
+ state, err := c.state.ReloadOrGet()
if err != nil {
- state, err = c.state.Get()
- if err != nil {
- return err
- }
+ return err
}
var wg sync.WaitGroup
@@ -1071,7 +1146,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1082,7 +1157,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1090,6 +1165,14 @@ func (c *ClusterClient) PoolStats() *PoolStats {
}
func (c *ClusterClient) loadState() (*clusterState, error) {
+ if c.opt.ClusterSlots != nil {
+ slots, err := c.opt.ClusterSlots()
+ if err != nil {
+ return nil, err
+ }
+ return newClusterState(c.nodes, slots, "")
+ }
+
addrs, err := c.nodes.Addrs()
if err != nil {
return nil, err
@@ -1196,7 +1279,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap = failedCmds
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
@@ -1207,9 +1290,16 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
}
cmdsMap := make(map[*clusterNode][]Cmder)
+ cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds {
- slot := c.cmdSlot(cmd)
- node, err := state.slotMasterNode(slot)
+ var node *clusterNode
+ var err error
+ if cmdsAreReadOnly {
+ _, node, err = c.cmdSlotAndNode(cmd)
+ } else {
+ slot := c.cmdSlot(cmd)
+ node, err = state.slotMasterNode(slot)
+ }
if err != nil {
return nil, err
}
@@ -1218,6 +1308,16 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
return cmdsMap, nil
}
+func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
+ for _, cmd := range cmds {
+ cmdInfo := c.cmdInfo(cmd.Name())
+ if cmdInfo == nil || !cmdInfo.ReadOnly {
+ return false
+ }
+ }
+ return true
+}
+
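
A sketch of what the read-only pipeline routing enables, under the assumption that ReadOnly (or RouteByLatency/RouteRandomly) is set: a pipeline consisting solely of read-only commands may now be served by slave nodes, while any write keeps the whole batch on the slot masters. Keys and the function name are placeholders.

package main

import "github.com/go-redis/redis"

func readOnlyPipeline(c *redis.ClusterClient) error {
	pipe := c.Pipeline()
	get1 := pipe.Get("key1")
	get2 := pipe.Get("key2")
	// Exec may return redis.Nil when a key is missing; treat that as success.
	if _, err := pipe.Exec(); err != nil && err != redis.Nil {
		return err
	}
	_ = get1.Val()
	_ = get2.Val()
	return nil
}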
func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
remappedCmds, err := c.mapCmdsByNode(cmds)
if err != nil {
@@ -1233,26 +1333,26 @@ func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cm
func (c *ClusterClient) pipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
- _ = cn.SetWriteTimeout(c.opt.WriteTimeout)
-
- err := writeCmd(cn, cmds...)
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmds...)
+ })
if err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
}
- // Set read timeout for all commands.
- _ = cn.SetReadTimeout(c.opt.ReadTimeout)
-
- return c.pipelineReadCmds(cn, cmds, failedCmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ return c.pipelineReadCmds(rd, cmds, failedCmds)
+ })
+ return err
}
func (c *ClusterClient) pipelineReadCmds(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
for _, cmd := range cmds {
- err := cmd.readReply(cn)
+ err := cmd.readReply(rd)
if err == nil {
continue
}
@@ -1361,7 +1461,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
}
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
@@ -1376,35 +1476,37 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
func (c *ClusterClient) txPipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := txPipelineWriteMulti(cn, cmds); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return txPipelineWriteMulti(wr, cmds)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
- setCmdsErr(cmds, err)
- return err
- }
-
- return pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ err := c.txPipelineReadQueued(rd, cmds, failedCmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+ return pipelineReadCmds(rd, cmds)
+ })
+ return err
}
func (c *ClusterClient) txPipelineReadQueued(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil {
+ if err := statusCmd.readReply(rd); err != nil {
return err
}
for _, cmd := range cmds {
- err := statusCmd.readReply(cn)
+ err := statusCmd.readReply(rd)
if err == nil {
continue
}
@@ -1417,7 +1519,7 @@ func (c *ClusterClient) txPipelineReadQueued(
}
// Parse number of replies.
- line, err := cn.Rd.ReadLine()
+ line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
@@ -1445,11 +1547,9 @@ func (c *ClusterClient) txPipelineReadQueued(
}
func (c *ClusterClient) pubSub(channels []string) *PubSub {
- opt := c.opt.clientOptions()
-
var node *clusterNode
- return &PubSub{
- opt: opt,
+ pubsub := &PubSub{
+ opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) {
if node == nil {
@@ -1472,6 +1572,8 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub {
return node.Client.connPool.CloseConn(cn)
},
}
+ pubsub.init()
+ return pubsub
}
// Subscribe subscribes the client to the specified channels.
@@ -1494,43 +1596,6 @@ func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
return pubsub
}
-func useOriginAddr(originAddr, nodeAddr string) bool {
- nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
- if err != nil {
- return false
- }
-
- nodeIP := net.ParseIP(nodeHost)
- if nodeIP == nil {
- return false
- }
-
- if !nodeIP.IsLoopback() {
- return false
- }
-
- _, originPort, err := net.SplitHostPort(originAddr)
- if err != nil {
- return false
- }
-
- return nodePort == originPort
-}
-
-func isLoopbackAddr(addr string) bool {
- host, _, err := net.SplitHostPort(addr)
- if err != nil {
- return false
- }
-
- ip := net.ParseIP(host)
- if ip == nil {
- return false
- }
-
- return ip.IsLoopback()
-}
-
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {