From 96eab1202717e073782ec399a4e0820cae15b1bb Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Thu, 17 Aug 2017 17:19:06 -0700 Subject: Updating server dependancies. (#7246) --- vendor/github.com/go-redis/redis/cluster.go | 321 ++++++++++++++++++++++------ 1 file changed, 255 insertions(+), 66 deletions(-) (limited to 'vendor/github.com/go-redis/redis/cluster.go') diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index f758b01b9..647a25be3 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -28,18 +28,19 @@ type ClusterOptions struct { // Default is 16. MaxRedirects int - // Enables read queries for a connection to a Redis Cluster slave node. + // Enables read-only commands on slave nodes. ReadOnly bool - - // Enables routing read-only queries to the closest master or slave node. + // Allows routing read-only commands to the closest master or slave node. RouteByLatency bool // Following options are copied from Options struct. OnConnect func(*Conn) error - MaxRetries int - Password string + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration + Password string DialTimeout time.Duration ReadTimeout time.Duration @@ -62,6 +63,19 @@ func (opt *ClusterOptions) init() { if opt.RouteByLatency { opt.ReadOnly = true } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } } func (opt *ClusterOptions) clientOptions() *Options { @@ -70,9 +84,11 @@ func (opt *ClusterOptions) clientOptions() *Options { return &Options{ OnConnect: opt.OnConnect, - MaxRetries: opt.MaxRetries, - Password: opt.Password, - ReadOnly: opt.ReadOnly, + MaxRetries: opt.MaxRetries, + MinRetryBackoff: opt.MinRetryBackoff, + MaxRetryBackoff: opt.MaxRetryBackoff, + Password: opt.Password, + readOnly: opt.ReadOnly, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, @@ -91,7 +107,9 @@ func (opt *ClusterOptions) clientOptions() *Options { type clusterNode struct { Client *Client Latency time.Duration - loading time.Time + + loading time.Time + generation uint32 } func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { @@ -122,6 +140,17 @@ func (n *clusterNode) Loading() bool { return !n.loading.IsZero() && time.Since(n.loading) < time.Minute } +func (n *clusterNode) Generation() uint32 { + return n.generation +} + +func (n *clusterNode) SetGeneration(gen uint32) { + if gen < n.generation { + panic("gen < n.generation") + } + n.generation = gen +} + //------------------------------------------------------------------------------ type clusterNodes struct { @@ -131,6 +160,8 @@ type clusterNodes struct { addrs []string nodes map[string]*clusterNode closed bool + + generation uint32 } func newClusterNodes(opt *ClusterOptions) *clusterNodes { @@ -161,6 +192,39 @@ func (c *clusterNodes) Close() error { return firstErr } +func (c *clusterNodes) NextGeneration() uint32 { + c.generation++ + return c.generation +} + +// GC removes unused nodes. +func (c *clusterNodes) GC(generation uint32) error { + var collected []*clusterNode + c.mu.Lock() + for i := 0; i < len(c.addrs); { + addr := c.addrs[i] + node := c.nodes[addr] + if node.Generation() >= generation { + i++ + continue + } + + c.addrs = append(c.addrs[:i], c.addrs[i+1:]...) + delete(c.nodes, addr) + collected = append(collected, node) + } + c.mu.Unlock() + + var firstErr error + for _, node := range collected { + if err := node.Client.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + + return firstErr +} + func (c *clusterNodes) All() ([]*clusterNode, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -176,7 +240,7 @@ func (c *clusterNodes) All() ([]*clusterNode, error) { return nodes, nil } -func (c *clusterNodes) Get(addr string) (*clusterNode, error) { +func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { var node *clusterNode var ok bool @@ -223,7 +287,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) { var nodeErr error for i := 0; i <= c.opt.MaxRedirects; i++ { n := rand.Intn(len(addrs)) - node, err := c.Get(addrs[n]) + node, err := c.GetOrCreate(addrs[n]) if err != nil { return nil, err } @@ -239,30 +303,45 @@ func (c *clusterNodes) Random() (*clusterNode, error) { //------------------------------------------------------------------------------ type clusterState struct { - nodes *clusterNodes + nodes *clusterNodes + masters []*clusterNode + slaves []*clusterNode + slots [][]*clusterNode + + generation uint32 } func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) { c := clusterState{ - nodes: nodes, + nodes: nodes, + generation: nodes.NextGeneration(), + slots: make([][]*clusterNode, hashtag.SlotNumber), } isLoopbackOrigin := isLoopbackAddr(origin) for _, slot := range slots { var nodes []*clusterNode - for _, slotNode := range slot.Nodes { + for i, slotNode := range slot.Nodes { addr := slotNode.Addr if !isLoopbackOrigin && isLoopbackAddr(addr) { addr = origin } - node, err := c.nodes.Get(addr) + node, err := c.nodes.GetOrCreate(addr) if err != nil { return nil, err } + + node.SetGeneration(c.generation) nodes = append(nodes, node) + + if i == 0 { + c.masters = appendNode(c.masters, node) + } else { + c.slaves = appendNode(c.slaves, node) + } } for i := slot.Start; i <= slot.End; i++ { @@ -327,7 +406,7 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { } func (c *clusterState) slotNodes(slot int) []*clusterNode { - if slot < len(c.slots) { + if slot >= 0 && slot < len(c.slots) { return c.slots[slot] } return nil @@ -348,7 +427,7 @@ type ClusterClient struct { cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo - // Reports where slots reloading is in progress. + // Reports whether slots reloading is in progress. reloading uint32 } @@ -365,12 +444,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { // Add initial nodes. for _, addr := range opt.Addrs { - _, _ = c.nodes.Get(addr) + _, _ = c.nodes.GetOrCreate(addr) } // Preload cluster slots. for i := 0; i < 10; i++ { - state, err := c.reloadSlots() + state, err := c.reloadState() if err == nil { c._state.Store(state) break @@ -394,7 +473,7 @@ func (c *ClusterClient) state() *clusterState { if v != nil { return v.(*clusterState) } - c.lazyReloadSlots() + c.lazyReloadState() return nil } @@ -476,6 +555,10 @@ func (c *ClusterClient) Process(cmd Cmder) error { var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(node.Client.retryBackoff(attempt)) + } + if ask { pipe := node.Client.Pipeline() pipe.Process(NewCmd("ASKING")) @@ -487,19 +570,20 @@ func (c *ClusterClient) Process(cmd Cmder) error { err = node.Client.Process(cmd) } - // If there is no (real) error - we are done. + // If there is no error - we are done. if err == nil { return nil } // If slave is loading - read from master. if c.opt.ReadOnly && internal.IsLoadingError(err) { + // TODO: race node.loading = time.Now() continue } // On network errors try random node. - if internal.IsRetryableError(err) { + if internal.IsRetryableError(err) || internal.IsClusterDownError(err) { node, err = c.nodes.Random() if err != nil { cmd.setErr(err) @@ -516,11 +600,11 @@ func (c *ClusterClient) Process(cmd Cmder) error { if state != nil && slot >= 0 { master, _ := state.slotMasterNode(slot) if moved && (master == nil || master.Client.getAddr() != addr) { - c.lazyReloadSlots() + c.lazyReloadState() } } - node, err = c.nodes.Get(addr) + node, err = c.nodes.GetOrCreate(addr) if err != nil { cmd.setErr(err) return err @@ -535,17 +619,17 @@ func (c *ClusterClient) Process(cmd Cmder) error { return cmd.Err() } -// ForEachNode concurrently calls the fn on each ever known node in the cluster. +// ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. -func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - nodes, err := c.nodes.All() - if err != nil { - return err +func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { + state := c.state() + if state == nil { + return errNilClusterState } var wg sync.WaitGroup errCh := make(chan error, 1) - for _, node := range nodes { + for _, master := range state.masters { wg.Add(1) go func(node *clusterNode) { defer wg.Done() @@ -556,7 +640,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { default: } } - }(node) + }(master) } wg.Wait() @@ -568,28 +652,17 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { } } -// ForEachMaster concurrently calls the fn on each master node in the cluster. +// ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. -func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { +func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { state := c.state() if state == nil { return errNilClusterState } var wg sync.WaitGroup - visited := make(map[*clusterNode]struct{}) errCh := make(chan error, 1) - for _, nodes := range state.slots { - if len(nodes) == 0 { - continue - } - - master := nodes[0] - if _, ok := visited[master]; ok { - continue - } - visited[master] = struct{}{} - + for _, slave := range state.slaves { wg.Add(1) go func(node *clusterNode) { defer wg.Done() @@ -600,7 +673,7 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { default: } } - }(master) + }(slave) } wg.Wait() @@ -612,16 +685,64 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { } } +// ForEachNode concurrently calls the fn on each known node in the cluster. +// It returns the first error if any. +func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { + state := c.state() + if state == nil { + return errNilClusterState + } + + var wg sync.WaitGroup + errCh := make(chan error, 1) + worker := func(node *clusterNode) { + defer wg.Done() + err := fn(node.Client) + if err != nil { + select { + case errCh <- err: + default: + } + } + } + + for _, node := range state.masters { + wg.Add(1) + go worker(node) + } + for _, node := range state.slaves { + wg.Add(1) + go worker(node) + } + + wg.Wait() + select { + case err := <-errCh: + return err + default: + return nil + } +} + // PoolStats returns accumulated connection pool stats. func (c *ClusterClient) PoolStats() *PoolStats { var acc PoolStats - nodes, err := c.nodes.All() - if err != nil { + state := c.state() + if state == nil { return &acc } - for _, node := range nodes { + for _, node := range state.masters { + s := node.Client.connPool.Stats() + acc.Requests += s.Requests + acc.Hits += s.Hits + acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns + acc.FreeConns += s.FreeConns + } + + for _, node := range state.slaves { s := node.Client.connPool.Stats() acc.Requests += s.Requests acc.Hits += s.Hits @@ -629,33 +750,42 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns } + return &acc } -func (c *ClusterClient) lazyReloadSlots() { +func (c *ClusterClient) lazyReloadState() { if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { return } go func() { - for i := 0; i < 1000; i++ { - state, err := c.reloadSlots() + defer atomic.StoreUint32(&c.reloading, 0) + + var state *clusterState + for { + var err error + state, err = c.reloadState() if err == pool.ErrClosed { - break + return } - if err == nil { - c._state.Store(state) - break + + if err != nil { + time.Sleep(time.Millisecond) + continue } - time.Sleep(time.Millisecond) + + c._state.Store(state) + break } time.Sleep(3 * time.Second) - atomic.StoreUint32(&c.reloading, 0) + c.nodes.GC(state.generation) }() } -func (c *ClusterClient) reloadSlots() (*clusterState, error) { +// Not thread-safe. +func (c *ClusterClient) reloadState() (*clusterState, error) { node, err := c.nodes.Random() if err != nil { return nil, err @@ -720,14 +850,14 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.conn() + cn, _, err := node.Client.getConn() if err != nil { setCmdsErr(cmds, err) continue } err = c.pipelineProcessCmds(cn, cmds, failedCmds) - node.Client.putConn(cn, err) + node.Client.releaseConn(cn, err) } if len(failedCmds) == 0 { @@ -799,9 +929,9 @@ func (c *ClusterClient) pipelineReadCmds( func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error { moved, ask, addr := internal.IsMovedError(cmd.Err()) if moved { - c.lazyReloadSlots() + c.lazyReloadState() - node, err := c.nodes.Get(addr) + node, err := c.nodes.GetOrCreate(addr) if err != nil { return err } @@ -809,7 +939,7 @@ func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]C failedCmds[node] = append(failedCmds[node], cmd) } if ask { - node, err := c.nodes.Get(addr) + node, err := c.nodes.GetOrCreate(addr) if err != nil { return err } @@ -855,14 +985,14 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.conn() + cn, _, err := node.Client.getConn() if err != nil { setCmdsErr(cmds, err) continue } err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - node.Client.putConn(cn, err) + node.Client.releaseConn(cn, err) } if len(failedCmds) == 0 { @@ -966,6 +1096,56 @@ func (c *ClusterClient) txPipelineReadQueued( return firstErr } +func (c *ClusterClient) pubSub(channels []string) *PubSub { + opt := c.opt.clientOptions() + + var node *clusterNode + return &PubSub{ + opt: opt, + + newConn: func(channels []string) (*pool.Conn, error) { + if node == nil { + var slot int + if len(channels) > 0 { + slot = hashtag.Slot(channels[0]) + } else { + slot = -1 + } + + masterNode, err := c.state().slotMasterNode(slot) + if err != nil { + return nil, err + } + node = masterNode + } + return node.Client.newConn() + }, + closeConn: func(cn *pool.Conn) error { + return node.Client.connPool.CloseConn(cn) + }, + } +} + +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +func (c *ClusterClient) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub(channels) + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub(channels) + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + func isLoopbackAddr(addr string) bool { host, _, err := net.SplitHostPort(addr) if err != nil { @@ -979,3 +1159,12 @@ func isLoopbackAddr(addr string) bool { return ip.IsLoopback() } + +func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { + for _, n := range nodes { + if n == node { + return nodes + } + } + return append(nodes, node) +} -- cgit v1.2.3-1-g7c22