summaryrefslogtreecommitdiffstats
path: root/plugin/rpcplugin/muxer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/rpcplugin/muxer.go')
-rw-r--r--plugin/rpcplugin/muxer.go41
1 files changed, 25 insertions, 16 deletions
diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go
index a2bfbf8b6..393a122c4 100644
--- a/plugin/rpcplugin/muxer.go
+++ b/plugin/rpcplugin/muxer.go
@@ -114,20 +114,22 @@ func (m *Muxer) write(p []byte, sid int64) (int, error) {
if m.IsClosed() {
return 0, fmt.Errorf("muxer closed")
}
- buf := make([]byte, 10)
- n := binary.PutVarint(buf, sid)
+ var buf [10]byte
+ n := binary.PutVarint(buf[:], sid)
if _, err := m.conn.Write(buf[:n]); err != nil {
m.shutdown(err)
return 0, err
}
- n = binary.PutVarint(buf, int64(len(p)))
+ n = binary.PutVarint(buf[:], int64(len(p)))
if _, err := m.conn.Write(buf[:n]); err != nil {
m.shutdown(err)
return 0, err
}
- if _, err := m.conn.Write(p); err != nil {
- m.shutdown(err)
- return 0, err
+ if len(p) > 0 {
+ if _, err := m.conn.Write(p); err != nil {
+ m.shutdown(err)
+ return 0, err
+ }
}
return len(p), nil
}
@@ -180,7 +182,11 @@ func (m *Muxer) loop() error {
}
continue
}
- _, err = io.CopyN(&stream.readBuf, reader, len)
+ if len == 0 {
+ stream.remoteClosed = true
+ } else {
+ _, err = io.CopyN(&stream.readBuf, reader, len)
+ }
stream.mutex.Unlock()
if err != nil {
return err
@@ -207,13 +213,14 @@ func (m *Muxer) shutdown(err error) {
}
type muxerStream struct {
- id int64
- muxer *Muxer
- readBuf bytes.Buffer
- mutex *sync.Mutex
- readWake *sync.Cond
- isClosed bool
- closeErr error
+ id int64
+ muxer *Muxer
+ readBuf bytes.Buffer
+ mutex *sync.Mutex
+ readWake *sync.Cond
+ isClosed bool
+ remoteClosed bool
+ closeErr error
}
func (s *muxerStream) Read(p []byte) (int, error) {
@@ -225,8 +232,9 @@ func (s *muxerStream) Read(p []byte) (int, error) {
} else if s.isClosed {
return 0, io.EOF
} else if s.readBuf.Len() > 0 {
- n, err := s.readBuf.Read(p)
- return n, err
+ return s.readBuf.Read(p)
+ } else if s.remoteClosed {
+ return 0, io.EOF
}
s.readWake.Wait()
}
@@ -245,6 +253,7 @@ func (s *muxerStream) Close() error {
s.mutex.Lock()
defer s.mutex.Unlock()
if !s.isClosed {
+ s.muxer.write(nil, s.id)
s.isClosed = true
s.muxer.rm(s.id)
}