path: root/vendor/github.com/hashicorp/memberlist/net.go
Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/net.go')
-rw-r--r--  vendor/github.com/hashicorp/memberlist/net.go | 92
1 file changed, 71 insertions(+), 21 deletions(-)
diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go
index a4330c4d2..f6a0d45fe 100644
--- a/vendor/github.com/hashicorp/memberlist/net.go
+++ b/vendor/github.com/hashicorp/memberlist/net.go
@@ -8,9 +8,10 @@ import (
"hash/crc32"
"io"
"net"
+ "sync/atomic"
"time"
- "github.com/armon/go-metrics"
+ metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
)
@@ -71,7 +72,8 @@ const (
compoundOverhead = 2 // Assumed overhead per entry in compoundHeader
userMsgOverhead = 1
blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
- maxPushStateBytes = 10 * 1024 * 1024
+ maxPushStateBytes = 20 * 1024 * 1024
+ maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests
)
// ping request sent directly to node
@@ -238,6 +240,16 @@ func (m *Memberlist) handleConn(conn net.Conn) {
m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
}
case pushPullMsg:
+ // Increment counter of pending push/pulls
+ numConcurrent := atomic.AddUint32(&m.pushPullReq, 1)
+ defer atomic.AddUint32(&m.pushPullReq, ^uint32(0))
+
+ // Check if we have too many open push/pull requests
+ if numConcurrent >= maxPushPullRequests {
+ m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests")
+ return
+ }
+
join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
@@ -357,10 +369,25 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
case deadMsg:
fallthrough
case userMsg:
+ // Determine the message queue, prioritize alive
+ queue := m.lowPriorityMsgQueue
+ if msgType == aliveMsg {
+ queue = m.highPriorityMsgQueue
+ }
+
+ // Check for overflow and append if not full
+ m.msgQueueLock.Lock()
+ if queue.Len() >= m.config.HandoffQueueDepth {
+ m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
+ } else {
+ queue.PushBack(msgHandoff{msgType, buf, from})
+ }
+ m.msgQueueLock.Unlock()
+
+ // Notify of pending message
select {
- case m.handoff <- msgHandoff{msgType, buf, from}:
+ case m.handoffCh <- struct{}{}:
default:
- m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
}
default:
@@ -368,28 +395,51 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
}
}
+// getNextMessage returns the next message to process in priority order, using LIFO
+func (m *Memberlist) getNextMessage() (msgHandoff, bool) {
+ m.msgQueueLock.Lock()
+ defer m.msgQueueLock.Unlock()
+
+ if el := m.highPriorityMsgQueue.Back(); el != nil {
+ m.highPriorityMsgQueue.Remove(el)
+ msg := el.Value.(msgHandoff)
+ return msg, true
+ } else if el := m.lowPriorityMsgQueue.Back(); el != nil {
+ m.lowPriorityMsgQueue.Remove(el)
+ msg := el.Value.(msgHandoff)
+ return msg, true
+ }
+ return msgHandoff{}, false
+}
+
// packetHandler is a long running goroutine that processes messages received
// over the packet interface, but is decoupled from the listener to avoid
// blocking the listener which may cause ping/ack messages to be delayed.
func (m *Memberlist) packetHandler() {
for {
select {
- case msg := <-m.handoff:
- msgType := msg.msgType
- buf := msg.buf
- from := msg.from
-
- switch msgType {
- case suspectMsg:
- m.handleSuspect(buf, from)
- case aliveMsg:
- m.handleAlive(buf, from)
- case deadMsg:
- m.handleDead(buf, from)
- case userMsg:
- m.handleUser(buf, from)
- default:
- m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
+ case <-m.handoffCh:
+ for {
+ msg, ok := m.getNextMessage()
+ if !ok {
+ break
+ }
+ msgType := msg.msgType
+ buf := msg.buf
+ from := msg.from
+
+ switch msgType {
+ case suspectMsg:
+ m.handleSuspect(buf, from)
+ case aliveMsg:
+ m.handleAlive(buf, from)
+ case deadMsg:
+ m.handleDead(buf, from)
+ case userMsg:
+ m.handleUser(buf, from)
+ default:
+ m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
+ }
}
case <-m.shutdownCh:
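
Taken together, the handleCommand and packetHandler hunks replace the single buffered handoff channel with two container/list queues guarded by a mutex, plus a notification channel that only signals "there is work"; the consumer then drains the queues newest-first, high priority before low. The following is a compact, self-contained sketch of that enqueue/notify/drain shape, assuming a notification channel of capacity 1 and an illustrative queue depth; the type and field names are hypothetical, only the structure mirrors the vendored code:

package main

import (
	"container/list"
	"fmt"
	"sync"
)

type handoff struct{ payload string }

type handler struct {
	mu        sync.Mutex
	highPrio  *list.List
	lowPrio   *list.List
	notifyCh  chan struct{}
	queueSize int
}

func newHandler() *handler {
	return &handler{
		highPrio:  list.New(),
		lowPrio:   list.New(),
		notifyCh:  make(chan struct{}, 1), // one pending wake-up is enough
		queueSize: 1024,                   // illustrative depth
	}
}

// enqueue mirrors the producer side: drop on overflow, otherwise push and
// poke the consumer without ever blocking.
func (h *handler) enqueue(msg handoff, highPriority bool) {
	q := h.lowPrio
	if highPriority {
		q = h.highPrio
	}
	h.mu.Lock()
	if q.Len() >= h.queueSize {
		h.mu.Unlock()
		fmt.Println("queue full, dropping message")
		return
	}
	q.PushBack(msg)
	h.mu.Unlock()

	select {
	case h.notifyCh <- struct{}{}:
	default: // a wake-up is already pending
	}
}

// next mirrors getNextMessage: newest-first (LIFO), high priority first.
func (h *handler) next() (handoff, bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, q := range []*list.List{h.highPrio, h.lowPrio} {
		if el := q.Back(); el != nil {
			q.Remove(el)
			return el.Value.(handoff), true
		}
	}
	return handoff{}, false
}

func main() {
	h := newHandler()
	h.enqueue(handoff{"low"}, false)
	h.enqueue(handoff{"high"}, true)

	<-h.notifyCh
	for {
		msg, ok := h.next()
		if !ok {
			break
		}
		fmt.Println("processing:", msg.payload)
	}
}
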
@@ -1094,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time
}
if ack.SeqNo != ping.SeqNo {
- return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn))
+ return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo)
}
return true, nil
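
For context on the final hunk: the old call passed three arguments to fmt.Errorf while the format string has only two %d verbs, so the extra LogConn(conn) value would surface as %!(EXTRA ...) in the rendered error, and go vet's printf check flags the mismatch. A tiny reproduction with hypothetical values:

package main

import "fmt"

func main() {
	// Two verbs, three arguments: go vet reports the mismatch, and the
	// rendered message ends with "%!(EXTRA string=extra)".
	err := fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", 7, 9, "extra")
	fmt.Println(err)
}
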