Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--  vendor/github.com/go-redis/redis/cluster.go  321
1 file changed, 255 insertions, 66 deletions
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index f758b01b9..647a25be3 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -28,18 +28,19 @@ type ClusterOptions struct {
// Default is 16.
MaxRedirects int
- // Enables read queries for a connection to a Redis Cluster slave node.
+ // Enables read-only commands on slave nodes.
ReadOnly bool
-
- // Enables routing read-only queries to the closest master or slave node.
+ // Allows routing read-only commands to the closest master or slave node.
RouteByLatency bool
// Following options are copied from Options struct.
OnConnect func(*Conn) error
- MaxRetries int
- Password string
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
+ Password string
DialTimeout time.Duration
ReadTimeout time.Duration
@@ -62,6 +63,19 @@ func (opt *ClusterOptions) init() {
if opt.RouteByLatency {
opt.ReadOnly = true
}
+
+ switch opt.MinRetryBackoff {
+ case -1:
+ opt.MinRetryBackoff = 0
+ case 0:
+ opt.MinRetryBackoff = 8 * time.Millisecond
+ }
+ switch opt.MaxRetryBackoff {
+ case -1:
+ opt.MaxRetryBackoff = 0
+ case 0:
+ opt.MaxRetryBackoff = 512 * time.Millisecond
+ }
}
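With these defaults, -1 disables retry backoff and 0 selects the 8ms/512ms bounds. The retryBackoff helper used later in Process (node.Client.retryBackoff) lives elsewhere in the package; below is a minimal sketch of how such min/max bounds typically drive exponential backoff with jitter, assuming time and math/rand are imported — illustrative only, not the package's exact code:

func retryBackoff(attempt int, minBackoff, maxBackoff time.Duration) time.Duration {
	if minBackoff == 0 {
		return 0 // backoff disabled (MinRetryBackoff == -1 maps to 0 above)
	}
	d := minBackoff << uint(attempt) // 8ms, 16ms, 32ms, ... with the defaults
	if d < minBackoff || d > maxBackoff {
		d = maxBackoff // cap growth; also catches shift overflow
	}
	if d == 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(d))) // jitter to avoid retry stampedes
}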
func (opt *ClusterOptions) clientOptions() *Options {
@@ -70,9 +84,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
return &Options{
OnConnect: opt.OnConnect,
- MaxRetries: opt.MaxRetries,
- Password: opt.Password,
- ReadOnly: opt.ReadOnly,
+ MaxRetries: opt.MaxRetries,
+ MinRetryBackoff: opt.MinRetryBackoff,
+ MaxRetryBackoff: opt.MaxRetryBackoff,
+ Password: opt.Password,
+ readOnly: opt.ReadOnly,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
@@ -91,7 +107,9 @@ func (opt *ClusterOptions) clientOptions() *Options {
type clusterNode struct {
Client *Client
Latency time.Duration
- loading time.Time
+
+ loading time.Time
+ generation uint32
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -122,6 +140,17 @@ func (n *clusterNode) Loading() bool {
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
}
+func (n *clusterNode) Generation() uint32 {
+ return n.generation
+}
+
+func (n *clusterNode) SetGeneration(gen uint32) {
+ if gen < n.generation {
+ panic("gen < n.generation")
+ }
+ n.generation = gen
+}
+
//------------------------------------------------------------------------------
type clusterNodes struct {
@@ -131,6 +160,8 @@ type clusterNodes struct {
addrs []string
nodes map[string]*clusterNode
closed bool
+
+ generation uint32
}
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
@@ -161,6 +192,39 @@ func (c *clusterNodes) Close() error {
return firstErr
}
+func (c *clusterNodes) NextGeneration() uint32 {
+ c.generation++
+ return c.generation
+}
+
+// GC removes unused nodes.
+func (c *clusterNodes) GC(generation uint32) error {
+ var collected []*clusterNode
+ c.mu.Lock()
+ for i := 0; i < len(c.addrs); {
+ addr := c.addrs[i]
+ node := c.nodes[addr]
+ if node.Generation() >= generation {
+ i++
+ continue
+ }
+
+ c.addrs = append(c.addrs[:i], c.addrs[i+1:]...)
+ delete(c.nodes, addr)
+ collected = append(collected, node)
+ }
+ c.mu.Unlock()
+
+ var firstErr error
+ for _, node := range collected {
+ if err := node.Client.Close(); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+
+ return firstErr
+}
+
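Taken together, NextGeneration, SetGeneration, and GC form a small mark-and-sweep scheme over cluster nodes: each state reload mints a generation, marks every node the new state still references, and sweeps the rest. A condensed illustration using only the methods above (slotAddrs is a hypothetical list of addresses taken from a CLUSTER SLOTS reply):

gen := nodes.NextGeneration() // mint a generation for the new state
for _, addr := range slotAddrs {
	node, err := nodes.GetOrCreate(addr)
	if err != nil {
		continue
	}
	node.SetGeneration(gen) // mark: this node is still referenced
}
_ = nodes.GC(gen) // sweep: close every node older than gen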
func (c *clusterNodes) All() ([]*clusterNode, error) {
c.mu.RLock()
defer c.mu.RUnlock()
@@ -176,7 +240,7 @@ func (c *clusterNodes) All() ([]*clusterNode, error) {
return nodes, nil
}
-func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
+func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
var node *clusterNode
var ok bool
@@ -223,7 +287,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
var nodeErr error
for i := 0; i <= c.opt.MaxRedirects; i++ {
n := rand.Intn(len(addrs))
- node, err := c.Get(addrs[n])
+ node, err := c.GetOrCreate(addrs[n])
if err != nil {
return nil, err
}
@@ -239,30 +303,45 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
//------------------------------------------------------------------------------
type clusterState struct {
- nodes *clusterNodes
+ nodes *clusterNodes
+ masters []*clusterNode
+ slaves []*clusterNode
+
slots [][]*clusterNode
+
+ generation uint32
}
func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) {
c := clusterState{
- nodes: nodes,
+ nodes: nodes,
+ generation: nodes.NextGeneration(),
+
slots: make([][]*clusterNode, hashtag.SlotNumber),
}
isLoopbackOrigin := isLoopbackAddr(origin)
for _, slot := range slots {
var nodes []*clusterNode
- for _, slotNode := range slot.Nodes {
+ for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
if !isLoopbackOrigin && isLoopbackAddr(addr) {
addr = origin
}
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return nil, err
}
+
+ node.SetGeneration(c.generation)
nodes = append(nodes, node)
+
+ if i == 0 {
+ c.masters = appendNode(c.masters, node)
+ } else {
+ c.slaves = appendNode(c.slaves, node)
+ }
}
for i := slot.Start; i <= slot.End; i++ {
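The i == 0 test above relies on the CLUSTER SLOTS reply format: the first node listed for each slot range is the master, and any further entries are its replicas. An illustrative value, assuming this package's ClusterSlot/ClusterNode types:

slot := ClusterSlot{
	Start: 0,
	End:   5460,
	Nodes: []ClusterNode{
		{Addr: "127.0.0.1:7000"}, // index 0: master
		{Addr: "127.0.0.1:7003"}, // index 1+: replicas
	},
}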
@@ -327,7 +406,7 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
}
func (c *clusterState) slotNodes(slot int) []*clusterNode {
- if slot < len(c.slots) {
+ if slot >= 0 && slot < len(c.slots) {
return c.slots[slot]
}
return nil
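The added lower-bound check matters because callers can pass a negative slot — the pubSub helper added below uses -1 when subscribing with no channels — and c.slots[-1] would previously have panicked. Assuming a populated clusterState named state:

nodes := state.slotNodes(-1) // nil, instead of an index-out-of-range panic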
@@ -348,7 +427,7 @@ type ClusterClient struct {
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
- // Reports where slots reloading is in progress.
+ // Reports whether slots reloading is in progress.
reloading uint32
}
@@ -365,12 +444,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
// Add initial nodes.
for _, addr := range opt.Addrs {
- _, _ = c.nodes.Get(addr)
+ _, _ = c.nodes.GetOrCreate(addr)
}
// Preload cluster slots.
for i := 0; i < 10; i++ {
- state, err := c.reloadSlots()
+ state, err := c.reloadState()
if err == nil {
c._state.Store(state)
break
@@ -394,7 +473,7 @@ func (c *ClusterClient) state() *clusterState {
if v != nil {
return v.(*clusterState)
}
- c.lazyReloadSlots()
+ c.lazyReloadState()
return nil
}
@@ -476,6 +555,10 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
+ if attempt > 0 {
+ time.Sleep(node.Client.retryBackoff(attempt))
+ }
+
if ask {
pipe := node.Client.Pipeline()
pipe.Process(NewCmd("ASKING"))
@@ -487,19 +570,20 @@ func (c *ClusterClient) Process(cmd Cmder) error {
err = node.Client.Process(cmd)
}
- // If there is no (real) error - we are done.
+ // If there is no error - we are done.
if err == nil {
return nil
}
// If slave is loading - read from master.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
+ // TODO: race
node.loading = time.Now()
continue
}
// On network errors try random node.
- if internal.IsRetryableError(err) {
+ if internal.IsRetryableError(err) || internal.IsClusterDownError(err) {
node, err = c.nodes.Random()
if err != nil {
cmd.setErr(err)
@@ -516,11 +600,11 @@ func (c *ClusterClient) Process(cmd Cmder) error {
if state != nil && slot >= 0 {
master, _ := state.slotMasterNode(slot)
if moved && (master == nil || master.Client.getAddr() != addr) {
- c.lazyReloadSlots()
+ c.lazyReloadState()
}
}
- node, err = c.nodes.Get(addr)
+ node, err = c.nodes.GetOrCreate(addr)
if err != nil {
cmd.setErr(err)
return err
@@ -535,17 +619,17 @@ func (c *ClusterClient) Process(cmd Cmder) error {
return cmd.Err()
}
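For context on the handling above: Redis Cluster signals redirects with errors of the following shape, which internal.IsMovedError (also used in checkMovedErr below) parses into (moved, ask, addr):

// MOVED 3999 127.0.0.1:6381 -> slot 3999 moved permanently; reload state
// ASK 3999 127.0.0.1:6381   -> slot is migrating; retry once after ASKING
moved, ask, addr := internal.IsMovedError(cmd.Err())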
-// ForEachNode concurrently calls the fn on each ever known node in the cluster.
+// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
-func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
- nodes, err := c.nodes.All()
- if err != nil {
- return err
+func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
+ state := c.state()
+ if state == nil {
+ return errNilClusterState
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
- for _, node := range nodes {
+ for _, master := range state.masters {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
@@ -556,7 +640,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
default:
}
}
- }(node)
+ }(master)
}
wg.Wait()
@@ -568,28 +652,17 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
}
}
-// ForEachMaster concurrently calls the fn on each master node in the cluster.
+// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
-func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
+func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return errNilClusterState
}
var wg sync.WaitGroup
- visited := make(map[*clusterNode]struct{})
errCh := make(chan error, 1)
- for _, nodes := range state.slots {
- if len(nodes) == 0 {
- continue
- }
-
- master := nodes[0]
- if _, ok := visited[master]; ok {
- continue
- }
- visited[master] = struct{}{}
-
+ for _, slave := range state.slaves {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
@@ -600,7 +673,7 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
default:
}
}
- }(master)
+ }(slave)
}
wg.Wait()
@@ -612,16 +685,64 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
}
}
+// ForEachNode concurrently calls the fn on each known node in the cluster.
+// It returns the first error if any.
+func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
+ state := c.state()
+ if state == nil {
+ return errNilClusterState
+ }
+
+ var wg sync.WaitGroup
+ errCh := make(chan error, 1)
+ worker := func(node *clusterNode) {
+ defer wg.Done()
+ err := fn(node.Client)
+ if err != nil {
+ select {
+ case errCh <- err:
+ default:
+ }
+ }
+ }
+
+ for _, node := range state.masters {
+ wg.Add(1)
+ go worker(node)
+ }
+ for _, node := range state.slaves {
+ wg.Add(1)
+ go worker(node)
+ }
+
+ wg.Wait()
+ select {
+ case err := <-errCh:
+ return err
+ default:
+ return nil
+ }
+}
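A usage sketch for the ForEach helpers, shown with ForEachNode and assuming a *ClusterClient named client: ping every known node and surface the first failure.

err := client.ForEachNode(func(c *Client) error {
	return c.Ping().Err()
})
if err != nil {
	// at least one node failed to respond
}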
+
// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
- nodes, err := c.nodes.All()
- if err != nil {
+ state := c.state()
+ if state == nil {
return &acc
}
- for _, node := range nodes {
+ for _, node := range state.masters {
+ s := node.Client.connPool.Stats()
+ acc.Requests += s.Requests
+ acc.Hits += s.Hits
+ acc.Timeouts += s.Timeouts
+ acc.TotalConns += s.TotalConns
+ acc.FreeConns += s.FreeConns
+ }
+
+ for _, node := range state.slaves {
s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
@@ -629,33 +750,42 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
+
return &acc
}
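The accumulated counters can then be read off directly; the field names are the ones summed above (client is a hypothetical *ClusterClient, fmt assumed imported):

stats := client.PoolStats()
fmt.Printf("requests=%d hits=%d timeouts=%d total=%d free=%d\n",
	stats.Requests, stats.Hits, stats.Timeouts, stats.TotalConns, stats.FreeConns)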
-func (c *ClusterClient) lazyReloadSlots() {
+func (c *ClusterClient) lazyReloadState() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go func() {
- for i := 0; i < 1000; i++ {
- state, err := c.reloadSlots()
+ defer atomic.StoreUint32(&c.reloading, 0)
+
+ var state *clusterState
+ for {
+ var err error
+ state, err = c.reloadState()
if err == pool.ErrClosed {
- break
+ return
}
- if err == nil {
- c._state.Store(state)
- break
+
+ if err != nil {
+ time.Sleep(time.Millisecond)
+ continue
}
- time.Sleep(time.Millisecond)
+
+ c._state.Store(state)
+ break
}
time.Sleep(3 * time.Second)
- atomic.StoreUint32(&c.reloading, 0)
+ c.nodes.GC(state.generation)
}()
}
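Condensed, the goroutine's happy path is: fetch and publish a fresh state, give in-flight commands three seconds to drain against the old topology, then collect nodes the new state no longer references:

state, _ := c.reloadState()  // CLUSTER SLOTS via a random node
c._state.Store(state)        // atomically publish the new routing table
time.Sleep(3 * time.Second)  // let in-flight commands settle
c.nodes.GC(state.generation) // close nodes absent from the new generation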
-func (c *ClusterClient) reloadSlots() (*clusterState, error) {
+// Not thread-safe.
+func (c *ClusterClient) reloadState() (*clusterState, error) {
node, err := c.nodes.Random()
if err != nil {
return nil, err
@@ -720,14 +850,14 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
- cn, _, err := node.Client.conn()
+ cn, _, err := node.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
continue
}
err = c.pipelineProcessCmds(cn, cmds, failedCmds)
- node.Client.putConn(cn, err)
+ node.Client.releaseConn(cn, err)
}
if len(failedCmds) == 0 {
@@ -799,9 +929,9 @@ func (c *ClusterClient) pipelineReadCmds(
func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
moved, ask, addr := internal.IsMovedError(cmd.Err())
if moved {
- c.lazyReloadSlots()
+ c.lazyReloadState()
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@@ -809,7 +939,7 @@ func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]C
failedCmds[node] = append(failedCmds[node], cmd)
}
if ask {
- node, err := c.nodes.Get(addr)
+ node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@@ -855,14 +985,14 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
- cn, _, err := node.Client.conn()
+ cn, _, err := node.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
continue
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
- node.Client.putConn(cn, err)
+ node.Client.releaseConn(cn, err)
}
if len(failedCmds) == 0 {
@@ -966,6 +1096,56 @@ func (c *ClusterClient) txPipelineReadQueued(
return firstErr
}
+func (c *ClusterClient) pubSub(channels []string) *PubSub {
+ opt := c.opt.clientOptions()
+
+ var node *clusterNode
+ return &PubSub{
+ opt: opt,
+
+ newConn: func(channels []string) (*pool.Conn, error) {
+ if node == nil {
+ var slot int
+ if len(channels) > 0 {
+ slot = hashtag.Slot(channels[0])
+ } else {
+ slot = -1
+ }
+
+ masterNode, err := c.state().slotMasterNode(slot)
+ if err != nil {
+ return nil, err
+ }
+ node = masterNode
+ }
+ return node.Client.newConn()
+ },
+ closeConn: func(cn *pool.Conn) error {
+ return node.Client.connPool.CloseConn(cn)
+ },
+ }
+}
+
+// Subscribe subscribes the client to the specified channels.
+// Channels can be omitted to create an empty subscription.
+func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub(channels)
+ if len(channels) > 0 {
+ _ = pubsub.Subscribe(channels...)
+ }
+ return pubsub
+}
+
+// PSubscribe subscribes the client to the given patterns.
+// Patterns can be omitted to create an empty subscription.
+func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub(channels)
+ if len(channels) > 0 {
+ _ = pubsub.PSubscribe(channels...)
+ }
+ return pubsub
+}
+
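Usage sketch: the first channel's hash slot selects the master that owns the subscription connection (a random node when no channel is given). ReceiveMessage is the blocking receive already provided by this package's PubSub; client is a hypothetical *ClusterClient and fmt is assumed imported:

pubsub := client.Subscribe("mychannel")
defer pubsub.Close()

msg, err := pubsub.ReceiveMessage() // blocks until a message arrives
if err == nil {
	fmt.Println(msg.Channel, msg.Payload)
}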
func isLoopbackAddr(addr string) bool {
host, _, err := net.SplitHostPort(addr)
if err != nil {
@@ -979,3 +1159,12 @@ func isLoopbackAddr(addr string) bool {
return ip.IsLoopback()
}
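This backs the origin substitution in newClusterState: a node announcing a loopback address is only reachable through the address the client originally dialed. For example:

// isLoopbackAddr("127.0.0.1:7000") -> true  (replaced by the origin address)
// isLoopbackAddr("10.0.0.5:7000")  -> false (used as announced)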
+
+func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
+ for _, n := range nodes {
+ if n == node {
+ return nodes
+ }
+ }
+ return append(nodes, node)
+}