diff options
Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/net.go')
-rw-r--r-- | vendor/github.com/hashicorp/memberlist/net.go | 92 |
1 files changed, 71 insertions, 21 deletions
diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index a4330c4d2..f6a0d45fe 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -8,9 +8,10 @@ import ( "hash/crc32" "io" "net" + "sync/atomic" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" ) @@ -71,7 +72,8 @@ const ( compoundOverhead = 2 // Assumed overhead per entry in compoundHeader userMsgOverhead = 1 blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process - maxPushStateBytes = 10 * 1024 * 1024 + maxPushStateBytes = 20 * 1024 * 1024 + maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests ) // ping request sent directly to node @@ -238,6 +240,16 @@ func (m *Memberlist) handleConn(conn net.Conn) { m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn)) } case pushPullMsg: + // Increment counter of pending push/pulls + numConcurrent := atomic.AddUint32(&m.pushPullReq, 1) + defer atomic.AddUint32(&m.pushPullReq, ^uint32(0)) + + // Check if we have too many open push/pull requests + if numConcurrent >= maxPushPullRequests { + m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests") + return + } + join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec) if err != nil { m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn)) @@ -357,10 +369,25 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim case deadMsg: fallthrough case userMsg: + // Determine the message queue, prioritize alive + queue := m.lowPriorityMsgQueue + if msgType == aliveMsg { + queue = m.highPriorityMsgQueue + } + + // Check for overflow and append if not full + m.msgQueueLock.Lock() + if queue.Len() >= m.config.HandoffQueueDepth { + m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) + } else { + queue.PushBack(msgHandoff{msgType, buf, from}) + } + m.msgQueueLock.Unlock() + + // Notify of pending message select { - case m.handoff <- msgHandoff{msgType, buf, from}: + case m.handoffCh <- struct{}{}: default: - m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) } default: @@ -368,28 +395,51 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim } } +// getNextMessage returns the next message to process in priority order, using LIFO +func (m *Memberlist) getNextMessage() (msgHandoff, bool) { + m.msgQueueLock.Lock() + defer m.msgQueueLock.Unlock() + + if el := m.highPriorityMsgQueue.Back(); el != nil { + m.highPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } else if el := m.lowPriorityMsgQueue.Back(); el != nil { + m.lowPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } + return msgHandoff{}, false +} + // packetHandler is a long running goroutine that processes messages received // over the packet interface, but is decoupled from the listener to avoid // blocking the listener which may cause ping/ack messages to be delayed. func (m *Memberlist) packetHandler() { for { select { - case msg := <-m.handoff: - msgType := msg.msgType - buf := msg.buf - from := msg.from - - switch msgType { - case suspectMsg: - m.handleSuspect(buf, from) - case aliveMsg: - m.handleAlive(buf, from) - case deadMsg: - m.handleDead(buf, from) - case userMsg: - m.handleUser(buf, from) - default: - m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + case <-m.handoffCh: + for { + msg, ok := m.getNextMessage() + if !ok { + break + } + msgType := msg.msgType + buf := msg.buf + from := msg.from + + switch msgType { + case suspectMsg: + m.handleSuspect(buf, from) + case aliveMsg: + m.handleAlive(buf, from) + case deadMsg: + m.handleDead(buf, from) + case userMsg: + m.handleUser(buf, from) + default: + m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + } } case <-m.shutdownCh: @@ -1094,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time } if ack.SeqNo != ping.SeqNo { - return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn)) + return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo) } return true, nil |