summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/hashicorp/memberlist/broadcast.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/memberlist/broadcast.go')
-rw-r--r--vendor/github.com/hashicorp/memberlist/broadcast.go100
1 files changed, 100 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/memberlist/broadcast.go b/vendor/github.com/hashicorp/memberlist/broadcast.go
new file mode 100644
index 000000000..f7e85a119
--- /dev/null
+++ b/vendor/github.com/hashicorp/memberlist/broadcast.go
@@ -0,0 +1,100 @@
+package memberlist
+
+/*
+The broadcast mechanism works by maintaining a sorted list of messages to be
+sent out. When a message is to be broadcast, the retransmit count
+is set to zero and appended to the queue. The retransmit count serves
+as the "priority", ensuring that newer messages get sent first. Once
+a message hits the retransmit limit, it is removed from the queue.
+
+Additionally, older entries can be invalidated by new messages that
+are contradictory. For example, if we send "{suspect M1 inc: 1},
+then a following {alive M1 inc: 2} will invalidate that message
+*/
+
+type memberlistBroadcast struct {
+ node string
+ msg []byte
+ notify chan struct{}
+}
+
+func (b *memberlistBroadcast) Invalidates(other Broadcast) bool {
+ // Check if that broadcast is a memberlist type
+ mb, ok := other.(*memberlistBroadcast)
+ if !ok {
+ return false
+ }
+
+ // Invalidates any message about the same node
+ return b.node == mb.node
+}
+
+func (b *memberlistBroadcast) Message() []byte {
+ return b.msg
+}
+
+func (b *memberlistBroadcast) Finished() {
+ select {
+ case b.notify <- struct{}{}:
+ default:
+ }
+}
+
+// encodeAndBroadcast encodes a message and enqueues it for broadcast. Fails
+// silently if there is an encoding error.
+func (m *Memberlist) encodeAndBroadcast(node string, msgType messageType, msg interface{}) {
+ m.encodeBroadcastNotify(node, msgType, msg, nil)
+}
+
+// encodeBroadcastNotify encodes a message and enqueues it for broadcast
+// and notifies the given channel when transmission is finished. Fails
+// silently if there is an encoding error.
+func (m *Memberlist) encodeBroadcastNotify(node string, msgType messageType, msg interface{}, notify chan struct{}) {
+ buf, err := encode(msgType, msg)
+ if err != nil {
+ m.logger.Printf("[ERR] memberlist: Failed to encode message for broadcast: %s", err)
+ } else {
+ m.queueBroadcast(node, buf.Bytes(), notify)
+ }
+}
+
+// queueBroadcast is used to start dissemination of a message. It will be
+// sent up to a configured number of times. The message could potentially
+// be invalidated by a future message about the same node
+func (m *Memberlist) queueBroadcast(node string, msg []byte, notify chan struct{}) {
+ b := &memberlistBroadcast{node, msg, notify}
+ m.broadcasts.QueueBroadcast(b)
+}
+
+// getBroadcasts is used to return a slice of broadcasts to send up to
+// a maximum byte size, while imposing a per-broadcast overhead. This is used
+// to fill a UDP packet with piggybacked data
+func (m *Memberlist) getBroadcasts(overhead, limit int) [][]byte {
+ // Get memberlist messages first
+ toSend := m.broadcasts.GetBroadcasts(overhead, limit)
+
+ // Check if the user has anything to broadcast
+ d := m.config.Delegate
+ if d != nil {
+ // Determine the bytes used already
+ bytesUsed := 0
+ for _, msg := range toSend {
+ bytesUsed += len(msg) + overhead
+ }
+
+ // Check space remaining for user messages
+ avail := limit - bytesUsed
+ if avail > overhead+userMsgOverhead {
+ userMsgs := d.GetBroadcasts(overhead+userMsgOverhead, avail)
+
+ // Frame each user message
+ for _, msg := range userMsgs {
+ buf := make([]byte, 1, len(msg)+1)
+ buf[0] = byte(userMsg)
+ buf = append(buf, msg...)
+ toSend = append(toSend, buf)
+ }
+ }
+ }
+ return toSend
+}