summaryrefslogtreecommitdiffstats
path: root/plugin/rpcplugin/io.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/rpcplugin/io.go')
-rw-r--r--plugin/rpcplugin/io.go163
1 files changed, 153 insertions, 10 deletions
diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go
index 38229d868..21d79ab0b 100644
--- a/plugin/rpcplugin/io.go
+++ b/plugin/rpcplugin/io.go
@@ -2,26 +2,169 @@ package rpcplugin
import (
"bufio"
+ "bytes"
"encoding/binary"
"io"
- "os"
+ "sync"
)
+type asyncRead struct {
+ b []byte
+ err error
+}
+
+type asyncReadCloser struct {
+ io.ReadCloser
+ buffer bytes.Buffer
+ read chan struct{}
+ reads chan asyncRead
+ close chan struct{}
+ closeOnce sync.Once
+}
+
+// NewAsyncReadCloser returns a ReadCloser that supports Close during Read.
+func NewAsyncReadCloser(r io.ReadCloser) io.ReadCloser {
+ ret := &asyncReadCloser{
+ ReadCloser: r,
+ read: make(chan struct{}),
+ reads: make(chan asyncRead),
+ close: make(chan struct{}),
+ }
+ go ret.loop()
+ return ret
+}
+
+func (r *asyncReadCloser) loop() {
+ buf := make([]byte, 1024*8)
+ var n int
+ var err error
+ for {
+ select {
+ case <-r.read:
+ n = 0
+ if err == nil {
+ n, err = r.ReadCloser.Read(buf)
+ }
+ select {
+ case r.reads <- asyncRead{buf[:n], err}:
+ case <-r.close:
+ }
+ case <-r.close:
+ r.ReadCloser.Close()
+ return
+ }
+ }
+}
+
+func (r *asyncReadCloser) Read(b []byte) (int, error) {
+ if r.buffer.Len() > 0 {
+ return r.buffer.Read(b)
+ }
+ select {
+ case r.read <- struct{}{}:
+ case <-r.close:
+ }
+ select {
+ case read := <-r.reads:
+ if read.err != nil {
+ return 0, read.err
+ }
+ n := copy(b, read.b)
+ if n < len(read.b) {
+ r.buffer.Write(read.b[n:])
+ }
+ return n, nil
+ case <-r.close:
+ return 0, io.EOF
+ }
+}
+
+func (r *asyncReadCloser) Close() error {
+ r.closeOnce.Do(func() {
+ close(r.close)
+ })
+ return nil
+}
+
+type asyncWrite struct {
+ n int
+ err error
+}
+
+type asyncWriteCloser struct {
+ io.WriteCloser
+ writeBuffer bytes.Buffer
+ write chan struct{}
+ writes chan asyncWrite
+ close chan struct{}
+ closeOnce sync.Once
+}
+
+// NewAsyncWriteCloser returns a WriteCloser that supports Close during Write.
+func NewAsyncWriteCloser(w io.WriteCloser) io.WriteCloser {
+ ret := &asyncWriteCloser{
+ WriteCloser: w,
+ write: make(chan struct{}),
+ writes: make(chan asyncWrite),
+ close: make(chan struct{}),
+ }
+ go ret.loop()
+ return ret
+}
+
+func (w *asyncWriteCloser) loop() {
+ var n int64
+ var err error
+ for {
+ select {
+ case <-w.write:
+ n = 0
+ if err == nil {
+ n, err = w.writeBuffer.WriteTo(w.WriteCloser)
+ }
+ select {
+ case w.writes <- asyncWrite{int(n), err}:
+ case <-w.close:
+ }
+ case <-w.close:
+ w.WriteCloser.Close()
+ return
+ }
+ }
+}
+
+func (w *asyncWriteCloser) Write(b []byte) (int, error) {
+ if n, err := w.writeBuffer.Write(b); err != nil {
+ return n, err
+ }
+ select {
+ case w.write <- struct{}{}:
+ case <-w.close:
+ }
+ select {
+ case write := <-w.writes:
+ return write.n, write.err
+ case <-w.close:
+ return 0, io.EOF
+ }
+}
+
+func (w *asyncWriteCloser) Close() error {
+ w.closeOnce.Do(func() {
+ close(w.close)
+ })
+ return nil
+}
+
type rwc struct {
io.ReadCloser
io.WriteCloser
}
func (rwc *rwc) Close() (err error) {
- if f, ok := rwc.ReadCloser.(*os.File); ok {
- // https://groups.google.com/d/topic/golang-nuts/i4w58KJ5-J8/discussion
- err = os.NewFile(f.Fd(), "").Close()
- } else {
- err = rwc.ReadCloser.Close()
- }
- werr := rwc.WriteCloser.Close()
- if err == nil {
- err = werr
+ err = rwc.WriteCloser.Close()
+ if rerr := rwc.ReadCloser.Close(); err == nil {
+ err = rerr
}
return
}