summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/hashicorp/memberlist/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/queue.go')
-rw-r--r--vendor/github.com/hashicorp/memberlist/queue.go167
1 files changed, 167 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/memberlist/queue.go b/vendor/github.com/hashicorp/memberlist/queue.go
new file mode 100644
index 000000000..994b90ff1
--- /dev/null
+++ b/vendor/github.com/hashicorp/memberlist/queue.go
@@ -0,0 +1,167 @@
+package memberlist
+
+import (
+ "sort"
+ "sync"
+)
+
+// TransmitLimitedQueue is used to queue messages to broadcast to
+// the cluster (via gossip) but limits the number of transmits per
+// message. It also prioritizes messages with lower transmit counts
+// (hence newer messages).
+type TransmitLimitedQueue struct {
+ // NumNodes returns the number of nodes in the cluster. This is
+ // used to determine the retransmit count, which is calculated
+ // based on the log of this.
+ NumNodes func() int
+
+ // RetransmitMult is the multiplier used to determine the maximum
+ // number of retransmissions attempted.
+ RetransmitMult int
+
+ sync.Mutex
+ bcQueue limitedBroadcasts
+}
+
+type limitedBroadcast struct {
+ transmits int // Number of transmissions attempted.
+ b Broadcast
+}
+type limitedBroadcasts []*limitedBroadcast
+
+// Broadcast is something that can be broadcasted via gossip to
+// the memberlist cluster.
+type Broadcast interface {
+ // Invalidates checks if enqueuing the current broadcast
+ // invalidates a previous broadcast
+ Invalidates(b Broadcast) bool
+
+ // Returns a byte form of the message
+ Message() []byte
+
+ // Finished is invoked when the message will no longer
+ // be broadcast, either due to invalidation or to the
+ // transmit limit being reached
+ Finished()
+}
+
+// QueueBroadcast is used to enqueue a broadcast
+func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
+ q.Lock()
+ defer q.Unlock()
+
+ // Check if this message invalidates another
+ n := len(q.bcQueue)
+ for i := 0; i < n; i++ {
+ if b.Invalidates(q.bcQueue[i].b) {
+ q.bcQueue[i].b.Finished()
+ copy(q.bcQueue[i:], q.bcQueue[i+1:])
+ q.bcQueue[n-1] = nil
+ q.bcQueue = q.bcQueue[:n-1]
+ n--
+ }
+ }
+
+ // Append to the queue
+ q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b})
+}
+
+// GetBroadcasts is used to get a number of broadcasts, up to a byte limit
+// and applying a per-message overhead as provided.
+func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
+ q.Lock()
+ defer q.Unlock()
+
+ // Fast path the default case
+ if len(q.bcQueue) == 0 {
+ return nil
+ }
+
+ transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes())
+ bytesUsed := 0
+ var toSend [][]byte
+
+ for i := len(q.bcQueue) - 1; i >= 0; i-- {
+ // Check if this is within our limits
+ b := q.bcQueue[i]
+ msg := b.b.Message()
+ if bytesUsed+overhead+len(msg) > limit {
+ continue
+ }
+
+ // Add to slice to send
+ bytesUsed += overhead + len(msg)
+ toSend = append(toSend, msg)
+
+ // Check if we should stop transmission
+ b.transmits++
+ if b.transmits >= transmitLimit {
+ b.b.Finished()
+ n := len(q.bcQueue)
+ q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil
+ q.bcQueue = q.bcQueue[:n-1]
+ }
+ }
+
+ // If we are sending anything, we need to re-sort to deal
+ // with adjusted transmit counts
+ if len(toSend) > 0 {
+ q.bcQueue.Sort()
+ }
+ return toSend
+}
+
+// NumQueued returns the number of queued messages
+func (q *TransmitLimitedQueue) NumQueued() int {
+ q.Lock()
+ defer q.Unlock()
+ return len(q.bcQueue)
+}
+
+// Reset clears all the queued messages
+func (q *TransmitLimitedQueue) Reset() {
+ q.Lock()
+ defer q.Unlock()
+ for _, b := range q.bcQueue {
+ b.b.Finished()
+ }
+ q.bcQueue = nil
+}
+
+// Prune will retain the maxRetain latest messages, and the rest
+// will be discarded. This can be used to prevent unbounded queue sizes
+func (q *TransmitLimitedQueue) Prune(maxRetain int) {
+ q.Lock()
+ defer q.Unlock()
+
+ // Do nothing if queue size is less than the limit
+ n := len(q.bcQueue)
+ if n < maxRetain {
+ return
+ }
+
+ // Invalidate the messages we will be removing
+ for i := 0; i < n-maxRetain; i++ {
+ q.bcQueue[i].b.Finished()
+ }
+
+ // Move the messages, and retain only the last maxRetain
+ copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:])
+ q.bcQueue = q.bcQueue[:maxRetain]
+}
+
+func (b limitedBroadcasts) Len() int {
+ return len(b)
+}
+
+func (b limitedBroadcasts) Less(i, j int) bool {
+ return b[i].transmits < b[j].transmits
+}
+
+func (b limitedBroadcasts) Swap(i, j int) {
+ b[i], b[j] = b[j], b[i]
+}
+
+func (b limitedBroadcasts) Sort() {
+ sort.Sort(sort.Reverse(b))
+}