diff options
Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/queue.go')
-rw-r--r-- | vendor/github.com/hashicorp/memberlist/queue.go | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/memberlist/queue.go b/vendor/github.com/hashicorp/memberlist/queue.go new file mode 100644 index 000000000..994b90ff1 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/queue.go @@ -0,0 +1,167 @@ +package memberlist + +import ( + "sort" + "sync" +) + +// TransmitLimitedQueue is used to queue messages to broadcast to +// the cluster (via gossip) but limits the number of transmits per +// message. It also prioritizes messages with lower transmit counts +// (hence newer messages). +type TransmitLimitedQueue struct { + // NumNodes returns the number of nodes in the cluster. This is + // used to determine the retransmit count, which is calculated + // based on the log of this. + NumNodes func() int + + // RetransmitMult is the multiplier used to determine the maximum + // number of retransmissions attempted. + RetransmitMult int + + sync.Mutex + bcQueue limitedBroadcasts +} + +type limitedBroadcast struct { + transmits int // Number of transmissions attempted. + b Broadcast +} +type limitedBroadcasts []*limitedBroadcast + +// Broadcast is something that can be broadcasted via gossip to +// the memberlist cluster. +type Broadcast interface { + // Invalidates checks if enqueuing the current broadcast + // invalidates a previous broadcast + Invalidates(b Broadcast) bool + + // Returns a byte form of the message + Message() []byte + + // Finished is invoked when the message will no longer + // be broadcast, either due to invalidation or to the + // transmit limit being reached + Finished() +} + +// QueueBroadcast is used to enqueue a broadcast +func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { + q.Lock() + defer q.Unlock() + + // Check if this message invalidates another + n := len(q.bcQueue) + for i := 0; i < n; i++ { + if b.Invalidates(q.bcQueue[i].b) { + q.bcQueue[i].b.Finished() + copy(q.bcQueue[i:], q.bcQueue[i+1:]) + q.bcQueue[n-1] = nil + q.bcQueue = q.bcQueue[:n-1] + n-- + } + } + + // Append to the queue + q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b}) +} + +// GetBroadcasts is used to get a number of broadcasts, up to a byte limit +// and applying a per-message overhead as provided. +func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte { + q.Lock() + defer q.Unlock() + + // Fast path the default case + if len(q.bcQueue) == 0 { + return nil + } + + transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes()) + bytesUsed := 0 + var toSend [][]byte + + for i := len(q.bcQueue) - 1; i >= 0; i-- { + // Check if this is within our limits + b := q.bcQueue[i] + msg := b.b.Message() + if bytesUsed+overhead+len(msg) > limit { + continue + } + + // Add to slice to send + bytesUsed += overhead + len(msg) + toSend = append(toSend, msg) + + // Check if we should stop transmission + b.transmits++ + if b.transmits >= transmitLimit { + b.b.Finished() + n := len(q.bcQueue) + q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil + q.bcQueue = q.bcQueue[:n-1] + } + } + + // If we are sending anything, we need to re-sort to deal + // with adjusted transmit counts + if len(toSend) > 0 { + q.bcQueue.Sort() + } + return toSend +} + +// NumQueued returns the number of queued messages +func (q *TransmitLimitedQueue) NumQueued() int { + q.Lock() + defer q.Unlock() + return len(q.bcQueue) +} + +// Reset clears all the queued messages +func (q *TransmitLimitedQueue) Reset() { + q.Lock() + defer q.Unlock() + for _, b := range q.bcQueue { + b.b.Finished() + } + q.bcQueue = nil +} + +// Prune will retain the maxRetain latest messages, and the rest +// will be discarded. This can be used to prevent unbounded queue sizes +func (q *TransmitLimitedQueue) Prune(maxRetain int) { + q.Lock() + defer q.Unlock() + + // Do nothing if queue size is less than the limit + n := len(q.bcQueue) + if n < maxRetain { + return + } + + // Invalidate the messages we will be removing + for i := 0; i < n-maxRetain; i++ { + q.bcQueue[i].b.Finished() + } + + // Move the messages, and retain only the last maxRetain + copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:]) + q.bcQueue = q.bcQueue[:maxRetain] +} + +func (b limitedBroadcasts) Len() int { + return len(b) +} + +func (b limitedBroadcasts) Less(i, j int) bool { + return b[i].transmits < b[j].transmits +} + +func (b limitedBroadcasts) Swap(i, j int) { + b[i], b[j] = b[j], b[i] +} + +func (b limitedBroadcasts) Sort() { + sort.Sort(sort.Reverse(b)) +} |