package memberlist

import (
	"fmt"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
	sockaddr "github.com/hashicorp/go-sockaddr"
)

const (
	// udpPacketBufSize is used to buffer incoming packets during read
	// operations.
	udpPacketBufSize = 65536

	// udpRecvBufSize is a large buffer size that we attempt to set UDP
	// sockets to in order to handle a large volume of messages.
	udpRecvBufSize = 2 * 1024 * 1024
)

// NetTransportConfig is used to configure a net transport.
type NetTransportConfig struct {
	// BindAddrs is a list of addresses to bind to for both TCP and UDP
	// communications.
	BindAddrs []string

	// BindPort is the port to listen on, for each address above.
	BindPort int

	// Logger is a logger for operator messages.
	Logger *log.Logger
}

// NetTransport is a Transport implementation that uses connectionless UDP for
// packet operations, and ad-hoc TCP connections for stream operations.
type NetTransport struct {
	// config is the configuration the transport was created with.
	config *NetTransportConfig

	// packetCh receives every UDP packet read by the udpListen goroutines.
	packetCh chan *Packet

	// streamCh receives every TCP connection accepted by the tcpListen
	// goroutines.
	streamCh chan net.Conn

	// logger is taken from config.Logger when the transport is created.
	logger *log.Logger

	// wg tracks the listener goroutines so Shutdown can wait for them.
	wg sync.WaitGroup

	// tcpListeners and udpListeners hold one listener per bind address, in
	// the order of config.BindAddrs.
	tcpListeners []*net.TCPListener
	udpListeners []*net.UDPConn

	// shutdown is set to 1 (atomically) once Shutdown has been called.
	shutdown int32
}

// NewNetTransport returns a net transport with the given configuration. On
// success all the network listeners will be created and listening.
//
// An error is returned if config.BindAddrs is empty, or if any TCP or UDP
// listener cannot be created or configured. On error every listener that was
// opened so far is closed again before returning.
func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
	// If we reject the empty list outright we can assume that there's at
	// least one listener of each type later during operation.
	if len(config.BindAddrs) == 0 {
		return nil, fmt.Errorf("At least one bind address is required")
	}

	// Build out the new transport.
	var ok bool
	t := &NetTransport{
		config:   config,
		packetCh: make(chan *Packet),
		streamCh: make(chan net.Conn),
		logger:   config.Logger,
	}

	// Clean up listeners if there's an error. No listener goroutine has been
	// started on any error path, so Shutdown's wait returns immediately.
	defer func() {
		if !ok {
			t.Shutdown()
		}
	}()

	// Build all the TCP and UDP listeners.
	port := config.BindPort
	for _, addr := range config.BindAddrs {
		// NOTE(review): net.ParseIP returns nil for a string that is not a
		// literal IP address, and a nil IP makes the listeners below bind
		// to all local addresses. Confirm callers only pass IP literals.
		ip := net.ParseIP(addr)

		tcpAddr := &net.TCPAddr{IP: ip, Port: port}
		tcpLn, err := net.ListenTCP("tcp", tcpAddr)
		if err != nil {
			return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err)
		}
		t.tcpListeners = append(t.tcpListeners, tcpLn)

		// If the config port given was zero, use the first TCP listener
		// to pick an available port and then apply that to everything
		// else.
		if port == 0 {
			port = tcpLn.Addr().(*net.TCPAddr).Port
		}

		udpAddr := &net.UDPAddr{IP: ip, Port: port}
		udpLn, err := net.ListenUDP("udp", udpAddr)
		if err != nil {
			return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err)
		}
		if err := setUDPRecvBuf(udpLn); err != nil {
			// The socket is not tracked in t.udpListeners yet, so the
			// deferred Shutdown would not close it; close it here to avoid
			// leaking the descriptor. The close error is ignored because
			// the resize failure is the error worth reporting.
			udpLn.Close()
			return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err)
		}
		t.udpListeners = append(t.udpListeners, udpLn)
	}

	// Fire them up now that we've been able to create them all.
	for i := 0; i < len(config.BindAddrs); i++ {
		t.wg.Add(2)
		go t.tcpListen(t.tcpListeners[i])
		go t.udpListen(t.udpListeners[i])
	}

	ok = true
	return t, nil
}

// GetAutoBindPort returns the bind port that was automatically given by the
// kernel, if a bind port of 0 was given.
func (t *NetTransport) GetAutoBindPort() int {
	// We made sure there's at least one TCP listener, and that one's
	// port was applied to all the others for the dynamic bind case.
	return t.tcpListeners[0].Addr().(*net.TCPAddr).Port
}

