Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/memberlist.go')
-rw-r--r--  vendor/github.com/hashicorp/memberlist/memberlist.go  625
1 files changed, 625 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go
new file mode 100644
index 000000000..e4b0d7347
--- /dev/null
+++ b/vendor/github.com/hashicorp/memberlist/memberlist.go
@@ -0,0 +1,625 @@
+/*
+memberlist is a library that manages cluster
+membership and member failure detection using a gossip-based protocol.
+
+The use cases for such a library are far-reaching: all distributed systems
+require membership, and memberlist is a reusable solution for managing
+cluster membership and node failure detection.
+
+memberlist is eventually consistent but converges quickly on average.
+The speed at which it converges can be heavily tuned via various knobs
+on the protocol. Node failures are detected and network partitions are partially
+tolerated by attempting to communicate with potentially dead nodes through
+multiple routes.
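+
+A minimal usage sketch (the seed address below is illustrative; see Config,
+DefaultLANConfig and Join for the real options):
+
+    list, err := memberlist.Create(memberlist.DefaultLANConfig())
+    if err != nil {
+        // handle the error
+    }
+    if _, err := list.Join([]string{"10.0.0.10"}); err != nil {
+        // no seed node could be contacted
+    }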
+*/
+package memberlist
+
+import (
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/hashicorp/go-multierror"
+ sockaddr "github.com/hashicorp/go-sockaddr"
+ "github.com/miekg/dns"
+)
+
+type Memberlist struct {
+ sequenceNum uint32 // Local sequence number
+ incarnation uint32 // Local incarnation number
+ numNodes uint32 // Number of known nodes (estimate)
+
+ config *Config
+ shutdown bool
+ shutdownCh chan struct{}
+ leave bool
+ leaveBroadcast chan struct{}
+
+ transport Transport
+ handoff chan msgHandoff
+
+ nodeLock sync.RWMutex
+ nodes []*nodeState // Known nodes
+ nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState
+ nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer
+ awareness *awareness
+
+ tickerLock sync.Mutex
+ tickers []*time.Ticker
+ stopTick chan struct{}
+ probeIndex int
+
+ ackLock sync.Mutex
+ ackHandlers map[uint32]*ackHandler
+
+ broadcasts *TransmitLimitedQueue
+
+ logger *log.Logger
+}
+
+// newMemberlist creates the network listeners.
+// It does not schedule execution of background maintenance.
+func newMemberlist(conf *Config) (*Memberlist, error) {
+ if conf.ProtocolVersion < ProtocolVersionMin {
+ return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
+ conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
+ } else if conf.ProtocolVersion > ProtocolVersionMax {
+ return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
+ conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
+ }
+
+ if len(conf.SecretKey) > 0 {
+ if conf.Keyring == nil {
+ keyring, err := NewKeyring(nil, conf.SecretKey)
+ if err != nil {
+ return nil, err
+ }
+ conf.Keyring = keyring
+ } else {
+ if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
+ return nil, err
+ }
+ if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ if conf.LogOutput != nil && conf.Logger != nil {
+ return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
+ }
+
+ logDest := conf.LogOutput
+ if logDest == nil {
+ logDest = os.Stderr
+ }
+
+ logger := conf.Logger
+ if logger == nil {
+ logger = log.New(logDest, "", log.LstdFlags)
+ }
+
+ // Set up a network transport by default if a custom one wasn't given
+ // by the config.
+ transport := conf.Transport
+ if transport == nil {
+ nc := &NetTransportConfig{
+ BindAddrs: []string{conf.BindAddr},
+ BindPort: conf.BindPort,
+ Logger: logger,
+ }
+ nt, err := NewNetTransport(nc)
+ if err != nil {
+ return nil, fmt.Errorf("Could not set up network transport: %v", err)
+ }
+
+ if conf.BindPort == 0 {
+ port := nt.GetAutoBindPort()
+ conf.BindPort = port
+ logger.Printf("[DEBUG] Using dynamic bind port %d", port)
+ }
+ transport = nt
+ }
+
+ m := &Memberlist{
+ config: conf,
+ shutdownCh: make(chan struct{}),
+ leaveBroadcast: make(chan struct{}, 1),
+ transport: transport,
+ handoff: make(chan msgHandoff, conf.HandoffQueueDepth),
+ nodeMap: make(map[string]*nodeState),
+ nodeTimers: make(map[string]*suspicion),
+ awareness: newAwareness(conf.AwarenessMaxMultiplier),
+ ackHandlers: make(map[uint32]*ackHandler),
+ broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
+ logger: logger,
+ }
+ m.broadcasts.NumNodes = func() int {
+ return m.estNumNodes()
+ }
+ go m.streamListen()
+ go m.packetListen()
+ go m.packetHandler()
+ return m, nil
+}
+
+// Create will create a new Memberlist using the given configuration.
+// This will not connect to any other node (see Join) yet, but will start
+// all the listeners to allow other nodes to join this memberlist.
+// After creating a Memberlist, the given configuration should no longer
+// be modified by the user.
+func Create(conf *Config) (*Memberlist, error) {
+ m, err := newMemberlist(conf)
+ if err != nil {
+ return nil, err
+ }
+ if err := m.setAlive(); err != nil {
+ m.Shutdown()
+ return nil, err
+ }
+ m.schedule()
+ return m, nil
+}
+
+// Join is used to take an existing Memberlist and attempt to join a cluster
+// by contacting all the given hosts and performing a state sync. Initially,
+// the Memberlist only contains our own state, so doing this will cause
+// remote nodes to become aware of the existence of this node, effectively
+// joining the cluster.
+//
+// This returns the number of hosts successfully contacted and an error if
+// none could be reached. If an error is returned, the node did not successfully
+// join the cluster.
+func (m *Memberlist) Join(existing []string) (int, error) {
+ numSuccess := 0
+ var errs error
+ for _, exist := range existing {
+ addrs, err := m.resolveAddr(exist)
+ if err != nil {
+ err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
+ errs = multierror.Append(errs, err)
+ m.logger.Printf("[WARN] memberlist: %v", err)
+ continue
+ }
+
+ for _, addr := range addrs {
+ hp := joinHostPort(addr.ip.String(), addr.port)
+ if err := m.pushPullNode(hp, true); err != nil {
+ err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
+ errs = multierror.Append(errs, err)
+ m.logger.Printf("[DEBUG] memberlist: %v", err)
+ continue
+ }
+ numSuccess++
+ }
+
+ }
+ if numSuccess > 0 {
+ errs = nil
+ }
+ return numSuccess, errs
+}
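+
+// exampleJoinCluster is an illustrative sketch, not part of the upstream
+// library: it joins via a list of seed addresses and treats the cluster as
+// reachable as long as at least one seed was contacted.
+func exampleJoinCluster(m *Memberlist, seeds []string) error {
+ // Join returns an error only when none of the seeds could be reached.
+ n, err := m.Join(seeds)
+ if err != nil {
+ return fmt.Errorf("unable to contact any of %d seeds: %v", len(seeds), err)
+ }
+ m.logger.Printf("[DEBUG] memberlist: joined cluster via %d seed(s)", n)
+ return nil
+}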
+
+// ipPort holds information about a node we want to try to join.
+type ipPort struct {
+ ip net.IP
+ port uint16
+}
+
+// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
+// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
+// the response has the truncate bit set, which isn't common on DNS servers like
+// Consul's. By doing the TCP lookup directly, we get the best chance for the
+// largest list of hosts to join. Since joins are relatively rare events, it's ok
+// to do this rather expensive operation.
+func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) {
+ // Don't attempt any TCP lookups against non-fully qualified domain
+ // names, since those will likely come from the resolv.conf file.
+ if !strings.Contains(host, ".") {
+ return nil, nil
+ }
+
+ // Make sure the domain name is terminated with a dot (we know there's
+ // at least one character at this point).
+ dn := host
+ if dn[len(dn)-1] != '.' {
+ dn = dn + "."
+ }
+
+ // See if we can find a server to try.
+ cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
+ if err != nil {
+ return nil, err
+ }
+ if len(cc.Servers) > 0 {
+ // We support host:port in the DNS config, but need to add the
+ // default port if one is not supplied.
+ server := cc.Servers[0]
+ if !hasPort(server) {
+ server = net.JoinHostPort(server, cc.Port)
+ }
+
+ // Do the lookup.
+ c := new(dns.Client)
+ c.Net = "tcp"
+ msg := new(dns.Msg)
+ msg.SetQuestion(dn, dns.TypeANY)
+ in, _, err := c.Exchange(msg, server)
+ if err != nil {
+ return nil, err
+ }
+
+ // Handle any IPs we get back that we can attempt to join.
+ var ips []ipPort
+ for _, r := range in.Answer {
+ switch rr := r.(type) {
+ case (*dns.A):
+ ips = append(ips, ipPort{rr.A, defaultPort})
+ case (*dns.AAAA):
+ ips = append(ips, ipPort{rr.AAAA, defaultPort})
+ case (*dns.CNAME):
+ m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
+ }
+ }
+ return ips, nil
+ }
+
+ return nil, nil
+}
+
+// resolveAddr is used to resolve a host string into a list of IP and port
+// pairs to try to join. If no port is given, the configured bind port is
+// used as the default.
+func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
+ // Normalize the incoming string to host:port so we can apply Go's
+ // parser to it.
+ port := uint16(0)
+ if !hasPort(hostStr) {
+ hostStr += ":" + strconv.Itoa(m.config.BindPort)
+ }
+ host, sport, err := net.SplitHostPort(hostStr)
+ if err != nil {
+ return nil, err
+ }
+
+ // This will capture the supplied port, or the default one added above.
+ lport, err := strconv.ParseUint(sport, 10, 16)
+ if err != nil {
+ return nil, err
+ }
+ port = uint16(lport)
+
+ // If it looks like an IP address we are done. The SplitHostPort() above
+ // will make sure the host part is in good shape for parsing, even for
+ // IPv6 addresses.
+ if ip := net.ParseIP(host); ip != nil {
+ return []ipPort{{ip, port}}, nil
+ }
+
+ // First try TCP so we have the best chance for the largest list of
+ // hosts to join. If this fails it's not fatal since this isn't a standard
+ // way to query DNS, and we have a fallback below.
+ ips, err := m.tcpLookupIP(host, port)
+ if err != nil {
+ m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
+ }
+ if len(ips) > 0 {
+ return ips, nil
+ }
+
+ // If TCP didn't yield anything then use the normal Go resolver which
+ // will try UDP, then might possibly try TCP again if the UDP response
+ // indicates it was truncated.
+ ans, err := net.LookupIP(host)
+ if err != nil {
+ return nil, err
+ }
+ ips = make([]ipPort, 0, len(ans))
+ for _, ip := range ans {
+ ips = append(ips, ipPort{ip, port})
+ }
+ return ips, nil
+}
+
+// setAlive is used to mark this node as being alive. This is the same
+// as if we received an alive notification on our own network channel for
+// ourselves.
+func (m *Memberlist) setAlive() error {
+ // Get the final advertise address from the transport, which may need
+ // to see which address we bound to.
+ addr, port, err := m.transport.FinalAdvertiseAddr(
+ m.config.AdvertiseAddr, m.config.AdvertisePort)
+ if err != nil {
+ return fmt.Errorf("Failed to get final advertise address: %v", err)
+ }
+
+ // Check if this is a public address without encryption
+ ipAddr, err := sockaddr.NewIPAddr(addr.String())
+ if err != nil {
+ return fmt.Errorf("Failed to parse interface addresses: %v", err)
+ }
+ ifAddrs := []sockaddr.IfAddr{
+ sockaddr.IfAddr{
+ SockAddr: ipAddr,
+ },
+ }
+ _, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
+ if err != nil {
+ return fmt.Errorf("Failed to check interface addresses: %v", err)
+ }
+ if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
+ m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
+ }
+
+ // Set any metadata from the delegate.
+ var meta []byte
+ if m.config.Delegate != nil {
+ meta = m.config.Delegate.NodeMeta(MetaMaxSize)
+ if len(meta) > MetaMaxSize {
+ panic("Node meta data provided is longer than the limit")
+ }
+ }
+
+ a := alive{
+ Incarnation: m.nextIncarnation(),
+ Node: m.config.Name,
+ Addr: addr,
+ Port: uint16(port),
+ Meta: meta,
+ Vsn: []uint8{
+ ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
+ m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
+ m.config.DelegateProtocolVersion,
+ },
+ }
+ m.aliveNode(&a, nil, true)
+ return nil
+}
+
+// LocalNode is used to return the local Node
+func (m *Memberlist) LocalNode() *Node {
+ m.nodeLock.RLock()
+ defer m.nodeLock.RUnlock()
+ state := m.nodeMap[m.config.Name]
+ return &state.Node
+}
+
+// UpdateNode is used to trigger re-advertising the local node. This is
+// primarily used with a Delegate to support dynamic updates to the local
+// metadata. This will block until the update message is successfully
+// broadcast to a member of the cluster, if any exist, or until the
+// specified timeout is reached.
+func (m *Memberlist) UpdateNode(timeout time.Duration) error {
+ // Get the node meta data
+ var meta []byte
+ if m.config.Delegate != nil {
+ meta = m.config.Delegate.NodeMeta(MetaMaxSize)
+ if len(meta) > MetaMaxSize {
+ panic("Node meta data provided is longer than the limit")
+ }
+ }
+
+ // Get the existing node
+ m.nodeLock.RLock()
+ state := m.nodeMap[m.config.Name]
+ m.nodeLock.RUnlock()
+
+ // Format a new alive message
+ a := alive{
+ Incarnation: m.nextIncarnation(),
+ Node: m.config.Name,
+ Addr: state.Addr,
+ Port: state.Port,
+ Meta: meta,
+ Vsn: []uint8{
+ ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
+ m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
+ m.config.DelegateProtocolVersion,
+ },
+ }
+ notifyCh := make(chan struct{})
+ m.aliveNode(&a, notifyCh, true)
+
+ // Wait for the broadcast or a timeout
+ if m.anyAlive() {
+ var timeoutCh <-chan time.Time
+ if timeout > 0 {
+ timeoutCh = time.After(timeout)
+ }
+ select {
+ case <-notifyCh:
+ case <-timeoutCh:
+ return fmt.Errorf("timeout waiting for update broadcast")
+ }
+ }
+ return nil
+}
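+
+// The two declarations below are an illustrative sketch, not part of the
+// upstream library. They assume the Delegate interface defined in
+// delegate.go of this package, and that exampleStaticDelegate was installed
+// as Config.Delegate when the Memberlist was created: the delegate only
+// exposes a mutable metadata blob, and exampleUpdateMeta re-advertises the
+// local node after that blob changes.
+type exampleStaticDelegate struct {
+ meta []byte
+}
+
+func (d *exampleStaticDelegate) NodeMeta(limit int) []byte {
+ if len(d.meta) > limit {
+ return d.meta[:limit]
+ }
+ return d.meta
+}
+func (d *exampleStaticDelegate) NotifyMsg([]byte) {}
+func (d *exampleStaticDelegate) GetBroadcasts(overhead, limit int) [][]byte { return nil }
+func (d *exampleStaticDelegate) LocalState(join bool) []byte { return nil }
+func (d *exampleStaticDelegate) MergeRemoteState(buf []byte, join bool) {}
+
+func exampleUpdateMeta(m *Memberlist, d *exampleStaticDelegate, meta []byte, timeout time.Duration) error {
+ d.meta = meta
+ return m.UpdateNode(timeout)
+}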
+
+// SendTo is deprecated in favor of SendBestEffort, which requires a node to
+// target.
+func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
+ // Encode as a user message
+ buf := make([]byte, 1, len(msg)+1)
+ buf[0] = byte(userMsg)
+ buf = append(buf, msg...)
+
+ // Send the message
+ return m.rawSendMsgPacket(to.String(), nil, buf)
+}
+
+// SendToUDP is deprecated in favor of SendBestEffort.
+func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
+ return m.SendBestEffort(to, msg)
+}
+
+// SendToTCP is deprecated in favor of SendReliable.
+func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
+ return m.SendReliable(to, msg)
+}
+
+// SendBestEffort uses the unreliable packet-oriented interface of the transport
+// to target a user message at the given node (this does not use the gossip
+// mechanism). The maximum size of the message depends on the configured
+// UDPBufferSize for this memberlist instance.
+func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
+ // Encode as a user message
+ buf := make([]byte, 1, len(msg)+1)
+ buf[0] = byte(userMsg)
+ buf = append(buf, msg...)
+
+ // Send the message
+ return m.rawSendMsgPacket(to.Address(), to, buf)
+}
+
+// SendReliable uses the reliable stream-oriented interface of the transport to
+// target a user message at the given node (this does not use the gossip
+// mechanism). Delivery is guaranteed if no error is returned, and there is no
+// limit on the size of the message.
+func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
+ return m.sendUserMsg(to.Address(), msg)
+}
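+
+// exampleSendUserMsg is an illustrative sketch, not part of the upstream
+// library: it looks up a live member by name and sends it a user message,
+// using the reliable stream path for large payloads and the best-effort
+// packet path otherwise. The 1024-byte cutoff is arbitrary.
+func exampleSendUserMsg(m *Memberlist, name string, payload []byte) error {
+ var target *Node
+ for _, n := range m.Members() {
+ if n.Name == name {
+ target = n
+ break
+ }
+ }
+ if target == nil {
+ return fmt.Errorf("no live member named %q", name)
+ }
+ if len(payload) > 1024 {
+ return m.SendReliable(target, payload)
+ }
+ return m.SendBestEffort(target, payload)
+}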
+
+// Members returns a list of all known live nodes. The node structures
+// returned must not be modified. If you wish to modify a Node, make a
+// copy first.
+func (m *Memberlist) Members() []*Node {
+ m.nodeLock.RLock()
+ defer m.nodeLock.RUnlock()
+
+ nodes := make([]*Node, 0, len(m.nodes))
+ for _, n := range m.nodes {
+ if n.State != stateDead {
+ nodes = append(nodes, &n.Node)
+ }
+ }
+
+ return nodes
+}
+
+// NumMembers returns the number of alive nodes currently known. Between
+// the time of calling this and calling Members, the number of alive nodes
+// may have changed, so this shouldn't be used to determine how many
+// members will be returned by Members.
+func (m *Memberlist) NumMembers() (alive int) {
+ m.nodeLock.RLock()
+ defer m.nodeLock.RUnlock()
+
+ for _, n := range m.nodes {
+ if n.State != stateDead {
+ alive++
+ }
+ }
+
+ return
+}
+
+// Leave will broadcast a leave message but will not shut down the background
+// listeners, meaning the node will continue participating in gossip and state
+// updates.
+//
+// This will block until the leave message is successfully broadcast to
+// a member of the cluster, if any exist, or until the specified timeout
+// is reached.
+//
+// This method is safe to call multiple times, but must not be called
+// after the cluster is already shut down.
+func (m *Memberlist) Leave(timeout time.Duration) error {
+ m.nodeLock.Lock()
+ // We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
+ // acquire a lock, so we need to Unlock before that.
+
+ if m.shutdown {
+ m.nodeLock.Unlock()
+ panic("leave after shutdown")
+ }
+
+ if !m.leave {
+ m.leave = true
+
+ state, ok := m.nodeMap[m.config.Name]
+ m.nodeLock.Unlock()
+ if !ok {
+ m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
+ return nil
+ }
+
+ d := dead{
+ Incarnation: state.Incarnation,
+ Node: state.Name,
+ }
+ m.deadNode(&d)
+
+ // Block until the broadcast goes out
+ if m.anyAlive() {
+ var timeoutCh <-chan time.Time
+ if timeout > 0 {
+ timeoutCh = time.After(timeout)
+ }
+ select {
+ case <-m.leaveBroadcast:
+ case <-timeoutCh:
+ return fmt.Errorf("timeout waiting for leave broadcast")
+ }
+ }
+ } else {
+ m.nodeLock.Unlock()
+ }
+
+ return nil
+}
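+
+// exampleGracefulExit is an illustrative sketch, not part of the upstream
+// library: it broadcasts a leave message, bounded by the given timeout, and
+// then shuts the instance down so the background listeners stop.
+func exampleGracefulExit(m *Memberlist, timeout time.Duration) error {
+ if err := m.Leave(timeout); err != nil {
+ return err
+ }
+ return m.Shutdown()
+}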
+
+// Check for any other alive node.
+func (m *Memberlist) anyAlive() bool {
+ m.nodeLock.RLock()
+ defer m.nodeLock.RUnlock()
+ for _, n := range m.nodes {
+ if n.State != stateDead && n.Name != m.config.Name {
+ return true
+ }
+ }
+ return false
+}
+
+// GetHealthScore gives this instance's idea of how well it is meeting the soft
+// real-time requirements of the protocol. Lower numbers are better, and zero
+// means "totally healthy".
+func (m *Memberlist) GetHealthScore() int {
+ return m.awareness.GetHealthScore()
+}
+
+// ProtocolVersion returns the protocol version currently in use by
+// this memberlist.
+func (m *Memberlist) ProtocolVersion() uint8 {
+ // NOTE: This method exists so that in the future we can control
+ // any locking if necessary, if we change the protocol version at
+ // runtime, etc.
+ return m.config.ProtocolVersion
+}
+
+// Shutdown will stop any background maintenance of network activity
+// for this memberlist, causing it to appear "dead". A leave message
+// will not be broadcast beforehand, so the cluster being left will have
+// to detect this node's shutdown using probing. If you wish to exit the
+// cluster more gracefully, call Leave prior to shutting down.
+//
+// This method is safe to call multiple times.
+func (m *Memberlist) Shutdown() error {
+ m.nodeLock.Lock()
+ defer m.nodeLock.Unlock()
+
+ if m.shutdown {
+ return nil
+ }
+
+ // Shut down the transport first, which should block until it's
+ // completely torn down. If we kill the memberlist-side handlers
+ // those I/O handlers might get stuck.
+ m.transport.Shutdown()
+
+ // Now tear down everything else.
+ m.shutdown = true
+ close(m.shutdownCh)
+ m.deschedule()
+ return nil
+}