From 61e27beabc9804fdcf59ed9df2180802175a4f70 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Tue, 28 Aug 2018 10:05:26 -0700 Subject: Updating dependancies. (#9303) --- vendor/github.com/hashicorp/errwrap/go.mod | 1 + vendor/github.com/hashicorp/go-multierror/go.mod | 3 + vendor/github.com/hashicorp/go-multierror/go.sum | 4 + vendor/github.com/hashicorp/memberlist/Makefile | 2 +- .../github.com/hashicorp/memberlist/memberlist.go | 32 +++++--- vendor/github.com/hashicorp/memberlist/net.go | 92 +++++++++++++++++----- 6 files changed, 100 insertions(+), 34 deletions(-) create mode 100644 vendor/github.com/hashicorp/errwrap/go.mod create mode 100644 vendor/github.com/hashicorp/go-multierror/go.mod create mode 100644 vendor/github.com/hashicorp/go-multierror/go.sum (limited to 'vendor/github.com/hashicorp') diff --git a/vendor/github.com/hashicorp/errwrap/go.mod b/vendor/github.com/hashicorp/errwrap/go.mod new file mode 100644 index 000000000..c9b84022c --- /dev/null +++ b/vendor/github.com/hashicorp/errwrap/go.mod @@ -0,0 +1 @@ +module github.com/hashicorp/errwrap diff --git a/vendor/github.com/hashicorp/go-multierror/go.mod b/vendor/github.com/hashicorp/go-multierror/go.mod new file mode 100644 index 000000000..2534331d5 --- /dev/null +++ b/vendor/github.com/hashicorp/go-multierror/go.mod @@ -0,0 +1,3 @@ +module github.com/hashicorp/go-multierror + +require github.com/hashicorp/errwrap v1.0.0 diff --git a/vendor/github.com/hashicorp/go-multierror/go.sum b/vendor/github.com/hashicorp/go-multierror/go.sum new file mode 100644 index 000000000..85b1f8ff3 --- /dev/null +++ b/vendor/github.com/hashicorp/go-multierror/go.sum @@ -0,0 +1,4 @@ +github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce h1:prjrVgOk2Yg6w+PflHoszQNLTUh4kaByUcEWM/9uin4= +github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= diff --git a/vendor/github.com/hashicorp/memberlist/Makefile b/vendor/github.com/hashicorp/memberlist/Makefile index 891e8364a..e34a0818d 100644 --- a/vendor/github.com/hashicorp/memberlist/Makefile +++ b/vendor/github.com/hashicorp/memberlist/Makefile @@ -16,4 +16,4 @@ deps: go get -d -v ./... echo $(DEPS) | xargs -n1 go get -d -.PNONY: test cov integ +.PHONY: test cov integ diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index e9084f9fd..bd8abd23f 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -15,6 +15,7 @@ multiple routes. package memberlist import ( + "container/list" "fmt" "log" "net" @@ -34,6 +35,7 @@ type Memberlist struct { sequenceNum uint32 // Local sequence number incarnation uint32 // Local incarnation number numNodes uint32 // Number of known nodes (estimate) + pushPullReq uint32 // Number of push/pull requests config *Config shutdown int32 // Used as an atomic boolean value @@ -45,7 +47,11 @@ type Memberlist struct { leaveLock sync.Mutex // Serializes calls to Leave transport Transport - handoff chan msgHandoff + + handoffCh chan struct{} + highPriorityMsgQueue *list.List + lowPriorityMsgQueue *list.List + msgQueueLock sync.Mutex nodeLock sync.RWMutex nodes []*nodeState // Known nodes @@ -160,17 +166,19 @@ func newMemberlist(conf *Config) (*Memberlist, error) { } m := &Memberlist{ - config: conf, - shutdownCh: make(chan struct{}), - leaveBroadcast: make(chan struct{}, 1), - transport: transport, - handoff: make(chan msgHandoff, conf.HandoffQueueDepth), - nodeMap: make(map[string]*nodeState), - nodeTimers: make(map[string]*suspicion), - awareness: newAwareness(conf.AwarenessMaxMultiplier), - ackHandlers: make(map[uint32]*ackHandler), - broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, - logger: logger, + config: conf, + shutdownCh: make(chan struct{}), + leaveBroadcast: make(chan struct{}, 1), + transport: transport, + handoffCh: make(chan struct{}, 1), + highPriorityMsgQueue: list.New(), + lowPriorityMsgQueue: list.New(), + nodeMap: make(map[string]*nodeState), + nodeTimers: make(map[string]*suspicion), + awareness: newAwareness(conf.AwarenessMaxMultiplier), + ackHandlers: make(map[uint32]*ackHandler), + broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, + logger: logger, } m.broadcasts.NumNodes = func() int { return m.estNumNodes() diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index a4330c4d2..f6a0d45fe 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -8,9 +8,10 @@ import ( "hash/crc32" "io" "net" + "sync/atomic" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" ) @@ -71,7 +72,8 @@ const ( compoundOverhead = 2 // Assumed overhead per entry in compoundHeader userMsgOverhead = 1 blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process - maxPushStateBytes = 10 * 1024 * 1024 + maxPushStateBytes = 20 * 1024 * 1024 + maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests ) // ping request sent directly to node @@ -238,6 +240,16 @@ func (m *Memberlist) handleConn(conn net.Conn) { m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn)) } case pushPullMsg: + // Increment counter of pending push/pulls + numConcurrent := atomic.AddUint32(&m.pushPullReq, 1) + defer atomic.AddUint32(&m.pushPullReq, ^uint32(0)) + + // Check if we have too many open push/pull requests + if numConcurrent >= maxPushPullRequests { + m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests") + return + } + join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec) if err != nil { m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn)) @@ -357,10 +369,25 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim case deadMsg: fallthrough case userMsg: + // Determine the message queue, prioritize alive + queue := m.lowPriorityMsgQueue + if msgType == aliveMsg { + queue = m.highPriorityMsgQueue + } + + // Check for overflow and append if not full + m.msgQueueLock.Lock() + if queue.Len() >= m.config.HandoffQueueDepth { + m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) + } else { + queue.PushBack(msgHandoff{msgType, buf, from}) + } + m.msgQueueLock.Unlock() + + // Notify of pending message select { - case m.handoff <- msgHandoff{msgType, buf, from}: + case m.handoffCh <- struct{}{}: default: - m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from)) } default: @@ -368,28 +395,51 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim } } +// getNextMessage returns the next message to process in priority order, using LIFO +func (m *Memberlist) getNextMessage() (msgHandoff, bool) { + m.msgQueueLock.Lock() + defer m.msgQueueLock.Unlock() + + if el := m.highPriorityMsgQueue.Back(); el != nil { + m.highPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } else if el := m.lowPriorityMsgQueue.Back(); el != nil { + m.lowPriorityMsgQueue.Remove(el) + msg := el.Value.(msgHandoff) + return msg, true + } + return msgHandoff{}, false +} + // packetHandler is a long running goroutine that processes messages received // over the packet interface, but is decoupled from the listener to avoid // blocking the listener which may cause ping/ack messages to be delayed. func (m *Memberlist) packetHandler() { for { select { - case msg := <-m.handoff: - msgType := msg.msgType - buf := msg.buf - from := msg.from - - switch msgType { - case suspectMsg: - m.handleSuspect(buf, from) - case aliveMsg: - m.handleAlive(buf, from) - case deadMsg: - m.handleDead(buf, from) - case userMsg: - m.handleUser(buf, from) - default: - m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + case <-m.handoffCh: + for { + msg, ok := m.getNextMessage() + if !ok { + break + } + msgType := msg.msgType + buf := msg.buf + from := msg.from + + switch msgType { + case suspectMsg: + m.handleSuspect(buf, from) + case aliveMsg: + m.handleAlive(buf, from) + case deadMsg: + m.handleDead(buf, from) + case userMsg: + m.handleUser(buf, from) + default: + m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from)) + } } case <-m.shutdownCh: @@ -1094,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time } if ack.SeqNo != ping.SeqNo { - return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn)) + return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo) } return true, nil -- cgit v1.2.3-1-g7c22