summaryrefslogtreecommitdiffstats
path: root/plugin/rpcplugin/muxer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/rpcplugin/muxer.go')
-rw-r--r--plugin/rpcplugin/muxer.go264
1 files changed, 0 insertions, 264 deletions
diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go
deleted file mode 100644
index a7260c399..000000000
--- a/plugin/rpcplugin/muxer.go
+++ /dev/null
@@ -1,264 +0,0 @@
-// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
-// See License.txt for license information.
-
-package rpcplugin
-
-import (
- "bufio"
- "bytes"
- "encoding/binary"
- "fmt"
- "io"
- "sync"
- "sync/atomic"
-)
-
-// Muxer allows multiple bidirectional streams to be transmitted over a single connection.
-//
-// Muxer is safe for use by multiple goroutines.
-//
-// Streams opened on the muxer must be periodically drained in order to reclaim read buffer memory.
-// In other words, readers must consume incoming data as it comes in.
-type Muxer struct {
- // writeMutex guards conn writes
- writeMutex sync.Mutex
- conn io.ReadWriteCloser
-
- // didCloseConn is a boolean (0 or 1) used from multiple goroutines via atomic operations
- didCloseConn int32
-
- // streamsMutex guards streams and nextId
- streamsMutex sync.Mutex
- nextId int64
- streams map[int64]*muxerStream
-
- stream0Reader *io.PipeReader
- stream0Writer *io.PipeWriter
- result chan error
-}
-
-// Creates a new Muxer.
-//
-// conn must be safe for simultaneous reads by one goroutine and writes by another.
-//
-// For two muxers communicating with each other via a connection, parity must be true for exactly
-// one of them.
-func NewMuxer(conn io.ReadWriteCloser, parity bool) *Muxer {
- s0r, s0w := io.Pipe()
- muxer := &Muxer{
- conn: conn,
- streams: make(map[int64]*muxerStream),
- result: make(chan error, 1),
- nextId: 1,
- stream0Reader: s0r,
- stream0Writer: s0w,
- }
- if parity {
- muxer.nextId = 2
- }
- go muxer.run()
- return muxer
-}
-
-// Opens a new stream with a unique id.
-//
-// Writes made to the stream before the other end calls Connect will be discarded.
-func (m *Muxer) Serve() (int64, io.ReadWriteCloser) {
- m.streamsMutex.Lock()
- id := m.nextId
- m.nextId += 2
- m.streamsMutex.Unlock()
- return id, m.Connect(id)
-}
-
-// Opens a remotely opened stream.
-func (m *Muxer) Connect(id int64) io.ReadWriteCloser {
- m.streamsMutex.Lock()
- defer m.streamsMutex.Unlock()
- mutex := &sync.Mutex{}
- stream := &muxerStream{
- id: id,
- muxer: m,
- mutex: mutex,
- readWake: sync.NewCond(mutex),
- }
- m.streams[id] = stream
- return stream
-}
-
-// Calling Read on the muxer directly performs a read on a dedicated, always-open channel.
-func (m *Muxer) Read(p []byte) (int, error) {
- return m.stream0Reader.Read(p)
-}
-
-// Calling Write on the muxer directly performs a write on a dedicated, always-open channel.
-func (m *Muxer) Write(p []byte) (int, error) {
- return m.write(p, 0)
-}
-
-// Closes the muxer.
-func (m *Muxer) Close() error {
- if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) {
- m.conn.Close()
- }
- m.stream0Reader.Close()
- m.stream0Writer.Close()
- <-m.result
- return nil
-}
-
-func (m *Muxer) IsClosed() bool {
- return atomic.LoadInt32(&m.didCloseConn) > 0
-}
-
-func (m *Muxer) write(p []byte, sid int64) (int, error) {
- m.writeMutex.Lock()
- defer m.writeMutex.Unlock()
- if m.IsClosed() {
- return 0, fmt.Errorf("muxer closed")
- }
- var buf [10]byte
- n := binary.PutVarint(buf[:], sid)
- if _, err := m.conn.Write(buf[:n]); err != nil {
- m.shutdown(err)
- return 0, err
- }
- n = binary.PutVarint(buf[:], int64(len(p)))
- if _, err := m.conn.Write(buf[:n]); err != nil {
- m.shutdown(err)
- return 0, err
- }
- if len(p) > 0 {
- if _, err := m.conn.Write(p); err != nil {
- m.shutdown(err)
- return 0, err
- }
- }
- return len(p), nil
-}
-
-func (m *Muxer) rm(sid int64) {
- m.streamsMutex.Lock()
- defer m.streamsMutex.Unlock()
- delete(m.streams, sid)
-}
-
-func (m *Muxer) run() {
- m.shutdown(m.loop())
-}
-
-func (m *Muxer) loop() error {
- reader := bufio.NewReader(m.conn)
-
- for {
- sid, err := binary.ReadVarint(reader)
- if err != nil {
- return err
- }
- len, err := binary.ReadVarint(reader)
- if err != nil {
- return err
- }
-
- if sid == 0 {
- if _, err := io.CopyN(m.stream0Writer, reader, len); err != nil {
- return err
- }
- continue
- }
-
- m.streamsMutex.Lock()
- stream, ok := m.streams[sid]
- m.streamsMutex.Unlock()
- if !ok {
- if _, err := reader.Discard(int(len)); err != nil {
- return err
- }
- continue
- }
-
- stream.mutex.Lock()
- if stream.isClosed {
- stream.mutex.Unlock()
- if _, err := reader.Discard(int(len)); err != nil {
- return err
- }
- continue
- }
- if len == 0 {
- stream.remoteClosed = true
- } else {
- _, err = io.CopyN(&stream.readBuf, reader, len)
- }
- stream.mutex.Unlock()
- if err != nil {
- return err
- }
- stream.readWake.Signal()
- }
-}
-
-func (m *Muxer) shutdown(err error) {
- if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) {
- m.conn.Close()
- }
- go func() {
- m.streamsMutex.Lock()
- for _, stream := range m.streams {
- stream.mutex.Lock()
- stream.readWake.Signal()
- stream.mutex.Unlock()
- }
- m.streams = make(map[int64]*muxerStream)
- m.streamsMutex.Unlock()
- }()
- m.result <- err
-}
-
-type muxerStream struct {
- id int64
- muxer *Muxer
- readBuf bytes.Buffer
- mutex *sync.Mutex
- readWake *sync.Cond
- isClosed bool
- remoteClosed bool
-}
-
-func (s *muxerStream) Read(p []byte) (int, error) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- for {
- if s.muxer.IsClosed() {
- return 0, fmt.Errorf("muxer closed")
- } else if s.isClosed {
- return 0, io.EOF
- } else if s.readBuf.Len() > 0 {
- return s.readBuf.Read(p)
- } else if s.remoteClosed {
- return 0, io.EOF
- }
- s.readWake.Wait()
- }
-}
-
-func (s *muxerStream) Write(p []byte) (int, error) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if s.isClosed {
- return 0, fmt.Errorf("stream closed")
- }
- return s.muxer.write(p, s.id)
-}
-
-func (s *muxerStream) Close() error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if !s.isClosed {
- s.muxer.write(nil, s.id)
- s.isClosed = true
- s.muxer.rm(s.id)
- }
- s.readWake.Signal()
- return nil
-}