diff options
Diffstat (limited to 'vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/cluster.go | 366 |
1 files changed, 219 insertions, 147 deletions
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index 647a25be3..c81fc1d57 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -14,8 +14,8 @@ import ( "github.com/go-redis/redis/internal/proto" ) -var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes") -var errNilClusterState = internal.RedisError("redis: cannot load cluster slots") +var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") +var errNilClusterState = fmt.Errorf("redis: cannot load cluster slots") // ClusterOptions are used to configure a cluster client and should be // passed to NewClusterClient. @@ -64,6 +64,19 @@ func (opt *ClusterOptions) init() { opt.ReadOnly = true } + switch opt.ReadTimeout { + case -1: + opt.ReadTimeout = 0 + case 0: + opt.ReadTimeout = 3 * time.Second + } + switch opt.WriteTimeout { + case -1: + opt.WriteTimeout = 0 + case 0: + opt.WriteTimeout = opt.ReadTimeout + } + switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -192,6 +205,21 @@ func (c *clusterNodes) Close() error { return firstErr } +func (c *clusterNodes) Addrs() ([]string, error) { + c.mu.RLock() + closed := c.closed + addrs := c.addrs + c.mu.RUnlock() + + if closed { + return nil, pool.ErrClosed + } + if len(addrs) == 0 { + return nil, errClusterNoNodes + } + return addrs, nil +} + func (c *clusterNodes) NextGeneration() uint32 { c.generation++ return c.generation @@ -272,16 +300,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { } func (c *clusterNodes) Random() (*clusterNode, error) { - c.mu.RLock() - closed := c.closed - addrs := c.addrs - c.mu.RUnlock() - - if closed { - return nil, pool.ErrClosed - } - if len(addrs) == 0 { - return nil, errClusterNoNodes + addrs, err := c.Addrs() + if err != nil { + return nil, err } var nodeErr error @@ -468,13 +489,23 @@ func (c *ClusterClient) Options() *ClusterOptions { return c.opt } -func (c *ClusterClient) state() *clusterState { +func (c *ClusterClient) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + +func (c *ClusterClient) state() (*clusterState, error) { v := c._state.Load() if v != nil { - return v.(*clusterState) + return v.(*clusterState), nil } + + _, err := c.nodes.Addrs() + if err != nil { + return nil, err + } + c.lazyReloadState() - return nil + return nil, errNilClusterState } func (c *ClusterClient) cmdInfo(name string) *CommandInfo { @@ -495,17 +526,22 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { if err != nil { return nil } - return c.cmdsInfo[name] + info := c.cmdsInfo[name] + if info == nil { + internal.Logf("info for cmd=%s not found", name) + } + return info } -func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { - if state == nil { - node, err := c.nodes.Random() - return 0, node, err - } +func (c *ClusterClient) cmdSlot(cmd Cmder) int { + cmdInfo := c.cmdInfo(cmd.Name()) + firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) + return hashtag.Slot(firstKey) +} +func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo)) + firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) slot := hashtag.Slot(firstKey) if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { @@ -523,19 +559,51 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl } func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { - state := c.state() + if len(keys) == 0 { + return fmt.Errorf("redis: keys don't hash to the same slot") + } - var node *clusterNode - var err error - if state != nil && len(keys) > 0 { - node, err = state.slotMasterNode(hashtag.Slot(keys[0])) - } else { - node, err = c.nodes.Random() + state, err := c.state() + if err != nil { + return err + } + + slot := hashtag.Slot(keys[0]) + for _, key := range keys[1:] { + if hashtag.Slot(key) != slot { + return fmt.Errorf("redis: Watch requires all keys to be in the same slot") + } } + + node, err := state.slotMasterNode(slot) if err != nil { return err } - return node.Client.Watch(fn, keys...) + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + err = node.Client.Watch(fn, keys...) + if err == nil { + break + } + + moved, ask, addr := internal.IsMovedError(err) + if moved || ask { + c.lazyReloadState() + node, err = c.nodes.GetOrCreate(addr) + if err != nil { + return err + } + continue + } + + return err + } + + return err } // Close closes the cluster client, releasing any open resources. @@ -547,7 +615,13 @@ func (c *ClusterClient) Close() error { } func (c *ClusterClient) Process(cmd Cmder) error { - slot, node, err := c.cmdSlotAndNode(c.state(), cmd) + state, err := c.state() + if err != nil { + cmd.setErr(err) + return err + } + + _, node, err := c.cmdSlotAndNode(state, cmd) if err != nil { cmd.setErr(err) return err @@ -556,7 +630,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { if attempt > 0 { - time.Sleep(node.Client.retryBackoff(attempt)) + time.Sleep(c.retryBackoff(attempt)) } if ask { @@ -572,7 +646,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { // If there is no error - we are done. if err == nil { - return nil + break } // If slave is loading - read from master. @@ -582,12 +656,11 @@ func (c *ClusterClient) Process(cmd Cmder) error { continue } - // On network errors try random node. - if internal.IsRetryableError(err) || internal.IsClusterDownError(err) { - node, err = c.nodes.Random() - if err != nil { - cmd.setErr(err) - return err + if internal.IsRetryableError(err, true) { + var nodeErr error + node, nodeErr = c.nodes.Random() + if nodeErr != nil { + break } continue } @@ -596,20 +669,13 @@ func (c *ClusterClient) Process(cmd Cmder) error { var addr string moved, ask, addr = internal.IsMovedError(err) if moved || ask { - state := c.state() - if state != nil && slot >= 0 { - master, _ := state.slotMasterNode(slot) - if moved && (master == nil || master.Client.getAddr() != addr) { - c.lazyReloadState() - } - } + c.lazyReloadState() - node, err = c.nodes.GetOrCreate(addr) - if err != nil { - cmd.setErr(err) - return err + var nodeErr error + node, nodeErr = c.nodes.GetOrCreate(addr) + if nodeErr != nil { + break } - continue } @@ -622,9 +688,9 @@ func (c *ClusterClient) Process(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -655,9 +721,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { // ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -688,9 +754,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { // ForEachNode concurrently calls the fn on each known node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state := c.state() - if state == nil { - return errNilClusterState + state, err := c.state() + if err != nil { + return err } var wg sync.WaitGroup @@ -728,27 +794,31 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { func (c *ClusterClient) PoolStats() *PoolStats { var acc PoolStats - state := c.state() + state, _ := c.state() if state == nil { return &acc } for _, node := range state.masters { s := node.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns + acc.StaleConns += s.StaleConns } for _, node := range state.slaves { s := node.Client.connPool.Stats() - acc.Requests += s.Requests acc.Hits += s.Hits + acc.Misses += s.Misses acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns + acc.StaleConns += s.StaleConns } return &acc @@ -762,10 +832,8 @@ func (c *ClusterClient) lazyReloadState() { go func() { defer atomic.StoreUint32(&c.reloading, 0) - var state *clusterState for { - var err error - state, err = c.reloadState() + state, err := c.reloadState() if err == pool.ErrClosed { return } @@ -776,11 +844,10 @@ func (c *ClusterClient) lazyReloadState() { } c._state.Store(state) + time.Sleep(5 * time.Second) + c.nodes.GC(state.generation) break } - - time.Sleep(3 * time.Second) - c.nodes.GC(state.generation) }() } @@ -810,21 +877,12 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { break } - var n int for _, node := range nodes { - nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() + _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() if err != nil { internal.Logf("ReapStaleConns failed: %s", err) - } else { - n += nn } } - - s := c.PoolStats() - internal.Logf( - "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)", - n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts, - ) } } @@ -837,16 +895,21 @@ func (c *ClusterClient) Pipeline() Pipeliner { } func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.Pipeline().pipelined(fn) + return c.Pipeline().Pipelined(fn) } func (c *ClusterClient) pipelineExec(cmds []Cmder) error { cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { + setCmdsErr(cmds, err) return err } - for i := 0; i <= c.opt.MaxRedirects; i++ { + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { @@ -856,8 +919,12 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { continue } - err = c.pipelineProcessCmds(cn, cmds, failedCmds) - node.Client.releaseConn(cn, err) + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + if err == nil || internal.IsRedisError(err) { + _ = node.Client.connPool.Put(cn) + } else { + _ = node.Client.connPool.Remove(cn) + } } if len(failedCmds) == 0 { @@ -866,21 +933,20 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { cmdsMap = failedCmds } - var firstErr error - for _, cmd := range cmds { - if err := cmd.Err(); err != nil { - firstErr = err - break - } - } - return firstErr + return firstCmdsErr(cmds) } func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { - state := c.state() + state, err := c.state() + if err != nil { + setCmdsErr(cmds, err) + return nil, err + } + cmdsMap := make(map[*clusterNode][]Cmder) for _, cmd := range cmds { - _, node, err := c.cmdSlotAndNode(state, cmd) + slot := c.cmdSlot(cmd) + node, err := state.slotMasterNode(slot) if err != nil { return nil, err } @@ -890,11 +956,12 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e } func (c *ClusterClient) pipelineProcessCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { cn.SetWriteTimeout(c.opt.WriteTimeout) if err := writeCmd(cn, cmds...); err != nil { setCmdsErr(cmds, err) + failedCmds[node] = cmds return err } @@ -907,46 +974,53 @@ func (c *ClusterClient) pipelineProcessCmds( func (c *ClusterClient) pipelineReadCmds( cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - var firstErr error for _, cmd := range cmds { err := cmd.readReply(cn) if err == nil { continue } - if firstErr == nil { - firstErr = err + if c.checkMovedErr(cmd, err, failedCmds) { + continue } - err = c.checkMovedErr(cmd, failedCmds) - if err != nil && firstErr == nil { - firstErr = err + if internal.IsRedisError(err) { + continue } + + return err } - return firstErr + return nil } -func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error { - moved, ask, addr := internal.IsMovedError(cmd.Err()) +func (c *ClusterClient) checkMovedErr( + cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder, +) bool { + moved, ask, addr := internal.IsMovedError(err) + if moved { c.lazyReloadState() node, err := c.nodes.GetOrCreate(addr) if err != nil { - return err + return false } failedCmds[node] = append(failedCmds[node], cmd) + return true } + if ask { node, err := c.nodes.GetOrCreate(addr) if err != nil { - return err + return false } failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) + return true } - return nil + + return false } // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. @@ -959,29 +1033,29 @@ func (c *ClusterClient) TxPipeline() Pipeliner { } func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { - return c.TxPipeline().pipelined(fn) + return c.TxPipeline().Pipelined(fn) } func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { - cmdsMap, err := c.mapCmdsBySlot(cmds) + state, err := c.state() if err != nil { return err } - state := c.state() - if state == nil { - return errNilClusterState - } - + cmdsMap := c.mapCmdsBySlot(cmds) for slot, cmds := range cmdsMap { node, err := state.slotMasterNode(slot) if err != nil { setCmdsErr(cmds, err) continue } - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for i := 0; i <= c.opt.MaxRedirects; i++ { + + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { @@ -992,7 +1066,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { } err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - node.Client.releaseConn(cn, err) + if err == nil || internal.IsRedisError(err) { + _ = node.Client.connPool.Put(cn) + } else { + _ = node.Client.connPool.Remove(cn) + } } if len(failedCmds) == 0 { @@ -1002,27 +1080,16 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { } } - var firstErr error - for _, cmd := range cmds { - if err := cmd.Err(); err != nil { - firstErr = err - break - } - } - return firstErr + return firstCmdsErr(cmds) } -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) { - state := c.state() +func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { cmdsMap := make(map[int][]Cmder) for _, cmd := range cmds { - slot, _, err := c.cmdSlotAndNode(state, cmd) - if err != nil { - return nil, err - } + slot := c.cmdSlot(cmd) cmdsMap[slot] = append(cmdsMap[slot], cmd) } - return cmdsMap, nil + return cmdsMap } func (c *ClusterClient) txPipelineProcessCmds( @@ -1039,22 +1106,20 @@ func (c *ClusterClient) txPipelineProcessCmds( cn.SetReadTimeout(c.opt.ReadTimeout) if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil { + setCmdsErr(cmds, err) return err } - _, err := pipelineReadCmds(cn, cmds) - return err + return pipelineReadCmds(cn, cmds) } func (c *ClusterClient) txPipelineReadQueued( cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - var firstErr error - // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil && firstErr == nil { - firstErr = err + if err := statusCmd.readReply(cn); err != nil { + return err } for _, cmd := range cmds { @@ -1063,15 +1128,11 @@ func (c *ClusterClient) txPipelineReadQueued( continue } - cmd.setErr(err) - if firstErr == nil { - firstErr = err + if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) { + continue } - err = c.checkMovedErr(cmd, failedCmds) - if err != nil && firstErr == nil { - firstErr = err - } + return err } // Parse number of replies. @@ -1085,7 +1146,13 @@ func (c *ClusterClient) txPipelineReadQueued( switch line[0] { case proto.ErrorReply: - return proto.ParseErrorReply(line) + err := proto.ParseErrorReply(line) + for _, cmd := range cmds { + if !c.checkMovedErr(cmd, err, failedCmds) { + break + } + } + return err case proto.ArrayReply: // ok default: @@ -1093,7 +1160,7 @@ func (c *ClusterClient) txPipelineReadQueued( return err } - return firstErr + return nil } func (c *ClusterClient) pubSub(channels []string) *PubSub { @@ -1112,7 +1179,12 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub { slot = -1 } - masterNode, err := c.state().slotMasterNode(slot) + state, err := c.state() + if err != nil { + return nil, err + } + + masterNode, err := state.slotMasterNode(slot) if err != nil { return nil, err } |