From d103ed6ca97ca5a2669f6cf5fe4b3d2a9c945f26 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Wed, 17 May 2017 16:51:25 -0400 Subject: Upgrading server dependancies (#6431) --- .../hashicorp/memberlist/net_transport.go | 289 +++++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 vendor/github.com/hashicorp/memberlist/net_transport.go (limited to 'vendor/github.com/hashicorp/memberlist/net_transport.go') diff --git a/vendor/github.com/hashicorp/memberlist/net_transport.go b/vendor/github.com/hashicorp/memberlist/net_transport.go new file mode 100644 index 000000000..e7b88b01f --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/net_transport.go @@ -0,0 +1,289 @@ +package memberlist + +import ( + "fmt" + "log" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/armon/go-metrics" + sockaddr "github.com/hashicorp/go-sockaddr" +) + +const ( + // udpPacketBufSize is used to buffer incoming packets during read + // operations. + udpPacketBufSize = 65536 + + // udpRecvBufSize is a large buffer size that we attempt to set UDP + // sockets to in order to handle a large volume of messages. + udpRecvBufSize = 2 * 1024 * 1024 +) + +// NetTransportConfig is used to configure a net transport. +type NetTransportConfig struct { + // BindAddrs is a list of addresses to bind to for both TCP and UDP + // communications. + BindAddrs []string + + // BindPort is the port to listen on, for each address above. + BindPort int + + // Logger is a logger for operator messages. + Logger *log.Logger +} + +// NetTransport is a Transport implementation that uses connectionless UDP for +// packet operations, and ad-hoc TCP connections for stream operations. +type NetTransport struct { + config *NetTransportConfig + packetCh chan *Packet + streamCh chan net.Conn + logger *log.Logger + wg sync.WaitGroup + tcpListeners []*net.TCPListener + udpListeners []*net.UDPConn + shutdown int32 +} + +// NewNetTransport returns a net transport with the given configuration. On +// success all the network listeners will be created and listening. +func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) { + // If we reject the empty list outright we can assume that there's at + // least one listener of each type later during operation. + if len(config.BindAddrs) == 0 { + return nil, fmt.Errorf("At least one bind address is required") + } + + // Build out the new transport. + var ok bool + t := NetTransport{ + config: config, + packetCh: make(chan *Packet), + streamCh: make(chan net.Conn), + logger: config.Logger, + } + + // Clean up listeners if there's an error. + defer func() { + if !ok { + t.Shutdown() + } + }() + + // Build all the TCP and UDP listeners. + port := config.BindPort + for _, addr := range config.BindAddrs { + ip := net.ParseIP(addr) + + tcpAddr := &net.TCPAddr{IP: ip, Port: port} + tcpLn, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err) + } + t.tcpListeners = append(t.tcpListeners, tcpLn) + + // If the config port given was zero, use the first TCP listener + // to pick an available port and then apply that to everything + // else. + if port == 0 { + port = tcpLn.Addr().(*net.TCPAddr).Port + } + + udpAddr := &net.UDPAddr{IP: ip, Port: port} + udpLn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err) + } + if err := setUDPRecvBuf(udpLn); err != nil { + return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err) + } + t.udpListeners = append(t.udpListeners, udpLn) + } + + // Fire them up now that we've been able to create them all. + for i := 0; i < len(config.BindAddrs); i++ { + t.wg.Add(2) + go t.tcpListen(t.tcpListeners[i]) + go t.udpListen(t.udpListeners[i]) + } + + ok = true + return &t, nil +} + +// GetAutoBindPort returns the bind port that was automatically given by the +// kernel, if a bind port of 0 was given. +func (t *NetTransport) GetAutoBindPort() int { + // We made sure there's at least one TCP listener, and that one's + // port was applied to all the others for the dynamic bind case. + return t.tcpListeners[0].Addr().(*net.TCPAddr).Port +} + +// See Transport. +func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) { + var advertiseAddr net.IP + var advertisePort int + if ip != "" { + // If they've supplied an address, use that. + advertiseAddr = net.ParseIP(ip) + if advertiseAddr == nil { + return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip) + } + + // Ensure IPv4 conversion if necessary. + if ip4 := advertiseAddr.To4(); ip4 != nil { + advertiseAddr = ip4 + } + advertisePort = port + } else { + if t.config.BindAddrs[0] == "0.0.0.0" { + // Otherwise, if we're not bound to a specific IP, let's + // use a suitable private IP address. + var err error + ip, err = sockaddr.GetPrivateIP() + if err != nil { + return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err) + } + if ip == "" { + return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided") + } + + advertiseAddr = net.ParseIP(ip) + if advertiseAddr == nil { + return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", ip) + } + } else { + // Use the IP that we're bound to, based on the first + // TCP listener, which we already ensure is there. + advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP + } + + // Use the port we are bound to. + advertisePort = t.GetAutoBindPort() + } + + return advertiseAddr, advertisePort, nil +} + +// See Transport. +func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return time.Time{}, err + } + + // We made sure there's at least one UDP listener, so just use the + // packet sending interface on the first one. Take the time after the + // write call comes back, which will underestimate the time a little, + // but help account for any delays before the write occurs. + _, err = t.udpListeners[0].WriteTo(b, udpAddr) + return time.Now(), err +} + +// See Transport. +func (t *NetTransport) PacketCh() <-chan *Packet { + return t.packetCh +} + +// See Transport. +func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { + dialer := net.Dialer{Timeout: timeout} + return dialer.Dial("tcp", addr) +} + +// See Transport. +func (t *NetTransport) StreamCh() <-chan net.Conn { + return t.streamCh +} + +// See Transport. +func (t *NetTransport) Shutdown() error { + // This will avoid log spam about errors when we shut down. + atomic.StoreInt32(&t.shutdown, 1) + + // Rip through all the connections and shut them down. + for _, conn := range t.tcpListeners { + conn.Close() + } + for _, conn := range t.udpListeners { + conn.Close() + } + + // Block until all the listener threads have died. + t.wg.Wait() + return nil +} + +// tcpListen is a long running goroutine that accepts incoming TCP connections +// and hands them off to the stream channel. +func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) { + defer t.wg.Done() + for { + conn, err := tcpLn.AcceptTCP() + if err != nil { + if s := atomic.LoadInt32(&t.shutdown); s == 1 { + break + } + + t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err) + continue + } + + t.streamCh <- conn + } +} + +// udpListen is a long running goroutine that accepts incoming UDP packets and +// hands them off to the packet channel. +func (t *NetTransport) udpListen(udpLn *net.UDPConn) { + defer t.wg.Done() + for { + // Do a blocking read into a fresh buffer. Grab a time stamp as + // close as possible to the I/O. + buf := make([]byte, udpPacketBufSize) + n, addr, err := udpLn.ReadFrom(buf) + ts := time.Now() + if err != nil { + if s := atomic.LoadInt32(&t.shutdown); s == 1 { + break + } + + t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err) + continue + } + + // Check the length - it needs to have at least one byte to be a + // proper message. + if n < 1 { + t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s", + len(buf), LogAddress(addr)) + continue + } + + // Ingest the packet. + metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n)) + t.packetCh <- &Packet{ + Buf: buf[:n], + From: addr, + Timestamp: ts, + } + } +} + +// setUDPRecvBuf is used to resize the UDP receive window. The function +// attempts to set the read buffer to `udpRecvBuf` but backs off until +// the read buffer can be set. +func setUDPRecvBuf(c *net.UDPConn) error { + size := udpRecvBufSize + var err error + for size > 0 { + if err = c.SetReadBuffer(size); err == nil { + return nil + } + size = size / 2 + } + return err +} -- cgit v1.2.3-1-g7c22