Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/state.go')
-rw-r--r--  vendor/github.com/hashicorp/memberlist/state.go  1165
1 file changed, 1165 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/memberlist/state.go b/vendor/github.com/hashicorp/memberlist/state.go
new file mode 100644
index 000000000..8513361b1
--- /dev/null
+++ b/vendor/github.com/hashicorp/memberlist/state.go
@@ -0,0 +1,1165 @@
+package memberlist
+
+import (
+ "bytes"
+ "fmt"
+ "math"
+ "math/rand"
+ "net"
+ "sync/atomic"
+ "time"
+
+ "github.com/armon/go-metrics"
+)
+
+type nodeStateType int
+
+const (
+ stateAlive nodeStateType = iota
+ stateSuspect
+ stateDead
+)
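+
+// For orientation, the states form a simple lifecycle: a node moves
+// alive -> suspect when a probe fails, suspect -> dead when the suspicion
+// timer fires or a dead message arrives, and back to alive when an alive
+// message with a newer incarnation refutes the accusation (see aliveNode,
+// suspectNode, and deadNode below).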
+
+// Node represents a node in the cluster.
+type Node struct {
+ Name string
+ Addr net.IP
+ Port uint16
+ Meta []byte // Metadata from the delegate for this node.
+ PMin uint8 // Minimum protocol version this understands
+ PMax uint8 // Maximum protocol version this understands
+ PCur uint8 // Current version node is speaking
+ DMin uint8 // Min protocol version for the delegate to understand
+ DMax uint8 // Max protocol version for the delegate to understand
+ DCur uint8 // Current version delegate is speaking
+}
+
+// Address returns the host:port form of a node's address, suitable for use
+// with a transport.
+func (n *Node) Address() string {
+ return joinHostPort(n.Addr.String(), n.Port)
+}
+
+// String returns the node name
+func (n *Node) String() string {
+ return n.Name
+}
+
+// NodeState is used to manage our state view of another node
+type nodeState struct {
+ Node
+ Incarnation uint32 // Last known incarnation number
+ State nodeStateType // Current state
+ StateChange time.Time // Time last state change happened
+}
+
+// Address returns the host:port form of a node's address, suitable for use
+// with a transport.
+func (n *nodeState) Address() string {
+ return n.Node.Address()
+}
+
+// ackHandler is used to register handlers for incoming acks and nacks.
+type ackHandler struct {
+ ackFn func([]byte, time.Time)
+ nackFn func()
+ timer *time.Timer
+}
+
+// NoPingResponseError is used to indicate a 'ping' packet was
+// successfully issued but no response was received
+type NoPingResponseError struct {
+ node string
+}
+
+func (f NoPingResponseError) Error() string {
+ return fmt.Sprintf("No response from node %s", f.node)
+}
+
+// schedule is used to ensure that background maintenance (the probe,
+// push/pull, and gossip tickers) runs periodically. This function is
+// safe to call multiple times: if the memberlist is already scheduled,
+// it does nothing.
+func (m *Memberlist) schedule() {
+ m.tickerLock.Lock()
+ defer m.tickerLock.Unlock()
+
+ // If we already have tickers, then don't do anything, since we're
+ // scheduled
+ if len(m.tickers) > 0 {
+ return
+ }
+
+ // Create the stop tick channel, an unbuffered channel. We close
+ // this when we should stop the tickers.
+ stopCh := make(chan struct{})
+
+ // Create a new probeTicker
+ if m.config.ProbeInterval > 0 {
+ t := time.NewTicker(m.config.ProbeInterval)
+ go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
+ m.tickers = append(m.tickers, t)
+ }
+
+ // Create a push pull ticker if needed
+ if m.config.PushPullInterval > 0 {
+ go m.pushPullTrigger(stopCh)
+ }
+
+ // Create a gossip ticker if needed
+ if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
+ t := time.NewTicker(m.config.GossipInterval)
+ go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
+ m.tickers = append(m.tickers, t)
+ }
+
+ // If we made any tickers, then record the stopTick channel for
+ // later.
+ if len(m.tickers) > 0 {
+ m.stopTick = stopCh
+ }
+}
+
+// triggerFunc is used to trigger a function call each time the ticker
+// channel fires, until the stop channel is closed.
+func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
+ // Use a random stagger to avoid synchronizing
+ randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
+ select {
+ case <-time.After(randStagger):
+ case <-stop:
+ return
+ }
+ for {
+ select {
+ case <-C:
+ f()
+ case <-stop:
+ return
+ }
+ }
+}
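+
+// As a worked example of the stagger: with a 1s probe interval, each node
+// sleeps a uniform random duration in [0, 1s) before its first tick, so
+// probes across the cluster do not fire in lockstep.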
+
+// pushPullTrigger is used to periodically trigger a push/pull until
+// a stop tick arrives. We don't use triggerFunc since the push/pull
+// timer is dynamically scaled based on cluster size to avoid network
+// saturation
+func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
+ interval := m.config.PushPullInterval
+
+ // Use a random stagger to avoid synchronizing
+ randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
+ select {
+ case <-time.After(randStagger):
+ case <-stop:
+ return
+ }
+
+ // Tick using a dynamic timer
+ for {
+ tickTime := pushPullScale(interval, m.estNumNodes())
+ select {
+ case <-time.After(tickTime):
+ m.pushPull()
+ case <-stop:
+ return
+ }
+ }
+}
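+
+// For reference, pushPullScale (defined in util.go) leaves the interval
+// untouched for small clusters and stretches it logarithmically as the
+// cluster grows, so total push/pull traffic rises sub-linearly with
+// cluster size. A sketch of the idea (the threshold constant here is
+// illustrative):
+//
+//    func pushPullScale(interval time.Duration, n int) time.Duration {
+//        const threshold = 32 // sync at the configured rate below this
+//        if n <= threshold {
+//            return interval
+//        }
+//        multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(threshold)) + 1.0
+//        return time.Duration(multiplier) * interval
+//    }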
+
+// deschedule is used to stop the background maintenance. This is safe
+// to call multiple times.
+func (m *Memberlist) deschedule() {
+ m.tickerLock.Lock()
+ defer m.tickerLock.Unlock()
+
+ // If we have no tickers, then we aren't scheduled.
+ if len(m.tickers) == 0 {
+ return
+ }
+
+ // Close the stop channel so all the ticker listeners stop.
+ close(m.stopTick)
+
+ // Explicitly stop all the tickers themselves so they don't take
+ // up any more resources, and get rid of the list.
+ for _, t := range m.tickers {
+ t.Stop()
+ }
+ m.tickers = nil
+}
+
+// probe is used to perform a single round of failure detection and gossip.
+func (m *Memberlist) probe() {
+ // Track the number of indexes we've considered probing
+ numCheck := 0
+START:
+ m.nodeLock.RLock()
+
+ // Make sure we don't wrap around infinitely
+ if numCheck >= len(m.nodes) {
+ m.nodeLock.RUnlock()
+ return
+ }
+
+ // Handle the wrap around case
+ if m.probeIndex >= len(m.nodes) {
+ m.nodeLock.RUnlock()
+ m.resetNodes()
+ m.probeIndex = 0
+ numCheck++
+ goto START
+ }
+
+ // Determine if we should probe this node
+ skip := false
+ node := *m.nodes[m.probeIndex]
+ if node.Name == m.config.Name {
+ skip = true
+ } else if node.State == stateDead {
+ skip = true
+ }
+
+ // Potentially skip
+ m.nodeLock.RUnlock()
+ m.probeIndex++
+ if skip {
+ numCheck++
+ goto START
+ }
+
+ // Probe the specific node
+ m.probeNode(&node)
+}
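+
+// For example, with nodes [a, b(self), c] and probeIndex at 0: one call
+// probes a, the next skips b and probes c, and the following call wraps
+// around, reaping dead nodes and reshuffling via resetNodes before
+// starting again at index 0.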
+
+// probeNode handles a single round of failure checking on a node.
+func (m *Memberlist) probeNode(node *nodeState) {
+ defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
+
+ // We use our health awareness to scale the overall probe interval, so we
+ // slow down if we detect problems. The ticker that calls us can handle
+ // us running over the base interval, and will skip missed ticks.
+ probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
+ if probeInterval > m.config.ProbeInterval {
+ metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
+ }
+
+ // Prepare a ping message and setup an ack handler.
+ ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
+ ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
+ nackCh := make(chan struct{}, m.config.IndirectChecks+1)
+ m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
+
+ // Send a ping to the node. If this node looks like it's suspect or dead,
+ // also tack on a suspect message so that it has a chance to refute as
+ // soon as possible.
+ deadline := time.Now().Add(probeInterval)
+ addr := node.Address()
+ if node.State == stateAlive {
+ if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
+ return
+ }
+ } else {
+ var msgs [][]byte
+ if buf, err := encode(pingMsg, &ping); err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
+ return
+ } else {
+ msgs = append(msgs, buf.Bytes())
+ }
+ s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
+ if buf, err := encode(suspectMsg, &s); err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
+ return
+ } else {
+ msgs = append(msgs, buf.Bytes())
+ }
+
+ compound := makeCompoundMessage(msgs)
+ if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
+ return
+ }
+ }
+
+ // Mark the sent time here, which should be after any pre-processing and
+ // system calls to do the actual send. This probably under-reports a bit,
+ // but it's the best we can do.
+ sent := time.Now()
+
+ // Arrange for our self-awareness to get updated. At this point we've
+ // sent the ping, so any return statement means the probe succeeded
+ // which will improve our health until we get to the failure scenarios
+ // at the end of this function, which will alter this delta variable
+ // accordingly.
+ awarenessDelta := -1
+ defer func() {
+ m.awareness.ApplyDelta(awarenessDelta)
+ }()
+
+ // Wait for response or round-trip-time.
+ select {
+ case v := <-ackCh:
+ if v.Complete {
+ if m.config.Ping != nil {
+ rtt := v.Timestamp.Sub(sent)
+ m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
+ }
+ return
+ }
+
+ // As an edge case, if we get a timeout, we need to re-enqueue it
+ // here to break out of the select below.
+ if !v.Complete {
+ ackCh <- v
+ }
+ case <-time.After(m.config.ProbeTimeout):
+ // Note that we don't scale this timeout based on awareness and
+ // the health score. That's because we don't really expect waiting
+ // longer to help get UDP through. Since health does extend the
+ // probe interval it will give the TCP fallback more time, which
+ // is more active in dealing with lost packets, and it gives more
+ // time to wait for indirect acks/nacks.
+ m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
+ }
+
+ // Get some random live nodes.
+ m.nodeLock.RLock()
+ kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
+ return n.Name == m.config.Name ||
+ n.Name == node.Name ||
+ n.State != stateAlive
+ })
+ m.nodeLock.RUnlock()
+
+ // Attempt an indirect ping.
+ expectedNacks := 0
+ ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
+ for _, peer := range kNodes {
+ // We only expect nack to be sent from peers who understand
+ // version 4 of the protocol.
+ if ind.Nack = peer.PMax >= 4; ind.Nack {
+ expectedNacks++
+ }
+
+ if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
+ }
+ }
+
+ // Also make an attempt to contact the node directly over TCP. This
+ // helps prevent confused clients who get isolated from UDP traffic
+ // but can still speak TCP (which also means they can possibly report
+ // misinformation to other nodes via anti-entropy), avoiding flapping in
+ // the cluster.
+ //
+ // This is a little unusual because we will attempt a TCP ping to any
+ // member who understands version 3 of the protocol, regardless of
+ // which protocol version we are speaking. That's why we've included a
+ // config option to turn this off if desired.
+ fallbackCh := make(chan bool, 1)
+ if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
+ go func() {
+ defer close(fallbackCh)
+ didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
+ if err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
+ } else {
+ fallbackCh <- didContact
+ }
+ }()
+ } else {
+ close(fallbackCh)
+ }
+
+ // Wait for the acks or timeout. Note that we don't check the fallback
+ // channel here because we want to issue a warning below if that's the
+ // *only* way we hear back from the peer, so we have to let this time
+ // out first to allow the normal UDP-based acks to come in.
+ v := <-ackCh
+ if v.Complete {
+ return
+ }
+
+ // Finally, poll the fallback channel. The timeouts are set such that
+ // the channel will have something or be closed without having to wait
+ // any additional time here.
+ for didContact := range fallbackCh {
+ if didContact {
+ m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
+ return
+ }
+ }
+
+ // Update our self-awareness based on the results of this failed probe.
+ // If we don't have peers who will send nacks then we penalize for any
+ // failed probe as a simple health metric. If we do have peers to nack
+ // verify, then we can use that as a more sophisticated measure of self-
+ // health because we assume them to be working, and they can help us
+ // decide if the probed node was really dead or if it was something wrong
+ // with ourselves.
+ awarenessDelta = 0
+ if expectedNacks > 0 {
+ if nackCount := len(nackCh); nackCount < expectedNacks {
+ awarenessDelta += (expectedNacks - nackCount)
+ }
+ } else {
+ awarenessDelta += 1
+ }
+
+ // No acks received from target, suspect it as failed.
+ m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
+ s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
+ m.suspectNode(&s)
+}
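+
+// A worked example of the awareness delta on a failed probe: with three
+// indirect probes sent to protocol >= 4 peers (expectedNacks = 3) and
+// only one nack received, the delta is 3-1 = 2, degrading our health;
+// with no nack-capable peers, any failed probe costs a flat +1.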
+
+// Ping initiates a ping to the node with the specified name.
+func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
+ // Prepare a ping message and setup an ack handler.
+ ping := ping{SeqNo: m.nextSeqNo(), Node: node}
+ ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
+ m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)
+
+ // Send a ping to the node.
+ if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil {
+ return 0, err
+ }
+
+ // Mark the sent time here, which should be after any pre-processing and
+ // system calls to do the actual send. This probably under-reports a bit,
+ // but it's the best we can do.
+ sent := time.Now()
+
+ // Wait for response or timeout.
+ select {
+ case v := <-ackCh:
+ if v.Complete {
+ return v.Timestamp.Sub(sent), nil
+ }
+ case <-time.After(m.config.ProbeTimeout):
+ // Timeout, return an error below.
+ }
+
+ m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node)
+ return 0, NoPingResponseError{ping.Node}
+}
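+
+// A minimal usage sketch (assuming m is an initialized *Memberlist; the
+// node name and address here are hypothetical):
+//
+//    addr, _ := net.ResolveUDPAddr("udp", "10.0.0.5:7946")
+//    if rtt, err := m.Ping("node-b", addr); err == nil {
+//        fmt.Printf("round-trip to node-b: %s\n", rtt)
+//    }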
+
+// resetNodes is used when the tick wraps around. It will reap the
+// dead nodes and shuffle the node list.
+func (m *Memberlist) resetNodes() {
+ m.nodeLock.Lock()
+ defer m.nodeLock.Unlock()
+
+ // Move dead nodes to the end of the list, but respect the gossip-to-the-dead interval
+ deadIdx := moveDeadNodes(m.nodes, m.config.GossipToTheDeadTime)
+
+ // Deregister the dead nodes
+ for i := deadIdx; i < len(m.nodes); i++ {
+ delete(m.nodeMap, m.nodes[i].Name)
+ m.nodes[i] = nil
+ }
+
+ // Trim the nodes to exclude the dead nodes
+ m.nodes = m.nodes[:deadIdx]
+
+ // Update numNodes after we've trimmed the dead nodes
+ atomic.StoreUint32(&m.numNodes, uint32(deadIdx))
+
+ // Shuffle live nodes
+ shuffleNodes(m.nodes)
+}
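+
+// For context, moveDeadNodes (defined in util.go) swaps nodes that have
+// been dead longer than the given interval toward the tail of the slice
+// and returns the index where that tail begins; recently dead nodes stay
+// in the head so they keep receiving gossip. A sketch of the contract:
+//
+//    // in:  [a1, dead(old), a2, dead(recent)]
+//    // out: [a1, dead(recent), a2, dead(old)], deadIdx == 3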
+
+// gossip is invoked every GossipInterval period to broadcast our gossip
+// messages to a few random nodes.
+func (m *Memberlist) gossip() {
+ defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())
+
+ // Get some random live, suspect, or recently dead nodes
+ m.nodeLock.RLock()
+ kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
+ if n.Name == m.config.Name {
+ return true
+ }
+
+ switch n.State {
+ case stateAlive, stateSuspect:
+ return false
+
+ case stateDead:
+ return time.Since(n.StateChange) > m.config.GossipToTheDeadTime
+
+ default:
+ return true
+ }
+ })
+ m.nodeLock.RUnlock()
+
+ // Compute the bytes available
+ bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
+ if m.config.EncryptionEnabled() {
+ bytesAvail -= encryptOverhead(m.encryptionVersion())
+ }
+
+ for _, node := range kNodes {
+ // Get any pending broadcasts
+ msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
+ if len(msgs) == 0 {
+ return
+ }
+
+ addr := node.Address()
+ if len(msgs) == 1 {
+ // Send single message as is
+ if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
+ }
+ } else {
+ // Otherwise create and send a compound message
+ compound := makeCompoundMessage(msgs)
+ if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
+ }
+ }
+ }
+}
+
+// pushPull is invoked periodically to randomly perform a complete state
+// exchange. Used to ensure a high level of convergence, but is also
+// reasonably expensive as the entire state of this node is exchanged
+// with the other node.
+func (m *Memberlist) pushPull() {
+ // Get a random live node
+ m.nodeLock.RLock()
+ nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
+ return n.Name == m.config.Name ||
+ n.State != stateAlive
+ })
+ m.nodeLock.RUnlock()
+
+ // If no nodes, bail
+ if len(nodes) == 0 {
+ return
+ }
+ node := nodes[0]
+
+ // Attempt a push pull
+ if err := m.pushPullNode(node.Address(), false); err != nil {
+ m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
+ }
+}
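+
+// kRandomNodes (defined in util.go) samples up to k distinct nodes at
+// random, skipping any node for which the exclude callback returns true.
+// A usage sketch with a hypothetical filter:
+//
+//    peers := kRandomNodes(3, m.nodes, func(n *nodeState) bool {
+//        return n.State != stateAlive // keep only live peers
+//    })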
+
+// pushPullNode does a complete state exchange with a specific node.
+func (m *Memberlist) pushPullNode(addr string, join bool) error {
+ defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
+
+ // Attempt to send and receive with the node
+ remote, userState, err := m.sendAndReceiveState(addr, join)
+ if err != nil {
+ return err
+ }
+
+ return m.mergeRemoteState(join, remote, userState)
+}
+
+// verifyProtocol verifies that all the remote nodes can speak with our
+// nodes and vice versa on both the core protocol as well as the
+// delegate protocol level.
+//
+// The verification works by finding the highest minimum and the lowest
+// maximum understood protocol and delegate versions. In other words, it
+// computes the intersection of the protocol and delegate version ranges
+// for the entire cluster.
+//
+// After this, it goes through the entire cluster (local and remote) and
+// verifies that everyone's speaking protocol versions satisfy this range.
+// If this passes, it means that every node can understand each other.
+func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
+ m.nodeLock.RLock()
+ defer m.nodeLock.RUnlock()
+
+ // Maximum minimum understood and minimum maximum understood for both
+ // the protocol and delegate versions. We use this to verify everyone
+ // can be understood.
+ var maxpmin, minpmax uint8
+ var maxdmin, mindmax uint8
+ minpmax = math.MaxUint8
+ mindmax = math.MaxUint8
+
+ for _, rn := range remote {
+ // If the node isn't alive, then skip it
+ if rn.State != stateAlive {
+ continue
+ }
+
+ // Skip nodes that don't have versions set, it just means
+ // their version is zero.
+ if len(rn.Vsn) == 0 {
+ continue
+ }
+
+ if rn.Vsn[0] > maxpmin {
+ maxpmin = rn.Vsn[0]
+ }
+
+ if rn.Vsn[1] < minpmax {
+ minpmax = rn.Vsn[1]
+ }
+
+ if rn.Vsn[3] > maxdmin {
+ maxdmin = rn.Vsn[3]
+ }
+
+ if rn.Vsn[4] < mindmax {
+ mindmax = rn.Vsn[4]
+ }
+ }
+
+ for _, n := range m.nodes {
+ // Ignore non-alive nodes
+ if n.State != stateAlive {
+ continue
+ }
+
+ if n.PMin > maxpmin {
+ maxpmin = n.PMin
+ }
+
+ if n.PMax < minpmax {
+ minpmax = n.PMax
+ }
+
+ if n.DMin > maxdmin {
+ maxdmin = n.DMin
+ }
+
+ if n.DMax < mindmax {
+ mindmax = n.DMax
+ }
+ }
+
+ // Now that we definitively know the minimum and maximum understood
+ // version that satisfies the whole cluster, we verify that every
+ // node in the cluster satisfies this.
+ for _, n := range remote {
+ var nPCur, nDCur uint8
+ if len(n.Vsn) > 0 {
+ nPCur = n.Vsn[2]
+ nDCur = n.Vsn[5]
+ }
+
+ if nPCur < maxpmin || nPCur > minpmax {
+ return fmt.Errorf(
+ "Node '%s' protocol version (%d) is incompatible: [%d, %d]",
+ n.Name, nPCur, maxpmin, minpmax)
+ }
+
+ if nDCur < maxdmin || nDCur > mindmax {
+ return fmt.Errorf(
+ "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
+ n.Name, nDCur, maxdmin, mindmax)
+ }
+ }
+
+ for _, n := range m.nodes {
+ nPCur := n.PCur
+ nDCur := n.DCur
+
+ if nPCur < maxpmin || nPCur > minpmax {
+ return fmt.Errorf(
+ "Node '%s' protocol version (%d) is incompatible: [%d, %d]",
+ n.Name, nPCur, maxpmin, minpmax)
+ }
+
+ if nDCur < maxdmin || nDCur > mindmax {
+ return fmt.Errorf(
+ "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
+ n.Name, nDCur, maxdmin, mindmax)
+ }
+ }
+
+ return nil
+}
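+
+// A worked example: if node A understands protocols [1,3] and node B
+// understands [2,4], then maxpmin = 2 and minpmax = 3, so every node's
+// current protocol version must fall in [2,3]. A node currently speaking
+// version 4 would fail verification even though 4 is inside B's range.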
+
+// nextSeqNo returns a usable sequence number in a thread safe way
+func (m *Memberlist) nextSeqNo() uint32 {
+ return atomic.AddUint32(&m.sequenceNum, 1)
+}
+
+// nextIncarnation returns the next incarnation number in a thread safe way
+func (m *Memberlist) nextIncarnation() uint32 {
+ return atomic.AddUint32(&m.incarnation, 1)
+}
+
+// skipIncarnation adds the positive offset to the incarnation number.
+func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
+ return atomic.AddUint32(&m.incarnation, offset)
+}
+
+// estNumNodes is used to get the current estimate of the number of nodes
+func (m *Memberlist) estNumNodes() int {
+ return int(atomic.LoadUint32(&m.numNodes))
+}
+
+type ackMessage struct {
+ Complete bool
+ Payload []byte
+ Timestamp time.Time
+}
+
+// setProbeChannels is used to attach the ackCh to receive a message when an ack
+// with a given sequence number is received. The `complete` field of the message
+// will be false on timeout. Any nack messages will cause an empty struct to be
+// passed to the nackCh, which can be nil if not needed.
+func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
+ // Create handler functions for acks and nacks
+ ackFn := func(payload []byte, timestamp time.Time) {
+ select {
+ case ackCh <- ackMessage{true, payload, timestamp}:
+ default:
+ }
+ }
+ nackFn := func() {
+ select {
+ case nackCh <- struct{}{}:
+ default:
+ }
+ }
+
+ // Add the handlers
+ ah := &ackHandler{ackFn, nackFn, nil}
+ m.ackLock.Lock()
+ m.ackHandlers[seqNo] = ah
+ m.ackLock.Unlock()
+
+ // Set up a reaping routine
+ ah.timer = time.AfterFunc(timeout, func() {
+ m.ackLock.Lock()
+ delete(m.ackHandlers, seqNo)
+ m.ackLock.Unlock()
+ select {
+ case ackCh <- ackMessage{false, nil, time.Now()}:
+ default:
+ }
+ })
+}
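+
+// A minimal usage sketch (the timeout value is hypothetical): after
+// registration, either the ack handler or the timeout reaper delivers on
+// ackCh, with Complete marking whether a real ack beat the timeout:
+//
+//    seq := m.nextSeqNo()
+//    ackCh := make(chan ackMessage, 1)
+//    m.setProbeChannels(seq, ackCh, nil, 500*time.Millisecond)
+//    v := <-ackCh // blocks until an ack arrives or the timeout fires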
+
+// setAckHandler is used to attach a handler to be invoked when an ack with a
+// given sequence number is received. If a timeout is reached, the handler is
+// deleted. This is used for indirect pings so does not configure a function
+// for nacks.
+func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
+ // Add the handler
+ ah := &ackHandler{ackFn, nil, nil}
+ m.ackLock.Lock()
+ m.ackHandlers[seqNo] = ah
+ m.ackLock.Unlock()
+
+ // Set up a reaping routine
+ ah.timer = time.AfterFunc(timeout, func() {
+ m.ackLock.Lock()
+ delete(m.ackHandlers, seqNo)
+ m.ackLock.Unlock()
+ })
+}
+
+// invokeAckHandler invokes the ack handler if one is registered, and
+// reaps the handler immediately.
+func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
+ m.ackLock.Lock()
+ ah, ok := m.ackHandlers[ack.SeqNo]
+ delete(m.ackHandlers, ack.SeqNo)
+ m.ackLock.Unlock()
+ if !ok {
+ return
+ }
+ ah.timer.Stop()
+ ah.ackFn(ack.Payload, timestamp)
+}
+
+// invokeNackHandler invokes the nack handler if one is registered.
+func (m *Memberlist) invokeNackHandler(nack nackResp) {
+ m.ackLock.Lock()
+ ah, ok := m.ackHandlers[nack.SeqNo]
+ m.ackLock.Unlock()
+ if !ok || ah.nackFn == nil {
+ return
+ }
+ ah.nackFn()
+}
+
+// refute gossips an alive message in response to incoming information that we
+// are suspect or dead. It will make sure the incarnation number beats the given
+// accusedInc value, or you can supply 0 to just get the next incarnation number.
+// This alters the node state that's passed in so this MUST be called while the
+// nodeLock is held.
+func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
+ // Make sure the incarnation number beats the accusation.
+ inc := m.nextIncarnation()
+ if accusedInc >= inc {
+ inc = m.skipIncarnation(accusedInc - inc + 1)
+ }
+ me.Incarnation = inc
+
+ // Decrease our health because we are being asked to refute a problem.
+ m.awareness.ApplyDelta(1)
+
+ // Format and broadcast an alive message.
+ a := alive{
+ Incarnation: inc,
+ Node: me.Name,
+ Addr: me.Addr,
+ Port: me.Port,
+ Meta: me.Meta,
+ Vsn: []uint8{
+ me.PMin, me.PMax, me.PCur,
+ me.DMin, me.DMax, me.DCur,
+ },
+ }
+ m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a)
+}
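+
+// A worked example of the incarnation bump: if our incarnation is 5 and
+// an accusation arrives with accusedInc = 9, nextIncarnation yields 6;
+// since 9 >= 6 we skip ahead by 9-6+1 = 4, landing on incarnation 10,
+// which strictly beats the accusation.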
+
+// aliveNode is invoked by the network layer when we get a message about a
+// live node.
+func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
+ m.nodeLock.Lock()
+ defer m.nodeLock.Unlock()
+ state, ok := m.nodeMap[a.Node]
+
+ // It is possible that during a Leave(), there is already an aliveMsg
+ // in-queue to be processed but blocked by the locks above. If we let
+ // that aliveMsg process, it'll cause us to re-join the cluster. This
+ // ensures that we don't.
+ if m.leave && a.Node == m.config.Name {
+ return
+ }
+
+ // Invoke the Alive delegate if any. This can be used to filter out
+ // alive messages based on custom logic. For example, using a cluster name.
+ // Using a merge delegate is not enough, as it is possible for passive
+ // cluster merging to still occur.
+ if m.config.Alive != nil {
+ node := &Node{
+ Name: a.Node,
+ Addr: a.Addr,
+ Port: a.Port,
+ Meta: a.Meta,
+ PMin: a.Vsn[0],
+ PMax: a.Vsn[1],
+ PCur: a.Vsn[2],
+ DMin: a.Vsn[3],
+ DMax: a.Vsn[4],
+ DCur: a.Vsn[5],
+ }
+ if err := m.config.Alive.NotifyAlive(node); err != nil {
+ m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
+ a.Node, err)
+ return
+ }
+ }
+
+ // Check if we've never seen this node before, and if not, then
+ // store this node in our node map.
+ if !ok {
+ state = &nodeState{
+ Node: Node{
+ Name: a.Node,
+ Addr: a.Addr,
+ Port: a.Port,
+ Meta: a.Meta,
+ },
+ State: stateDead,
+ }
+
+ // Add to map
+ m.nodeMap[a.Node] = state
+
+ // Get a random offset. This is important to ensure
+ // the failure detection bound is low on average. If all
+ // nodes did an append, failure detection bound would be
+ // very high.
+ n := len(m.nodes)
+ offset := randomOffset(n)
+
+ // Add at the end and swap with the node at the offset
+ m.nodes = append(m.nodes, state)
+ m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
+
+ // Update numNodes after we've added a new node
+ atomic.AddUint32(&m.numNodes, 1)
+ }
+
+ // Check if this address is different from the one we have recorded
+ if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
+ m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d",
+ state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
+
+ // Inform the conflict delegate if provided
+ if m.config.Conflict != nil {
+ other := Node{
+ Name: a.Node,
+ Addr: a.Addr,
+ Port: a.Port,
+ Meta: a.Meta,
+ }
+ m.config.Conflict.NotifyConflict(&state.Node, &other)
+ }
+ return
+ }
+
+ // Bail if the incarnation number is older, and this is not about us
+ isLocalNode := state.Name == m.config.Name
+ if a.Incarnation <= state.Incarnation && !isLocalNode {
+ return
+ }
+
+ // Bail if strictly less and this is about us
+ if a.Incarnation < state.Incarnation && isLocalNode {
+ return
+ }
+
+ // Clear out any suspicion timer that may be in effect.
+ delete(m.nodeTimers, a.Node)
+
+ // Store the old state and meta data
+ oldState := state.State
+ oldMeta := state.Meta
+
+ // If this is us we need to refute, otherwise re-broadcast
+ if !bootstrap && isLocalNode {
+ // Compute the version vector
+ versions := []uint8{
+ state.PMin, state.PMax, state.PCur,
+ state.DMin, state.DMax, state.DCur,
+ }
+
+ // If the Incarnation is the same, we need special handling, since it
+ // is possible for the following situation to happen:
+ // 1) Start with configuration C, join cluster
+ // 2) Hard fail / Kill / Shutdown
+ // 3) Restart with configuration C', join cluster
+ //
+ // In this case, other nodes and the local node see the same incarnation,
+ // but the values may not be the same. For this reason, we always
+ // need to do an equality check for this Incarnation. In most cases,
+ // we just ignore, but we may need to refute.
+ //
+ if a.Incarnation == state.Incarnation &&
+ bytes.Equal(a.Meta, state.Meta) &&
+ bytes.Equal(a.Vsn, versions) {
+ return
+ }
+
+ m.refute(state, a.Incarnation)
+ m.logger.Printf("[WARN] memberlist: Refuting an alive message")
+ } else {
+ m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
+
+ // Update protocol versions if they arrived
+ if len(a.Vsn) > 0 {
+ state.PMin = a.Vsn[0]
+ state.PMax = a.Vsn[1]
+ state.PCur = a.Vsn[2]
+ state.DMin = a.Vsn[3]
+ state.DMax = a.Vsn[4]
+ state.DCur = a.Vsn[5]
+ }
+
+ // Update the state and incarnation number
+ state.Incarnation = a.Incarnation
+ state.Meta = a.Meta
+ if state.State != stateAlive {
+ state.State = stateAlive
+ state.StateChange = time.Now()
+ }
+ }
+
+ // Update metrics
+ metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)
+
+ // Notify the delegate of any relevant updates
+ if m.config.Events != nil {
+ if oldState == stateDead {
+ // if Dead -> Alive, notify of join
+ m.config.Events.NotifyJoin(&state.Node)
+
+ } else if !bytes.Equal(oldMeta, state.Meta) {
+ // if Meta changed, trigger an update notification
+ m.config.Events.NotifyUpdate(&state.Node)
+ }
+ }
+}
+
+// suspectNode is invoked by the network layer when we get a message
+// about a suspect node
+func (m *Memberlist) suspectNode(s *suspect) {
+ m.nodeLock.Lock()
+ defer m.nodeLock.Unlock()
+ state, ok := m.nodeMap[s.Node]
+
+ // If we've never heard about this node before, ignore it
+ if !ok {
+ return
+ }
+
+ // Ignore old incarnation numbers
+ if s.Incarnation < state.Incarnation {
+ return
+ }
+
+ // See if there's a suspicion timer we can confirm. If the info is new
+ // to us we will go ahead and re-gossip it. This allows for multiple
+ // independent confirmations to flow even when a node probes a node
+ // that's already suspect.
+ if timer, ok := m.nodeTimers[s.Node]; ok {
+ if timer.Confirm(s.From) {
+ m.encodeAndBroadcast(s.Node, suspectMsg, s)
+ }
+ return
+ }
+
+ // Ignore non-alive nodes
+ if state.State != stateAlive {
+ return
+ }
+
+ // If this is us we need to refute, otherwise re-broadcast
+ if state.Name == m.config.Name {
+ m.refute(state, s.Incarnation)
+ m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
+ return // Do not mark ourselves suspect
+ }
+ m.encodeAndBroadcast(s.Node, suspectMsg, s)
+
+ // Update metrics
+ metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)
+
+ // Update the state
+ state.Incarnation = s.Incarnation
+ state.State = stateSuspect
+ changeTime := time.Now()
+ state.StateChange = changeTime
+
+ // Setup a suspicion timer. Given that we don't have any known phase
+ // relationship with our peers, we set up k such that we hit the nominal
+ // timeout two probe intervals short of what we expect given the suspicion
+ // multiplier.
+ k := m.config.SuspicionMult - 2
+
+ // If there aren't enough nodes to give the expected confirmations, just
+ // set k to 0 to say that we don't expect any. Note we subtract 2 from n
+ // here to take out ourselves and the node being probed.
+ n := m.estNumNodes()
+ if n-2 < k {
+ k = 0
+ }
+
+ // Compute the timeouts based on the size of the cluster.
+ min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
+ max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
+ fn := func(numConfirmations int) {
+ m.nodeLock.Lock()
+ state, ok := m.nodeMap[s.Node]
+ timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
+ m.nodeLock.Unlock()
+
+ if timeout {
+ if k > 0 && numConfirmations < k {
+ metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
+ }
+
+ m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
+ state.Name, numConfirmations)
+ d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
+ m.deadNode(&d)
+ }
+ }
+ m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
+}
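+
+// For reference, suspicionTimeout (defined in util.go) scales the probe
+// interval by the suspicion multiplier and by log10 of the cluster size,
+// giving larger clusters proportionally more time for confirmations to
+// arrive. Roughly (a sketch):
+//
+//    func suspicionTimeout(suspicionMult, n int, interval time.Duration) time.Duration {
+//        nodeScale := math.Max(1.0, math.Log10(math.Max(1.0, float64(n))))
+//        return time.Duration(suspicionMult) * time.Duration(nodeScale*1000) * interval / 1000
+//    }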
+
+// deadNode is invoked by the network layer when we get a message
+// about a dead node
+func (m *Memberlist) deadNode(d *dead) {
+ m.nodeLock.Lock()
+ defer m.nodeLock.Unlock()
+ state, ok := m.nodeMap[d.Node]
+
+ // If we've never heard about this node before, ignore it
+ if !ok {
+ return
+ }
+
+ // Ignore old incarnation numbers
+ if d.Incarnation < state.Incarnation {
+ return
+ }
+
+ // Clear out any suspicion timer that may be in effect.
+ delete(m.nodeTimers, d.Node)
+
+ // Ignore if node is already dead
+ if state.State == stateDead {
+ return
+ }
+
+ // Check if this is us
+ if state.Name == m.config.Name {
+ // If we are not leaving we need to refute
+ if !m.leave {
+ m.refute(state, d.Incarnation)
+ m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
+ return // Do not mark ourselves dead
+ }
+
+ // If we are leaving, we broadcast and wait
+ m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
+ } else {
+ m.encodeAndBroadcast(d.Node, deadMsg, d)
+ }
+
+ // Update metrics
+ metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)
+
+ // Update the state
+ state.Incarnation = d.Incarnation
+ state.State = stateDead
+ state.StateChange = time.Now()
+
+ // Notify of death
+ if m.config.Events != nil {
+ m.config.Events.NotifyLeave(&state.Node)
+ }
+}
+
+// mergeState is invoked by the network layer when we get a Push/Pull
+// state transfer
+func (m *Memberlist) mergeState(remote []pushNodeState) {
+ for _, r := range remote {
+ switch r.State {
+ case stateAlive:
+ a := alive{
+ Incarnation: r.Incarnation,
+ Node: r.Name,
+ Addr: r.Addr,
+ Port: r.Port,
+ Meta: r.Meta,
+ Vsn: r.Vsn,
+ }
+ m.aliveNode(&a, nil, false)
+
+ case stateDead:
+ // If the remote node believes a node is dead, we prefer to
+ // suspect that node instead of declaring it dead instantly
+ fallthrough
+ case stateSuspect:
+ s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
+ m.suspectNode(&s)
+ }
+ }
+}