Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/memberlist_test.go')
-rw-r--r--  vendor/github.com/hashicorp/memberlist/memberlist_test.go  1416
1 file changed, 1416 insertions(+), 0 deletions(-)
diff --git a/vendor/github.com/hashicorp/memberlist/memberlist_test.go b/vendor/github.com/hashicorp/memberlist/memberlist_test.go
new file mode 100644
index 000000000..ff03ab3e4
--- /dev/null
+++ b/vendor/github.com/hashicorp/memberlist/memberlist_test.go
@@ -0,0 +1,1416 @@
+package memberlist
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net"
+ "os"
+ "reflect"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/miekg/dns"
+)
+
+var bindLock sync.Mutex
+var bindNum byte = 10
+
+func getBindAddr() net.IP {
+ bindLock.Lock()
+ defer bindLock.Unlock()
+
+ result := net.IPv4(127, 0, 0, bindNum)
+ bindNum++
+ // bindNum is a byte and wraps to 0 after 255; keep it in the
+ // usable 127.0.0.10-255 range.
+ if bindNum < 10 {
+ bindNum = 10
+ }
+
+ return result
+}
+
+func testConfig() *Config {
+ config := DefaultLANConfig()
+ config.BindAddr = getBindAddr().String()
+ config.Name = config.BindAddr
+ return config
+}
+
+func yield() {
+ time.Sleep(5 * time.Millisecond)
+}
+
+type MockDelegate struct {
+ meta []byte
+ msgs [][]byte
+ broadcasts [][]byte
+ state []byte
+ remoteState []byte
+}
+
+func (m *MockDelegate) NodeMeta(limit int) []byte {
+ return m.meta
+}
+
+func (m *MockDelegate) NotifyMsg(msg []byte) {
+ cp := make([]byte, len(msg))
+ copy(cp, msg)
+ m.msgs = append(m.msgs, cp)
+}
+
+func (m *MockDelegate) GetBroadcasts(overhead, limit int) [][]byte {
+ b := m.broadcasts
+ m.broadcasts = nil
+ return b
+}
+
+func (m *MockDelegate) LocalState(join bool) []byte {
+ return m.state
+}
+
+func (m *MockDelegate) MergeRemoteState(s []byte, join bool) {
+ m.remoteState = s
+}
+
+// Returns a new Memberlist bound to an open port by letting the OS pick a
+// free port (BindPort = 0).
+func NewMemberlistOnOpenPort(c *Config) (*Memberlist, error) {
+ c.BindPort = 0
+ return newMemberlist(c)
+}
+
+func GetMemberlistDelegate(t *testing.T) (*Memberlist, *MockDelegate) {
+ d := &MockDelegate{}
+
+ c := testConfig()
+ c.Delegate = d
+
+ m, err := NewMemberlistOnOpenPort(c)
+ if err != nil {
+ t.Fatalf("failed to start: %v", err)
+ return nil, nil
+ }
+
+ return m, d
+}
+
+func GetMemberlist(t *testing.T) *Memberlist {
+ c := testConfig()
+
+ m, err := NewMemberlistOnOpenPort(c)
+ if err != nil {
+ t.Fatalf("failed to start: %v", err)
+ return nil
+ }
+
+ return m
+}
+
+func TestDefaultLANConfig_protocolVersion(t *testing.T) {
+ c := DefaultLANConfig()
+ if c.ProtocolVersion != ProtocolVersion2Compatible {
+ t.Fatalf("should be max: %d", c.ProtocolVersion)
+ }
+}
+
+func TestCreate_protocolVersion(t *testing.T) {
+ cases := []struct {
+ version uint8
+ err bool
+ }{
+ {ProtocolVersionMin, false},
+ {ProtocolVersionMax, false},
+ // TODO(mitchellh): uncomment when we're over 0
+ //{ProtocolVersionMin - 1, true},
+ {ProtocolVersionMax + 1, true},
+ {ProtocolVersionMax - 1, false},
+ }
+
+ for _, tc := range cases {
+ c := DefaultLANConfig()
+ c.BindAddr = getBindAddr().String()
+ c.ProtocolVersion = tc.version
+ m, err := Create(c)
+ if tc.err && err == nil {
+ t.Errorf("Should've failed with version: %d", tc.version)
+ } else if !tc.err && err != nil {
+ t.Errorf("Version '%d' error: %s", tc.version, err)
+ }
+
+ if err == nil {
+ m.Shutdown()
+ }
+ }
+}
+
+func TestCreate_secretKey(t *testing.T) {
+ cases := []struct {
+ key []byte
+ err bool
+ }{
+ {make([]byte, 0), false},
+ {[]byte("abc"), true},
+ {make([]byte, 16), false},
+ {make([]byte, 38), true},
+ }
+
+ for _, tc := range cases {
+ c := DefaultLANConfig()
+ c.BindAddr = getBindAddr().String()
+ c.SecretKey = tc.key
+ m, err := Create(c)
+ if tc.err && err == nil {
+ t.Errorf("Should've failed with key: %#v", tc.key)
+ } else if !tc.err && err != nil {
+ t.Errorf("Key '%#v' error: %s", tc.key, err)
+ }
+
+ if err == nil {
+ m.Shutdown()
+ }
+ }
+}
+
+func TestCreate_secretKeyEmpty(t *testing.T) {
+ c := DefaultLANConfig()
+ c.BindAddr = getBindAddr().String()
+ c.SecretKey = make([]byte, 0)
+ m, err := Create(c)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m.Shutdown()
+
+ if m.config.EncryptionEnabled() {
+ t.Fatalf("Expected encryption to be disabled")
+ }
+}
+
+func TestCreate_keyringOnly(t *testing.T) {
+ c := DefaultLANConfig()
+ c.BindAddr = getBindAddr().String()
+ keyring, err := NewKeyring(nil, make([]byte, 16))
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ c.Keyring = keyring
+
+ m, err := Create(c)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m.Shutdown()
+
+ if !m.config.EncryptionEnabled() {
+ t.Fatalf("Expected encryption to be enabled")
+ }
+}
+
+func TestCreate_keyringAndSecretKey(t *testing.T) {
+ c := DefaultLANConfig()
+ c.BindAddr = getBindAddr().String()
+ keyring, err := NewKeyring(nil, make([]byte, 16))
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ c.Keyring = keyring
+ c.SecretKey = []byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
+
+ m, err := Create(c)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m.Shutdown()
+
+ if !m.config.EncryptionEnabled() {
+ t.Fatalf("Expected encryption to be enabled")
+ }
+
+ ringKeys := c.Keyring.GetKeys()
+ if !bytes.Equal(c.SecretKey, ringKeys[0]) {
+ t.Fatalf("Unexpected primary key %v", ringKeys[0])
+ }
+}
+
+func TestCreate_invalidLoggerSettings(t *testing.T) {
+ c := DefaultLANConfig()
+ c.BindAddr = getBindAddr().String()
+ c.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
+ c.LogOutput = ioutil.Discard
+
+ _, err := Create(c)
+ if err == nil {
+ t.Fatal("Memberlist should not allow both LogOutput and Logger to be set, but it did not raise an error")
+ }
+}
+
+func TestCreate(t *testing.T) {
+ c := testConfig()
+ c.ProtocolVersion = ProtocolVersionMin
+ c.DelegateProtocolVersion = 13
+ c.DelegateProtocolMin = 12
+ c.DelegateProtocolMax = 24
+
+ m, err := Create(c)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m.Shutdown()
+
+ yield()
+
+ members := m.Members()
+ if len(members) != 1 {
+ t.Fatalf("bad number of members")
+ }
+
+ if members[0].PMin != ProtocolVersionMin {
+ t.Fatalf("bad: %#v", members[0])
+ }
+
+ if members[0].PMax != ProtocolVersionMax {
+ t.Fatalf("bad: %#v", members[0])
+ }
+
+ if members[0].PCur != c.ProtocolVersion {
+ t.Fatalf("bad: %#v", members[0])
+ }
+
+ if members[0].DMin != c.DelegateProtocolMin {
+ t.Fatalf("bad: %#v", members[0])
+ }
+
+ if members[0].DMax != c.DelegateProtocolMax {
+ t.Fatalf("bad: %#v", members[0])
+ }
+
+ if members[0].DCur != c.DelegateProtocolVersion {
+ t.Fatalf("bad: %#v", members[0])
+ }
+}
+
+func TestMemberList_CreateShutdown(t *testing.T) {
+ m := GetMemberlist(t)
+ m.schedule()
+ if err := m.Shutdown(); err != nil {
+ t.Fatalf("failed to shutdown %v", err)
+ }
+}
+
+func TestMemberList_ResolveAddr(t *testing.T) {
+ m := GetMemberlist(t)
+ if _, err := m.resolveAddr("localhost"); err != nil {
+ t.Fatalf("Could not resolve localhost: %s", err)
+ }
+ if _, err := m.resolveAddr("[::1]:80"); err != nil {
+ t.Fatalf("Could not understand ipv6 pair: %s", err)
+ }
+ if _, err := m.resolveAddr("[::1]"); err != nil {
+ t.Fatalf("Could not understand ipv6 non-pair")
+ }
+ if _, err := m.resolveAddr(":80"); err == nil {
+ t.Fatalf("Understood hostless port")
+ }
+ if _, err := m.resolveAddr("localhost:80"); err != nil {
+ t.Fatalf("Could not understand hostname port combo: %s", err)
+ }
+ if _, err := m.resolveAddr("localhost:80000"); err == nil {
+ t.Fatalf("Understood too high port")
+ }
+ if _, err := m.resolveAddr("127.0.0.1:80"); err != nil {
+ t.Fatalf("Could not understand hostname port combo: %s", err)
+ }
+ if _, err := m.resolveAddr("[2001:db8:a0b:12f0::1]:80"); err != nil {
+ t.Fatalf("Could not understand hostname port combo: %s", err)
+ }
+}
+
+type dnsHandler struct {
+ t *testing.T
+}
+
+func (h dnsHandler) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
+ if len(r.Question) != 1 {
+ h.t.Fatalf("bad: %#v", r.Question)
+ }
+
+ name := "join.service.consul."
+ question := r.Question[0]
+ if question.Name != name || question.Qtype != dns.TypeANY {
+ h.t.Fatalf("bad: %#v", question)
+ }
+
+ m := new(dns.Msg)
+ m.SetReply(r)
+ m.Authoritative = true
+ m.RecursionAvailable = false
+ m.Answer = append(m.Answer, &dns.A{
+ Hdr: dns.RR_Header{
+ Name: name,
+ Rrtype: dns.TypeA,
+ Class: dns.ClassINET},
+ A: net.ParseIP("127.0.0.1"),
+ })
+ m.Answer = append(m.Answer, &dns.AAAA{
+ Hdr: dns.RR_Header{
+ Name: name,
+ Rrtype: dns.TypeAAAA,
+ Class: dns.ClassINET},
+ AAAA: net.ParseIP("2001:db8:a0b:12f0::1"),
+ })
+ if err := w.WriteMsg(m); err != nil {
+ h.t.Fatalf("err: %v", err)
+ }
+}
+
+func TestMemberList_ResolveAddr_TCP_First(t *testing.T) {
+ bind := "127.0.0.1:8600"
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ server := &dns.Server{
+ Addr: bind,
+ Handler: dnsHandler{t},
+ Net: "tcp",
+ NotifyStartedFunc: wg.Done,
+ }
+ defer server.Shutdown()
+
+ go func() {
+ if err := server.ListenAndServe(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
+ t.Fatalf("err: %v", err)
+ }
+ }()
+ wg.Wait()
+
+ tmpFile, err := ioutil.TempFile("", "")
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ defer os.Remove(tmpFile.Name())
+
+ content := []byte(fmt.Sprintf("nameserver %s", bind))
+ if _, err := tmpFile.Write(content); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if err := tmpFile.Close(); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ m := GetMemberlist(t)
+ m.config.DNSConfigPath = tmpFile.Name()
+ m.setAlive()
+ m.schedule()
+ defer m.Shutdown()
+
+ // Try with and without the trailing dot.
+ hosts := []string{
+ "join.service.consul.",
+ "join.service.consul",
+ }
+ for _, host := range hosts {
+ ips, err := m.resolveAddr(host)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ port := uint16(m.config.BindPort)
+ expected := []ipPort{
+ ipPort{net.ParseIP("127.0.0.1"), port},
+ ipPort{net.ParseIP("2001:db8:a0b:12f0::1"), port},
+ }
+ if !reflect.DeepEqual(ips, expected) {
+ t.Fatalf("bad: %#v", ips)
+ }
+ }
+}
+
+func TestMemberList_Members(t *testing.T) {
+ n1 := &Node{Name: "test"}
+ n2 := &Node{Name: "test2"}
+ n3 := &Node{Name: "test3"}
+
+ m := &Memberlist{}
+ nodes := []*nodeState{
+ &nodeState{Node: *n1, State: stateAlive},
+ &nodeState{Node: *n2, State: stateDead},
+ &nodeState{Node: *n3, State: stateSuspect},
+ }
+ m.nodes = nodes
+
+ members := m.Members()
+ if !reflect.DeepEqual(members, []*Node{n1, n3}) {
+ t.Fatalf("bad members")
+ }
+}
+
+func TestMemberlist_Join(t *testing.T) {
+ m1 := GetMemberlist(t)
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+
+ // Check the hosts
+ if len(m2.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+ if m2.estNumNodes() != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+}
+
+type CustomMergeDelegate struct {
+ invoked bool
+}
+
+func (c *CustomMergeDelegate) NotifyMerge(nodes []*Node) error {
+ log.Printf("Cancel merge")
+ c.invoked = true
+ return fmt.Errorf("Custom merge canceled")
+}
+
+func TestMemberlist_Join_Cancel(t *testing.T) {
+ m1 := GetMemberlist(t)
+ merge1 := &CustomMergeDelegate{}
+ m1.config.Merge = merge1
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ merge2 := &CustomMergeDelegate{}
+ m2.config.Merge = merge2
+ defer m2.Shutdown()
+
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 0 {
+ t.Fatalf("unexpected 0: %d", num)
+ }
+ if !strings.Contains(err.Error(), "Custom merge canceled") {
+ t.Fatalf("unexpected err: %s", err)
+ }
+
+ // Check the hosts
+ if len(m2.Members()) != 1 {
+ t.Fatalf("should have 1 nodes! %v", m2.Members())
+ }
+ if len(m1.Members()) != 1 {
+ t.Fatalf("should have 1 nodes! %v", m1.Members())
+ }
+
+ // Check delegate invocation
+ if !merge1.invoked {
+ t.Fatalf("should invoke delegate")
+ }
+ if !merge2.invoked {
+ t.Fatalf("should invoke delegate")
+ }
+}
+
+type CustomAliveDelegate struct {
+ Ignore string
+ count int
+}
+
+func (c *CustomAliveDelegate) NotifyAlive(peer *Node) error {
+ c.count++
+ if peer.Name == c.Ignore {
+ return nil
+ }
+ log.Printf("Cancel alive")
+ return fmt.Errorf("Custom alive canceled")
+}
+
+func TestMemberlist_Join_Cancel_Passive(t *testing.T) {
+ m1 := GetMemberlist(t)
+ alive1 := &CustomAliveDelegate{
+ Ignore: m1.config.Name,
+ }
+ m1.config.Alive = alive1
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ alive2 := &CustomAliveDelegate{
+ Ignore: c.Name,
+ }
+ m2.config.Alive = alive2
+ defer m2.Shutdown()
+
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+
+ // Check the hosts
+ if len(m2.Members()) != 1 {
+ t.Fatalf("should have 1 nodes! %v", m2.Members())
+ }
+ if len(m1.Members()) != 1 {
+ t.Fatalf("should have 1 nodes! %v", m1.Members())
+ }
+
+ // Check delegate invocation
+ if alive1.count == 0 {
+ t.Fatalf("should invoke delegate: %d", alive1.count)
+ }
+ if alive2.count == 0 {
+ t.Fatalf("should invoke delegate: %d", alive2.count)
+ }
+}
+
+func TestMemberlist_Join_protocolVersions(t *testing.T) {
+ c1 := testConfig()
+ c2 := testConfig()
+ c3 := testConfig()
+ c3.ProtocolVersion = ProtocolVersionMax
+
+ m1, err := Create(c1)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m1.Shutdown()
+
+ m2, err := Create(c2)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ m3, err := Create(c3)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m3.Shutdown()
+
+ _, err = m1.Join([]string{c2.BindAddr})
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+
+ yield()
+
+ _, err = m1.Join([]string{c3.BindAddr})
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+}
+
+func TestMemberlist_Leave(t *testing.T) {
+ m1 := GetMemberlist(t)
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+ c.GossipInterval = time.Millisecond
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+
+ // Check the hosts
+ if len(m2.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+ if len(m1.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+
+ // Leave
+ m1.Leave(time.Second)
+
+ // Wait for leave
+ time.Sleep(10 * time.Millisecond)
+
+ // m1 should think dead
+ if len(m1.Members()) != 1 {
+ t.Fatalf("should have 1 node")
+ }
+
+ if len(m2.Members()) != 1 {
+ t.Fatalf("should have 1 node")
+ }
+}
+
+func TestMemberlist_JoinShutdown(t *testing.T) {
+ m1 := GetMemberlist(t)
+ m1.setAlive()
+ m1.schedule()
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+ c.ProbeInterval = time.Millisecond
+ c.ProbeTimeout = 100 * time.Microsecond
+ c.SuspicionMaxTimeoutMult = 1
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+
+ // Check the hosts
+ if len(m2.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+
+ m1.Shutdown()
+
+ time.Sleep(10 * time.Millisecond)
+
+ if len(m2.Members()) != 1 {
+ t.Fatalf("should have 1 nodes! %v", m2.Members())
+ }
+}
+
+func TestMemberlist_delegateMeta(t *testing.T) {
+ c1 := testConfig()
+ c2 := testConfig()
+ c1.Delegate = &MockDelegate{meta: []byte("web")}
+ c2.Delegate = &MockDelegate{meta: []byte("lb")}
+
+ m1, err := Create(c1)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m1.Shutdown()
+
+ m2, err := Create(c2)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ _, err = m1.Join([]string{c2.BindAddr})
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+
+ yield()
+
+ var roles map[string]string
+
+ // Check the roles of members of m1
+ m1m := m1.Members()
+ if len(m1m) != 2 {
+ t.Fatalf("bad: %#v", m1m)
+ }
+
+ roles = make(map[string]string)
+ for _, m := range m1m {
+ roles[m.Name] = string(m.Meta)
+ }
+
+ if r := roles[c1.Name]; r != "web" {
+ t.Fatalf("bad role for %s: %s", c1.Name, r)
+ }
+
+ if r := roles[c2.Name]; r != "lb" {
+ t.Fatalf("bad role for %s: %s", c2.Name, r)
+ }
+
+ // Check the roles of members of m2
+ m2m := m2.Members()
+ if len(m2m) != 2 {
+ t.Fatalf("bad: %#v", m2m)
+ }
+
+ roles = make(map[string]string)
+ for _, m := range m2m {
+ roles[m.Name] = string(m.Meta)
+ }
+
+ if r := roles[c1.Name]; r != "web" {
+ t.Fatalf("bad role for %s: %s", c1.Name, r)
+ }
+
+ if r := roles[c2.Name]; r != "lb" {
+ t.Fatalf("bad role for %s: %s", c2.Name, r)
+ }
+}
+
+func TestMemberlist_delegateMeta_Update(t *testing.T) {
+ c1 := testConfig()
+ c2 := testConfig()
+ mock1 := &MockDelegate{meta: []byte("web")}
+ mock2 := &MockDelegate{meta: []byte("lb")}
+ c1.Delegate = mock1
+ c2.Delegate = mock2
+
+ m1, err := Create(c1)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m1.Shutdown()
+
+ m2, err := Create(c2)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ _, err = m1.Join([]string{c2.BindAddr})
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+
+ yield()
+
+ // Update the meta data roles
+ mock1.meta = []byte("api")
+ mock2.meta = []byte("db")
+
+ m1.UpdateNode(0)
+ m2.UpdateNode(0)
+ yield()
+
+ // Check the updates have propagated
+ var roles map[string]string
+
+ // Check the roles of members of m1
+ m1m := m1.Members()
+ if len(m1m) != 2 {
+ t.Fatalf("bad: %#v", m1m)
+ }
+
+ roles = make(map[string]string)
+ for _, m := range m1m {
+ roles[m.Name] = string(m.Meta)
+ }
+
+ if r := roles[c1.Name]; r != "api" {
+ t.Fatalf("bad role for %s: %s", c1.Name, r)
+ }
+
+ if r := roles[c2.Name]; r != "db" {
+ t.Fatalf("bad role for %s: %s", c2.Name, r)
+ }
+
+ // Check the roles of members of m2
+ m2m := m2.Members()
+ if len(m2m) != 2 {
+ t.Fatalf("bad: %#v", m2m)
+ }
+
+ roles = make(map[string]string)
+ for _, m := range m2m {
+ roles[m.Name] = string(m.Meta)
+ }
+
+ if r := roles[c1.Name]; r != "api" {
+ t.Fatalf("bad role for %s: %s", c1.Name, r)
+ }
+
+ if r := roles[c2.Name]; r != "db" {
+ t.Fatalf("bad role for %s: %s", c2.Name, r)
+ }
+}
+
+func TestMemberlist_UserData(t *testing.T) {
+ m1, d1 := GetMemberlistDelegate(t)
+ d1.state = []byte("something")
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second delegate with things to send
+ d2 := &MockDelegate{}
+ d2.broadcasts = [][]byte{
+ []byte("test"),
+ []byte("foobar"),
+ }
+ d2.state = []byte("my state")
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+ c.GossipInterval = time.Millisecond
+ c.PushPullInterval = time.Millisecond
+ c.Delegate = d2
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ // Check the hosts
+ if m2.NumMembers() != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+
+ // Wait for a little while
+ time.Sleep(3 * time.Millisecond)
+
+ // Ensure we got the messages
+ if len(d1.msgs) != 2 {
+ t.Fatalf("should have 2 messages!")
+ }
+ if !reflect.DeepEqual(d1.msgs[0], []byte("test")) {
+ t.Fatalf("bad msg %v", d1.msgs[0])
+ }
+ if !reflect.DeepEqual(d1.msgs[1], []byte("foobar")) {
+ t.Fatalf("bad msg %v", d1.msgs[1])
+ }
+
+ // Check the push/pull state
+ if !reflect.DeepEqual(d1.remoteState, []byte("my state")) {
+ t.Fatalf("bad state %s", d1.remoteState)
+ }
+ if !reflect.DeepEqual(d2.remoteState, []byte("something")) {
+ t.Fatalf("bad state %s", d2.remoteState)
+ }
+}
+
+func TestMemberlist_SendTo(t *testing.T) {
+ m1, d1 := GetMemberlistDelegate(t)
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second delegate with things to send
+ d2 := &MockDelegate{}
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+ c.GossipInterval = time.Millisecond
+ c.PushPullInterval = time.Millisecond
+ c.Delegate = d2
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+
+ // Check the hosts
+ if m2.NumMembers() != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+
+ // Try to do a direct send
+ m2Addr := &net.UDPAddr{IP: addr1,
+ Port: c.BindPort}
+ if err := m1.SendTo(m2Addr, []byte("ping")); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ m1Addr := &net.UDPAddr{IP: net.ParseIP(m1.config.BindAddr),
+ Port: m1.config.BindPort}
+ if err := m2.SendTo(m1Addr, []byte("pong")); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Wait for a little while
+ time.Sleep(3 * time.Millisecond)
+
+ // Ensure we got the messages
+ if len(d1.msgs) != 1 {
+ t.Fatalf("should have 1 messages!")
+ }
+ if !reflect.DeepEqual(d1.msgs[0], []byte("pong")) {
+ t.Fatalf("bad msg %v", d1.msgs[0])
+ }
+
+ if len(d2.msgs) != 1 {
+ t.Fatalf("should have 1 messages!")
+ }
+ if !reflect.DeepEqual(d2.msgs[0], []byte("ping")) {
+ t.Fatalf("bad msg %v", d2.msgs[0])
+ }
+}
+
+func TestMemberlistProtocolVersion(t *testing.T) {
+ c := DefaultLANConfig()
+ c.BindAddr = getBindAddr().String()
+ c.ProtocolVersion = ProtocolVersionMax
+ m, err := Create(c)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m.Shutdown()
+
+ result := m.ProtocolVersion()
+ if result != ProtocolVersionMax {
+ t.Fatalf("bad: %d", result)
+ }
+}
+
+func TestMemberlist_Join_DeadNode(t *testing.T) {
+ m1 := GetMemberlist(t)
+ m1.config.TCPTimeout = 50 * time.Millisecond
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second "node", which is just a TCP listener that
+ // does not ever respond. This is to test our deadliens
+ addr1 := getBindAddr()
+ list, err := net.Listen("tcp", fmt.Sprintf("%s:%d", addr1.String(), m1.config.BindPort))
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ defer list.Close()
+
+ // Ensure we don't hang forever
+ timer := time.AfterFunc(100*time.Millisecond, func() {
+ panic("should have timed out by now")
+ })
+ defer timer.Stop()
+
+ num, err := m1.Join([]string{addr1.String()})
+ if num != 0 {
+ t.Fatalf("unexpected 0: %d", num)
+ }
+ if err == nil {
+ t.Fatal("expect err")
+ }
+}
+
+// Tests that nodes running different versions of the protocol can successfully
+// discover each other and add themselves to their respective member lists.
+func TestMemberlist_Join_Protocol_Compatibility(t *testing.T) {
+ testProtocolVersionPair := func(t *testing.T, pv1 uint8, pv2 uint8) {
+ c1 := testConfig()
+ c1.ProtocolVersion = pv1
+ m1, err := NewMemberlistOnOpenPort(c1)
+ if err != nil {
+ t.Fatalf("failed to start: %v", err)
+ }
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ c2 := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c2.Name = addr1.String()
+ c2.BindAddr = addr1.String()
+ c2.BindPort = m1.config.BindPort
+ c2.ProtocolVersion = pv2
+
+ m2, err := Create(c2)
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ num, err := m2.Join([]string{m1.config.BindAddr})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+
+ // Check the hosts
+ if len(m2.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+
+ // Check the hosts
+ if len(m1.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m1.Members())
+ }
+ }
+
+ testProtocolVersionPair(t, 2, 1)
+ testProtocolVersionPair(t, 2, 3)
+ testProtocolVersionPair(t, 3, 2)
+ testProtocolVersionPair(t, 3, 1)
+}
+
+func TestMemberlist_Join_IPv6(t *testing.T) {
+ // Since this binds to all interfaces we need to exclude other tests
+ // from grabbing an interface.
+ bindLock.Lock()
+ defer bindLock.Unlock()
+
+ c1 := DefaultLANConfig()
+ c1.Name = "A"
+ c1.BindAddr = "[::1]"
+ var m1 *Memberlist
+ var err error
+ for i := 0; i < 100; i++ {
+ c1.BindPort = 23456 + i
+ m1, err = Create(c1)
+ if err == nil {
+ break
+ }
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m1.Shutdown()
+
+ // Create a second node
+ c2 := DefaultLANConfig()
+ c2.Name = "B"
+ c2.BindAddr = "[::1]"
+ var m2 *Memberlist
+ for i := 0; i < 100; i++ {
+ c2.BindPort = c1.BindPort + 1 + i
+ m2, err = Create(c2)
+ if err == nil {
+ break
+ }
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ // Join using the port m1 actually bound, which may differ from 23456.
+ num, err := m2.Join([]string{fmt.Sprintf("%s:%d", m1.config.BindAddr, c1.BindPort)})
+ if num != 1 {
+ t.Fatalf("unexpected 1: %d", num)
+ }
+ if err != nil {
+ t.Fatalf("unexpected err: %s", err)
+ }
+
+ // Check the hosts
+ if len(m2.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+
+ if len(m1.Members()) != 2 {
+ t.Fatalf("should have 2 nodes! %v", m2.Members())
+ }
+}
+
+func TestAdvertiseAddr(t *testing.T) {
+ c := testConfig()
+ c.AdvertiseAddr = "127.0.1.100"
+ c.AdvertisePort = 23456
+
+ m, err := Create(c)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m.Shutdown()
+
+ yield()
+
+ members := m.Members()
+ if len(members) != 1 {
+ t.Fatalf("bad number of members")
+ }
+
+ if bytes.Compare(members[0].Addr, []byte{127, 0, 1, 100}) != 0 {
+ t.Fatalf("bad: %#v", members[0])
+ }
+
+ if members[0].Port != 23456 {
+ t.Fatalf("bad: %#v", members[0])
+ }
+}
+
+type MockConflict struct {
+ existing *Node
+ other *Node
+}
+
+func (m *MockConflict) NotifyConflict(existing, other *Node) {
+ m.existing = existing
+ m.other = other
+}
+
+func TestMemberlist_conflictDelegate(t *testing.T) {
+ c1 := testConfig()
+ c2 := testConfig()
+ mock := &MockConflict{}
+ c1.Conflict = mock
+
+ // Ensure name conflict
+ c2.Name = c1.Name
+
+ m1, err := Create(c1)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m1.Shutdown()
+
+ m2, err := Create(c2)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ _, err = m1.Join([]string{c2.BindAddr})
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+
+ yield()
+
+ // Ensure we were notified
+ if mock.existing == nil || mock.other == nil {
+ t.Fatalf("should get notified")
+ }
+ if mock.existing.Name != mock.other.Name {
+ t.Fatalf("bad: %v %v", mock.existing, mock.other)
+ }
+}
+
+type MockPing struct {
+ other *Node
+ rtt time.Duration
+ payload []byte
+}
+
+func (m *MockPing) NotifyPingComplete(other *Node, rtt time.Duration, payload []byte) {
+ m.other = other
+ m.rtt = rtt
+ m.payload = payload
+}
+
+const DEFAULT_PAYLOAD = "whatever"
+
+func (m *MockPing) AckPayload() []byte {
+ return []byte(DEFAULT_PAYLOAD)
+}
+
+func TestMemberlist_PingDelegate(t *testing.T) {
+ m1 := GetMemberlist(t)
+ m1.config.Ping = &MockPing{}
+ m1.setAlive()
+ m1.schedule()
+ defer m1.Shutdown()
+
+ // Create a second node
+ c := DefaultLANConfig()
+ addr1 := getBindAddr()
+ c.Name = addr1.String()
+ c.BindAddr = addr1.String()
+ c.BindPort = m1.config.BindPort
+ c.ProbeInterval = time.Millisecond
+ mock := &MockPing{}
+ c.Ping = mock
+
+ m2, err := Create(c)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ defer m2.Shutdown()
+
+ _, err = m2.Join([]string{m1.config.BindAddr})
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+
+ yield()
+
+ // Ensure we were notified
+ if mock.other == nil {
+ t.Fatalf("should get notified")
+ }
+
+ if !reflect.DeepEqual(mock.other, m1.LocalNode()) {
+ t.Fatalf("not notified about the correct node; expected: %+v; actual: %+v",
+ m1.LocalNode(), mock.other)
+ }
+
+ if mock.rtt <= 0 {
+ t.Fatalf("rtt should be greater than 0")
+ }
+
+ if bytes.Compare(mock.payload, []byte(DEFAULT_PAYLOAD)) != 0 {
+ t.Fatalf("incorrect payload. expected: %v; actual: %v", []byte(DEFAULT_PAYLOAD), mock.payload)
+ }
+}
+
+// Consul bug, rapid restart (before failure detection),
+// with updated metadata. Should be at incarnation 1 for
+// both.
+//
+// This test is commented out because it requires that either we
+// can rebind the socket (SO_REUSEPORT) which Go does not allow,
+// OR we must disable the address conflict checking in memberlist.
+// I just comment out that code to test this case.
+//
+//func TestMemberlist_Restart_delegateMeta_Update(t *testing.T) {
+// c1 := testConfig()
+// c2 := testConfig()
+// mock1 := &MockDelegate{meta: []byte("web")}
+// mock2 := &MockDelegate{meta: []byte("lb")}
+// c1.Delegate = mock1
+// c2.Delegate = mock2
+
+// m1, err := Create(c1)
+// if err != nil {
+// t.Fatalf("err: %s", err)
+// }
+// defer m1.Shutdown()
+
+// m2, err := Create(c2)
+// if err != nil {
+// t.Fatalf("err: %s", err)
+// }
+// defer m2.Shutdown()
+
+// _, err = m1.Join([]string{c2.BindAddr})
+// if err != nil {
+// t.Fatalf("err: %s", err)
+// }
+
+// yield()
+
+// // Recreate m1 with updated meta
+// m1.Shutdown()
+// c3 := testConfig()
+// c3.Name = c1.Name
+// c3.Delegate = mock1
+// c3.GossipInterval = time.Millisecond
+// mock1.meta = []byte("api")
+
+// m1, err = Create(c3)
+// if err != nil {
+// t.Fatalf("err: %s", err)
+// }
+// defer m1.Shutdown()
+
+// _, err = m1.Join([]string{c2.BindAddr})
+// if err != nil {
+// t.Fatalf("err: %s", err)
+// }
+
+// yield()
+// yield()
+
+// // Check the updates have propagated
+// var roles map[string]string
+
+// // Check the roles of members of m1
+// m1m := m1.Members()
+// if len(m1m) != 2 {
+// t.Fatalf("bad: %#v", m1m)
+// }
+
+// roles = make(map[string]string)
+// for _, m := range m1m {
+// roles[m.Name] = string(m.Meta)
+// }
+
+// if r := roles[c1.Name]; r != "api" {
+// t.Fatalf("bad role for %s: %s", c1.Name, r)
+// }
+
+// if r := roles[c2.Name]; r != "lb" {
+// t.Fatalf("bad role for %s: %s", c2.Name, r)
+// }
+
+// // Check the roles of members of m2
+// m2m := m2.Members()
+// if len(m2m) != 2 {
+// t.Fatalf("bad: %#v", m2m)
+// }
+
+// roles = make(map[string]string)
+// for _, m := range m2m {
+// roles[m.Name] = string(m.Meta)
+// }
+
+// if r := roles[c1.Name]; r != "api" {
+// t.Fatalf("bad role for %s: %s", c1.Name, r)
+// }
+
+// if r := roles[c2.Name]; r != "lb" {
+// t.Fatalf("bad role for %s: %s", c2.Name, r)
+// }
+//}