diff options
Diffstat (limited to 'plugin/rpcplugin/muxer.go')
-rw-r--r-- | plugin/rpcplugin/muxer.go | 264 |
1 files changed, 0 insertions, 264 deletions
diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go deleted file mode 100644 index a7260c399..000000000 --- a/plugin/rpcplugin/muxer.go +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. -// See License.txt for license information. - -package rpcplugin - -import ( - "bufio" - "bytes" - "encoding/binary" - "fmt" - "io" - "sync" - "sync/atomic" -) - -// Muxer allows multiple bidirectional streams to be transmitted over a single connection. -// -// Muxer is safe for use by multiple goroutines. -// -// Streams opened on the muxer must be periodically drained in order to reclaim read buffer memory. -// In other words, readers must consume incoming data as it comes in. -type Muxer struct { - // writeMutex guards conn writes - writeMutex sync.Mutex - conn io.ReadWriteCloser - - // didCloseConn is a boolean (0 or 1) used from multiple goroutines via atomic operations - didCloseConn int32 - - // streamsMutex guards streams and nextId - streamsMutex sync.Mutex - nextId int64 - streams map[int64]*muxerStream - - stream0Reader *io.PipeReader - stream0Writer *io.PipeWriter - result chan error -} - -// Creates a new Muxer. -// -// conn must be safe for simultaneous reads by one goroutine and writes by another. -// -// For two muxers communicating with each other via a connection, parity must be true for exactly -// one of them. -func NewMuxer(conn io.ReadWriteCloser, parity bool) *Muxer { - s0r, s0w := io.Pipe() - muxer := &Muxer{ - conn: conn, - streams: make(map[int64]*muxerStream), - result: make(chan error, 1), - nextId: 1, - stream0Reader: s0r, - stream0Writer: s0w, - } - if parity { - muxer.nextId = 2 - } - go muxer.run() - return muxer -} - -// Opens a new stream with a unique id. -// -// Writes made to the stream before the other end calls Connect will be discarded. -func (m *Muxer) Serve() (int64, io.ReadWriteCloser) { - m.streamsMutex.Lock() - id := m.nextId - m.nextId += 2 - m.streamsMutex.Unlock() - return id, m.Connect(id) -} - -// Opens a remotely opened stream. -func (m *Muxer) Connect(id int64) io.ReadWriteCloser { - m.streamsMutex.Lock() - defer m.streamsMutex.Unlock() - mutex := &sync.Mutex{} - stream := &muxerStream{ - id: id, - muxer: m, - mutex: mutex, - readWake: sync.NewCond(mutex), - } - m.streams[id] = stream - return stream -} - -// Calling Read on the muxer directly performs a read on a dedicated, always-open channel. -func (m *Muxer) Read(p []byte) (int, error) { - return m.stream0Reader.Read(p) -} - -// Calling Write on the muxer directly performs a write on a dedicated, always-open channel. -func (m *Muxer) Write(p []byte) (int, error) { - return m.write(p, 0) -} - -// Closes the muxer. -func (m *Muxer) Close() error { - if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) { - m.conn.Close() - } - m.stream0Reader.Close() - m.stream0Writer.Close() - <-m.result - return nil -} - -func (m *Muxer) IsClosed() bool { - return atomic.LoadInt32(&m.didCloseConn) > 0 -} - -func (m *Muxer) write(p []byte, sid int64) (int, error) { - m.writeMutex.Lock() - defer m.writeMutex.Unlock() - if m.IsClosed() { - return 0, fmt.Errorf("muxer closed") - } - var buf [10]byte - n := binary.PutVarint(buf[:], sid) - if _, err := m.conn.Write(buf[:n]); err != nil { - m.shutdown(err) - return 0, err - } - n = binary.PutVarint(buf[:], int64(len(p))) - if _, err := m.conn.Write(buf[:n]); err != nil { - m.shutdown(err) - return 0, err - } - if len(p) > 0 { - if _, err := m.conn.Write(p); err != nil { - m.shutdown(err) - return 0, err - } - } - return len(p), nil -} - -func (m *Muxer) rm(sid int64) { - m.streamsMutex.Lock() - defer m.streamsMutex.Unlock() - delete(m.streams, sid) -} - -func (m *Muxer) run() { - m.shutdown(m.loop()) -} - -func (m *Muxer) loop() error { - reader := bufio.NewReader(m.conn) - - for { - sid, err := binary.ReadVarint(reader) - if err != nil { - return err - } - len, err := binary.ReadVarint(reader) - if err != nil { - return err - } - - if sid == 0 { - if _, err := io.CopyN(m.stream0Writer, reader, len); err != nil { - return err - } - continue - } - - m.streamsMutex.Lock() - stream, ok := m.streams[sid] - m.streamsMutex.Unlock() - if !ok { - if _, err := reader.Discard(int(len)); err != nil { - return err - } - continue - } - - stream.mutex.Lock() - if stream.isClosed { - stream.mutex.Unlock() - if _, err := reader.Discard(int(len)); err != nil { - return err - } - continue - } - if len == 0 { - stream.remoteClosed = true - } else { - _, err = io.CopyN(&stream.readBuf, reader, len) - } - stream.mutex.Unlock() - if err != nil { - return err - } - stream.readWake.Signal() - } -} - -func (m *Muxer) shutdown(err error) { - if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) { - m.conn.Close() - } - go func() { - m.streamsMutex.Lock() - for _, stream := range m.streams { - stream.mutex.Lock() - stream.readWake.Signal() - stream.mutex.Unlock() - } - m.streams = make(map[int64]*muxerStream) - m.streamsMutex.Unlock() - }() - m.result <- err -} - -type muxerStream struct { - id int64 - muxer *Muxer - readBuf bytes.Buffer - mutex *sync.Mutex - readWake *sync.Cond - isClosed bool - remoteClosed bool -} - -func (s *muxerStream) Read(p []byte) (int, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - for { - if s.muxer.IsClosed() { - return 0, fmt.Errorf("muxer closed") - } else if s.isClosed { - return 0, io.EOF - } else if s.readBuf.Len() > 0 { - return s.readBuf.Read(p) - } else if s.remoteClosed { - return 0, io.EOF - } - s.readWake.Wait() - } -} - -func (s *muxerStream) Write(p []byte) (int, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.isClosed { - return 0, fmt.Errorf("stream closed") - } - return s.muxer.write(p, s.id) -} - -func (s *muxerStream) Close() error { - s.mutex.Lock() - defer s.mutex.Unlock() - if !s.isClosed { - s.muxer.write(nil, s.id) - s.isClosed = true - s.muxer.rm(s.id) - } - s.readWake.Signal() - return nil -} |