From d103ed6ca97ca5a2669f6cf5fe4b3d2a9c945f26 Mon Sep 17 00:00:00 2001
From: Christopher Speller
Date: Wed, 17 May 2017 16:51:25 -0400
Subject: Upgrading server dependencies (#6431)

---
 .../github.com/hashicorp/memberlist/memberlist.go | 625 +++++++++++++++++++++
 1 file changed, 625 insertions(+)
 create mode 100644 vendor/github.com/hashicorp/memberlist/memberlist.go

(limited to 'vendor/github.com/hashicorp/memberlist/memberlist.go')

diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go
new file mode 100644
index 000000000..e4b0d7347
--- /dev/null
+++ b/vendor/github.com/hashicorp/memberlist/memberlist.go
@@ -0,0 +1,625 @@
+/*
+memberlist is a library that manages cluster
+membership and member failure detection using a gossip-based protocol.
+
+The use cases for such a library are far-reaching: all distributed systems
+require membership, and memberlist is a reusable solution to managing
+cluster membership and node failure detection.
+
+memberlist is eventually consistent but converges quickly on average.
+The speed at which it converges can be heavily tuned via various knobs
+on the protocol. Node failures are detected and network partitions are
+partially tolerated by attempting to communicate to potentially dead nodes
+through multiple routes.
+*/
+package memberlist
+
+import (
+    "fmt"
+    "log"
+    "net"
+    "os"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/hashicorp/go-multierror"
+    sockaddr "github.com/hashicorp/go-sockaddr"
+    "github.com/miekg/dns"
+)
+
+type Memberlist struct {
+    sequenceNum uint32 // Local sequence number
+    incarnation uint32 // Local incarnation number
+    numNodes    uint32 // Number of known nodes (estimate)
+
+    config         *Config
+    shutdown       bool
+    shutdownCh     chan struct{}
+    leave          bool
+    leaveBroadcast chan struct{}
+
+    transport Transport
+    handoff   chan msgHandoff
+
+    nodeLock   sync.RWMutex
+    nodes      []*nodeState          // Known nodes
+    nodeMap    map[string]*nodeState // Maps Addr.String() -> NodeState
+    nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer
+    awareness  *awareness
+
+    tickerLock sync.Mutex
+    tickers    []*time.Ticker
+    stopTick   chan struct{}
+    probeIndex int
+
+    ackLock     sync.Mutex
+    ackHandlers map[uint32]*ackHandler
+
+    broadcasts *TransmitLimitedQueue
+
+    logger *log.Logger
+}
+
+// newMemberlist creates the network listeners.
+// Does not schedule execution of background maintenance.
+func newMemberlist(conf *Config) (*Memberlist, error) {
+    if conf.ProtocolVersion < ProtocolVersionMin {
+        return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
+            conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
+    } else if conf.ProtocolVersion > ProtocolVersionMax {
+        return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
+            conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
+    }
+
+    if len(conf.SecretKey) > 0 {
+        if conf.Keyring == nil {
+            keyring, err := NewKeyring(nil, conf.SecretKey)
+            if err != nil {
+                return nil, err
+            }
+            conf.Keyring = keyring
+        } else {
+            if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
+                return nil, err
+            }
+            if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
+                return nil, err
+            }
+        }
+    }
+
+    if conf.LogOutput != nil && conf.Logger != nil {
+        return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
+    }
+
+    logDest := conf.LogOutput
+    if logDest == nil {
+        logDest = os.Stderr
+    }
+
+    logger := conf.Logger
+    if logger == nil {
+        logger = log.New(logDest, "", log.LstdFlags)
+    }
+
+    // Set up a network transport by default if a custom one wasn't given
+    // by the config.
+    transport := conf.Transport
+    if transport == nil {
+        nc := &NetTransportConfig{
+            BindAddrs: []string{conf.BindAddr},
+            BindPort:  conf.BindPort,
+            Logger:    logger,
+        }
+        nt, err := NewNetTransport(nc)
+        if err != nil {
+            return nil, fmt.Errorf("Could not set up network transport: %v", err)
+        }
+
+        if conf.BindPort == 0 {
+            port := nt.GetAutoBindPort()
+            conf.BindPort = port
+            logger.Printf("[DEBUG] Using dynamic bind port %d", port)
+        }
+        transport = nt
+    }
+
+    m := &Memberlist{
+        config:         conf,
+        shutdownCh:     make(chan struct{}),
+        leaveBroadcast: make(chan struct{}, 1),
+        transport:      transport,
+        handoff:        make(chan msgHandoff, conf.HandoffQueueDepth),
+        nodeMap:        make(map[string]*nodeState),
+        nodeTimers:     make(map[string]*suspicion),
+        awareness:      newAwareness(conf.AwarenessMaxMultiplier),
+        ackHandlers:    make(map[uint32]*ackHandler),
+        broadcasts:     &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
+        logger:         logger,
+    }
+    m.broadcasts.NumNodes = func() int {
+        return m.estNumNodes()
+    }
+    go m.streamListen()
+    go m.packetListen()
+    go m.packetHandler()
+    return m, nil
+}
+
+// Create will create a new Memberlist using the given configuration.
+// This will not connect to any other node (see Join) yet, but will start
+// all the listeners to allow other nodes to join this memberlist.
+// After creating a Memberlist, the configuration given should not be
+// modified by the user anymore.
+func Create(conf *Config) (*Memberlist, error) {
+    m, err := newMemberlist(conf)
+    if err != nil {
+        return nil, err
+    }
+    if err := m.setAlive(); err != nil {
+        m.Shutdown()
+        return nil, err
+    }
+    m.schedule()
+    return m, nil
+}
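For context when reviewing this vendored API: Create only starts the local listeners; contacting peers is a separate Join step. A minimal caller-side sketch, not part of the patch, assuming this package's exported DefaultLANConfig helper and a hypothetical node name:

    package main

    import (
        "log"

        "github.com/hashicorp/memberlist"
    )

    func main() {
        // DefaultLANConfig returns defaults tuned for a local network.
        conf := memberlist.DefaultLANConfig()
        conf.Name = "node-a" // hypothetical; defaults to the hostname

        // Create starts the stream/packet listeners but contacts no peer yet.
        list, err := memberlist.Create(conf)
        if err != nil {
            log.Fatalf("memberlist create failed: %v", err)
        }
        defer list.Shutdown()

        log.Printf("local node: %s", list.LocalNode().Name)
    }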
+
+// Join is used to take an existing Memberlist and attempt to join a cluster
+// by contacting all the given hosts and performing a state sync. Initially,
+// the Memberlist only contains our own state, so doing this will cause
+// remote nodes to become aware of the existence of this node, effectively
+// joining the cluster.
+//
+// This returns the number of hosts successfully contacted and an error if
+// none could be reached. If an error is returned, the node did not successfully
+// join the cluster.
+func (m *Memberlist) Join(existing []string) (int, error) {
+    numSuccess := 0
+    var errs error
+    for _, exist := range existing {
+        addrs, err := m.resolveAddr(exist)
+        if err != nil {
+            err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
+            errs = multierror.Append(errs, err)
+            m.logger.Printf("[WARN] memberlist: %v", err)
+            continue
+        }
+
+        for _, addr := range addrs {
+            hp := joinHostPort(addr.ip.String(), addr.port)
+            if err := m.pushPullNode(hp, true); err != nil {
+                err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
+                errs = multierror.Append(errs, err)
+                m.logger.Printf("[DEBUG] memberlist: %v", err)
+                continue
+            }
+            numSuccess++
+        }
+    }
+    if numSuccess > 0 {
+        errs = nil
+    }
+    return numSuccess, errs
+}
+
+// ipPort holds information about a node we want to try to join.
+type ipPort struct {
+    ip   net.IP
+    port uint16
+}
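Continuing the sketch above: Join accepts either bare hosts, in which case the configured BindPort is assumed, or explicit host:port strings; hostnames go through resolveAddr below. The seed addresses here are hypothetical:

    // Join returns the number of hosts successfully contacted; err is
    // non-nil only when none of the seeds could be reached.
    n, err := list.Join([]string{"10.0.0.5", "seed.example.com:7946"})
    if err != nil {
        log.Fatalf("unable to join cluster: %v", err)
    }
    log.Printf("contacted %d seed hosts", n)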
+
+// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
+// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
+// the response has the truncate bit set, which isn't common on DNS servers like
+// Consul's. By doing the TCP lookup directly, we get the best chance for the
+// largest list of hosts to join. Since joins are relatively rare events, it's ok
+// to do this rather expensive operation.
+func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) {
+    // Don't attempt any TCP lookups against non-fully qualified domain
+    // names, since those will likely come from the resolv.conf file.
+    if !strings.Contains(host, ".") {
+        return nil, nil
+    }
+
+    // Make sure the domain name is terminated with a dot (we know there's
+    // at least one character at this point).
+    dn := host
+    if dn[len(dn)-1] != '.' {
+        dn = dn + "."
+    }
+
+    // See if we can find a server to try.
+    cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
+    if err != nil {
+        return nil, err
+    }
+    if len(cc.Servers) > 0 {
+        // We support host:port in the DNS config, but need to add the
+        // default port if one is not supplied.
+        server := cc.Servers[0]
+        if !hasPort(server) {
+            server = net.JoinHostPort(server, cc.Port)
+        }
+
+        // Do the lookup.
+        c := new(dns.Client)
+        c.Net = "tcp"
+        msg := new(dns.Msg)
+        msg.SetQuestion(dn, dns.TypeANY)
+        in, _, err := c.Exchange(msg, server)
+        if err != nil {
+            return nil, err
+        }
+
+        // Handle any IPs we get back that we can attempt to join.
+        var ips []ipPort
+        for _, r := range in.Answer {
+            switch rr := r.(type) {
+            case (*dns.A):
+                ips = append(ips, ipPort{rr.A, defaultPort})
+            case (*dns.AAAA):
+                ips = append(ips, ipPort{rr.AAAA, defaultPort})
+            case (*dns.CNAME):
+                m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
+            }
+        }
+        return ips, nil
+    }
+
+    return nil, nil
+}
+
+// resolveAddr is used to resolve a host string into a list of ipPort
+// candidates to join. If no port is given, the configured BindPort is used.
+func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
+    // Normalize the incoming string to host:port so we can apply Go's
+    // parser to it.
+    port := uint16(0)
+    if !hasPort(hostStr) {
+        hostStr += ":" + strconv.Itoa(m.config.BindPort)
+    }
+    host, sport, err := net.SplitHostPort(hostStr)
+    if err != nil {
+        return nil, err
+    }
+
+    // This will capture the supplied port, or the default one added above.
+    lport, err := strconv.ParseUint(sport, 10, 16)
+    if err != nil {
+        return nil, err
+    }
+    port = uint16(lport)
+
+    // If it looks like an IP address we are done. The SplitHostPort() above
+    // will make sure the host part is in good shape for parsing, even for
+    // IPv6 addresses.
+    if ip := net.ParseIP(host); ip != nil {
+        return []ipPort{ipPort{ip, port}}, nil
+    }
+
+    // First try TCP so we have the best chance for the largest list of
+    // hosts to join. If this fails it's not fatal since this isn't a standard
+    // way to query DNS, and we have a fallback below.
+    ips, err := m.tcpLookupIP(host, port)
+    if err != nil {
+        m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
+    }
+    if len(ips) > 0 {
+        return ips, nil
+    }
+
+    // If TCP didn't yield anything then use the normal Go resolver which
+    // will try UDP, then might possibly try TCP again if the UDP response
+    // indicates it was truncated.
+    ans, err := net.LookupIP(host)
+    if err != nil {
+        return nil, err
+    }
+    ips = make([]ipPort, 0, len(ans))
+    for _, ip := range ans {
+        ips = append(ips, ipPort{ip, port})
+    }
+    return ips, nil
+}
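The TCP-first trick above can be reproduced standalone with the same github.com/miekg/dns package. A rough sketch, assuming a readable /etc/resolv.conf and a hypothetical seed hostname, with error handling abbreviated:

    // Force TCP so a truncated UDP answer can never hide records.
    cc, err := dns.ClientConfigFromFile("/etc/resolv.conf")
    if err == nil && len(cc.Servers) > 0 {
        c := &dns.Client{Net: "tcp"}
        msg := new(dns.Msg)
        msg.SetQuestion(dns.Fqdn("seed.example.com"), dns.TypeANY)
        if in, _, err := c.Exchange(msg, net.JoinHostPort(cc.Servers[0], cc.Port)); err == nil {
            for _, rr := range in.Answer {
                if a, ok := rr.(*dns.A); ok {
                    fmt.Println(a.A) // candidate address to join
                }
            }
        }
    }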
+
+// setAlive is used to mark this node as being alive. This is the same
+// as if we received an alive notification on our own network channel for
+// ourselves.
+func (m *Memberlist) setAlive() error {
+    // Get the final advertise address from the transport, which may need
+    // to see which address we bound to.
+    addr, port, err := m.transport.FinalAdvertiseAddr(
+        m.config.AdvertiseAddr, m.config.AdvertisePort)
+    if err != nil {
+        return fmt.Errorf("Failed to get final advertise address: %v", err)
+    }
+
+    // Check if this is a public address without encryption.
+    ipAddr, err := sockaddr.NewIPAddr(addr.String())
+    if err != nil {
+        return fmt.Errorf("Failed to parse interface addresses: %v", err)
+    }
+    ifAddrs := []sockaddr.IfAddr{
+        sockaddr.IfAddr{
+            SockAddr: ipAddr,
+        },
+    }
+    _, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
+    if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
+        m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
+    }
+
+    // Set any metadata from the delegate.
+    var meta []byte
+    if m.config.Delegate != nil {
+        meta = m.config.Delegate.NodeMeta(MetaMaxSize)
+        if len(meta) > MetaMaxSize {
+            panic("Node meta data provided is longer than the limit")
+        }
+    }
+
+    a := alive{
+        Incarnation: m.nextIncarnation(),
+        Node:        m.config.Name,
+        Addr:        addr,
+        Port:        uint16(port),
+        Meta:        meta,
+        Vsn: []uint8{
+            ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
+            m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
+            m.config.DelegateProtocolVersion,
+        },
+    }
+    m.aliveNode(&a, nil, true)
+    return nil
+}
+
+// LocalNode is used to return the local Node
+func (m *Memberlist) LocalNode() *Node {
+    m.nodeLock.RLock()
+    defer m.nodeLock.RUnlock()
+    state := m.nodeMap[m.config.Name]
+    return &state.Node
+}
+
+// UpdateNode is used to trigger re-advertising the local node. This is
+// primarily used with a Delegate to support dynamic updates to the local
+// meta data. This will block until the update message is successfully
+// broadcast to a member of the cluster, if any exist, or until a specified
+// timeout is reached.
+func (m *Memberlist) UpdateNode(timeout time.Duration) error {
+    // Get the node meta data
+    var meta []byte
+    if m.config.Delegate != nil {
+        meta = m.config.Delegate.NodeMeta(MetaMaxSize)
+        if len(meta) > MetaMaxSize {
+            panic("Node meta data provided is longer than the limit")
+        }
+    }
+
+    // Get the existing node
+    m.nodeLock.RLock()
+    state := m.nodeMap[m.config.Name]
+    m.nodeLock.RUnlock()
+
+    // Format a new alive message
+    a := alive{
+        Incarnation: m.nextIncarnation(),
+        Node:        m.config.Name,
+        Addr:        state.Addr,
+        Port:        state.Port,
+        Meta:        meta,
+        Vsn: []uint8{
+            ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
+            m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
+            m.config.DelegateProtocolVersion,
+        },
+    }
+    notifyCh := make(chan struct{})
+    m.aliveNode(&a, notifyCh, true)
+
+    // Wait for the broadcast or a timeout
+    if m.anyAlive() {
+        var timeoutCh <-chan time.Time
+        if timeout > 0 {
+            timeoutCh = time.After(timeout)
+        }
+        select {
+        case <-notifyCh:
+        case <-timeoutCh:
+            return fmt.Errorf("timeout waiting for update broadcast")
+        }
+    }
+    return nil
+}
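Since UpdateNode only re-reads whatever the delegate currently reports, the usual pattern is to mutate the delegate's state and then call it. A hedged sketch: metaDelegate is hypothetical, must be wired in as conf.Delegate before Create, and the remaining Delegate methods (NotifyMsg, GetBroadcasts, LocalState, MergeRemoteState) are elided:

    // NodeMeta is the only Delegate method UpdateNode consults; it must
    // honor the limit or setAlive/UpdateNode will panic.
    type metaDelegate struct{ tags []byte }

    func (d *metaDelegate) NodeMeta(limit int) []byte {
        if len(d.tags) > limit {
            return d.tags[:limit]
        }
        return d.tags
    }

    // Later, after changing d.tags, re-advertise to the cluster:
    d.tags = []byte(`{"role":"worker"}`)
    if err := list.UpdateNode(10 * time.Second); err != nil {
        log.Printf("update broadcast timed out: %v", err)
    }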
+
+// SendTo is deprecated in favor of SendBestEffort, which requires a node to
+// target.
+func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
+    // Encode as a user message
+    buf := make([]byte, 1, len(msg)+1)
+    buf[0] = byte(userMsg)
+    buf = append(buf, msg...)
+
+    // Send the message
+    return m.rawSendMsgPacket(to.String(), nil, buf)
+}
+
+// SendToUDP is deprecated in favor of SendBestEffort.
+func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
+    return m.SendBestEffort(to, msg)
+}
+
+// SendToTCP is deprecated in favor of SendReliable.
+func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
+    return m.SendReliable(to, msg)
+}
+
+// SendBestEffort uses the unreliable packet-oriented interface of the transport
+// to target a user message at the given node (this does not use the gossip
+// mechanism). The maximum size of the message depends on the configured
+// UDPBufferSize for this memberlist instance.
+func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
+    // Encode as a user message
+    buf := make([]byte, 1, len(msg)+1)
+    buf[0] = byte(userMsg)
+    buf = append(buf, msg...)
+
+    // Send the message
+    return m.rawSendMsgPacket(to.Address(), to, buf)
+}
+
+// SendReliable uses the reliable stream-oriented interface of the transport to
+// target a user message at the given node (this does not use the gossip
+// mechanism). Delivery is guaranteed if no error is returned, and there is no
+// limit on the size of the message.
+func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
+    return m.sendUserMsg(to.Address(), msg)
+}
+
+// Members returns a list of all known live nodes. The node structures
+// returned must not be modified. If you wish to modify a Node, make a
+// copy first.
+func (m *Memberlist) Members() []*Node {
+    m.nodeLock.RLock()
+    defer m.nodeLock.RUnlock()
+
+    nodes := make([]*Node, 0, len(m.nodes))
+    for _, n := range m.nodes {
+        if n.State != stateDead {
+            nodes = append(nodes, &n.Node)
+        }
+    }
+
+    return nodes
+}
+
+// NumMembers returns the number of alive nodes currently known. Between
+// the time of calling this and calling Members, the number of alive nodes
+// may have changed, so this shouldn't be used to determine how many
+// members will be returned by Members.
+func (m *Memberlist) NumMembers() (alive int) {
+    m.nodeLock.RLock()
+    defer m.nodeLock.RUnlock()
+
+    for _, n := range m.nodes {
+        if n.State != stateDead {
+            alive++
+        }
+    }
+
+    return
+}
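A short sketch of the two delivery paths, assuming the list variable from the earlier examples; on the receiving side the payload surfaces through that node's delegate NotifyMsg callback:

    for _, node := range list.Members() {
        if node.Name == list.LocalNode().Name {
            continue // don't message ourselves
        }
        // Stream-oriented: delivery guaranteed if err == nil, any size.
        if err := list.SendReliable(node, []byte("reload-config")); err != nil {
            log.Printf("reliable send to %s failed: %v", node.Name, err)
        }
        // Packet-oriented: best effort, bounded by UDPBufferSize.
        _ = list.SendBestEffort(node, []byte("hint"))
    }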
+
+// Leave will broadcast a leave message but will not shut down the background
+// listeners, meaning the node will continue participating in gossip and state
+// updates.
+//
+// This will block until the leave message is successfully broadcast to
+// a member of the cluster, if any exist, or until a specified timeout
+// is reached.
+//
+// This method is safe to call multiple times, but must not be called
+// after the cluster is already shut down.
+func (m *Memberlist) Leave(timeout time.Duration) error {
+    m.nodeLock.Lock()
+    // We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
+    // acquire a lock so we need to Unlock before that.
+
+    if m.shutdown {
+        m.nodeLock.Unlock()
+        panic("leave after shutdown")
+    }
+
+    if !m.leave {
+        m.leave = true
+
+        state, ok := m.nodeMap[m.config.Name]
+        m.nodeLock.Unlock()
+        if !ok {
+            m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
+            return nil
+        }
+
+        d := dead{
+            Incarnation: state.Incarnation,
+            Node:        state.Name,
+        }
+        m.deadNode(&d)
+
+        // Block until the broadcast goes out
+        if m.anyAlive() {
+            var timeoutCh <-chan time.Time
+            if timeout > 0 {
+                timeoutCh = time.After(timeout)
+            }
+            select {
+            case <-m.leaveBroadcast:
+            case <-timeoutCh:
+                return fmt.Errorf("timeout waiting for leave broadcast")
+            }
+        }
+    } else {
+        m.nodeLock.Unlock()
+    }
+
+    return nil
+}
+
+// Check for any other alive node.
+func (m *Memberlist) anyAlive() bool {
+    m.nodeLock.RLock()
+    defer m.nodeLock.RUnlock()
+    for _, n := range m.nodes {
+        if n.State != stateDead && n.Name != m.config.Name {
+            return true
+        }
+    }
+    return false
+}
+
+// GetHealthScore gives this instance's idea of how well it is meeting the soft
+// real-time requirements of the protocol. Lower numbers are better, and zero
+// means "totally healthy".
+func (m *Memberlist) GetHealthScore() int {
+    return m.awareness.GetHealthScore()
+}
+
+// ProtocolVersion returns the protocol version currently in use by
+// this memberlist.
+func (m *Memberlist) ProtocolVersion() uint8 {
+    // NOTE: This method exists so that in the future we can control
+    // any locking if necessary, if we change the protocol version at
+    // runtime, etc.
+    return m.config.ProtocolVersion
+}
+
+// Shutdown will stop any background maintenance of network activity
+// for this memberlist, causing it to appear "dead". A leave message
+// will not be broadcast beforehand, so the cluster being left will have
+// to detect this node's shutdown using probing. If you wish to more
+// gracefully exit the cluster, call Leave prior to shutting down.
+//
+// This method is safe to call multiple times.
+func (m *Memberlist) Shutdown() error {
+    m.nodeLock.Lock()
+    defer m.nodeLock.Unlock()
+
+    if m.shutdown {
+        return nil
+    }
+
+    // Shut down the transport first, which should block until it's
+    // completely torn down. If we kill the memberlist-side handlers
+    // those I/O handlers might get stuck.
+    m.transport.Shutdown()
+
+    // Now tear down everything else.
+    m.shutdown = true
+    close(m.shutdownCh)
+    m.deschedule()
+    return nil
+}
-- 
cgit v1.2.3-1-g7c22
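Taken together, the graceful exit sequence the Shutdown comment recommends looks roughly like this, again assuming the list variable from the sketches above (the timeout value is arbitrary):

    // Tell the cluster we're going, then stop the listeners. If Leave
    // times out, peers will detect the death via probing instead.
    if err := list.Leave(5 * time.Second); err != nil {
        log.Printf("leave: %v", err)
    }
    if err := list.Shutdown(); err != nil {
        log.Printf("shutdown: %v", err)
    }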