// FinalAdvertiseAddr determines the IP address and port to advertise. See
// Transport.
//
// If ip is non-empty it is parsed (and reduced to its 4-byte form when it is
// an IPv4 address) and returned together with port. Otherwise the address is
// derived from the first bind address: a private IP is looked up when bound
// to "0.0.0.0", else the first TCP listener's IP is used; the port is the one
// the first TCP listener is bound to. An error is returned if an address
// cannot be parsed or no private IP can be found.
func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) {
	var advertiseAddr net.IP
	var advertisePort int
	if ip != "" {
		// If they've supplied an address, use that.
		advertiseAddr = net.ParseIP(ip)
		if advertiseAddr == nil {
			return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip)
		}

		// Ensure IPv4 conversion if necessary.
		if ip4 := advertiseAddr.To4(); ip4 != nil {
			advertiseAddr = ip4
		}
		advertisePort = port
	} else {
		if t.config.BindAddrs[0] == "0.0.0.0" {
			// Otherwise, if we're not bound to a specific IP, let's
			// use a suitable private IP address.
			var err error
			ip, err = sockaddr.GetPrivateIP()
			if err != nil {
				return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err)
			}
			if ip == "" {
				return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided")
			}

			advertiseAddr = net.ParseIP(ip)
			if advertiseAddr == nil {
				return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", ip)
			}
		} else {
			// Use the IP that we're bound to, based on the first
			// TCP listener, which we already ensure is there.
			advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP
		}

		// Use the port we are bound to.
		advertisePort = t.GetAutoBindPort()
	}

	return advertiseAddr, advertisePort, nil
}

// WriteTo resolves addr as a UDP address and sends b to it using the first
// UDP listener. See Transport.
//
// It returns the time taken immediately after the write call returned, along
// with any write error. If addr cannot be resolved, the zero time and the
// resolution error are returned.
func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) {
	udpAddr, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		return time.Time{}, err
	}

	// We made sure there's at least one UDP listener, so just use the
	// packet sending interface on the first one. Take the time after the
	// write call comes back, which will underestimate the time a little,
	// but help account for any delays before the write occurs.
	_, err = t.udpListeners[0].WriteTo(b, udpAddr)
	return time.Now(), err
}

// PacketCh returns the channel on which received UDP packets are delivered.
// See Transport.
func (t *NetTransport) PacketCh() <-chan *Packet {
	return t.packetCh
}

// DialTimeout opens a TCP connection to addr, giving up after timeout. See
// Transport.
func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
	dialer := net.Dialer{Timeout: timeout}
	return dialer.Dial("tcp", addr)
}

// StreamCh returns the channel on which accepted TCP connections are
// delivered. See Transport.
func (t *NetTransport) StreamCh() <-chan net.Conn {
	return t.streamCh
}

// Shutdown closes every listener and blocks until all listener goroutines
// have exited. It always returns nil. See Transport.
func (t *NetTransport) Shutdown() error {
	// This will avoid log spam about errors when we shut down.
	atomic.StoreInt32(&t.shutdown, 1)

	// Rip through all the connections and shut them down. Close errors are
	// deliberately ignored: we are tearing down and have nothing to do
	// with them.
	for _, conn := range t.tcpListeners {
		conn.Close()
	}
	for _, conn := range t.udpListeners {
		conn.Close()
	}

	// Block until all the listener threads have died.
	t.wg.Wait()
	return nil
}

// tcpListen is a long running goroutine that accepts incoming TCP connections
// and hands them off to the stream channel. It exits once an accept fails
// after Shutdown has been called.
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
	defer t.wg.Done()

	// baseDelay is the initial delay after an AcceptTCP error before trying
	// again; it doubles on each consecutive failure up to maxDelay. Without
	// the delay a persistent error (e.g. no free file descriptors) would
	// make this loop spin and flood the log.
	const baseDelay = 5 * time.Millisecond
	const maxDelay = 1 * time.Second

	var loopDelay time.Duration
	for {
		conn, err := tcpLn.AcceptTCP()
		if err != nil {
			// Checked before sleeping so that a normal shutdown exits
			// without any added delay.
			if s := atomic.LoadInt32(&t.shutdown); s == 1 {
				break
			}

			if loopDelay == 0 {
				loopDelay = baseDelay
			} else {
				loopDelay *= 2
			}
			if loopDelay > maxDelay {
				loopDelay = maxDelay
			}

			t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
			time.Sleep(loopDelay)
			continue
		}

		// A successful accept resets the backoff.
		loopDelay = 0

		t.streamCh <- conn
	}
}

// udpListen is a long running goroutine that accepts incoming UDP packets and
// hands them off to the packet channel. It exits once a read fails after
// Shutdown has been called.
func (t *NetTransport) udpListen(udpLn *net.UDPConn) {
	defer t.wg.Done()
	for {
		// Do a blocking read into a fresh buffer. Grab a time stamp as
		// close as possible to the I/O.
		buf := make([]byte, udpPacketBufSize)
		n, addr, err := udpLn.ReadFrom(buf)
		ts := time.Now()
		if err != nil {
			if s := atomic.LoadInt32(&t.shutdown); s == 1 {
				break
			}

			t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err)
			continue
		}

		// Check the length - it needs to have at least one byte to be a
		// proper message. Report the number of bytes actually received
		// (n), not the capacity of the read buffer.
		if n < 1 {
			t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
				n, LogAddress(addr))
			continue
		}

		// Ingest the packet.
		metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
		t.packetCh <- &Packet{
			Buf:       buf[:n],
			From:      addr,
			Timestamp: ts,
		}
	}
}

// setUDPRecvBuf is used to resize the UDP receive window. The function
// attempts to set the read buffer to `udpRecvBufSize` but backs off, halving
// the requested size after each failure, until the read buffer can be set.
// If no size is accepted, the error from the last attempt is returned.
func setUDPRecvBuf(c *net.UDPConn) error {
	size := udpRecvBufSize
	var err error
	for size > 0 {
		if err = c.SetReadBuffer(size); err == nil {
			return nil
		}
		size = size / 2
	}
	return err
}