From 402491b7e52c4d836c1274976cdb387852cfd17b Mon Sep 17 00:00:00 2001 From: Chris Date: Mon, 11 Sep 2017 10:02:02 -0500 Subject: PLT-7407: Back-end plugins (#7409) * tie back-end plugins together * fix comment typo * add tests and a bit of polish * tests and polish * add test, don't let backend executable paths escape the plugin directory --- plugin/rpcplugin/io.go | 163 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 153 insertions(+), 10 deletions(-) (limited to 'plugin/rpcplugin/io.go') diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go index 38229d868..21d79ab0b 100644 --- a/plugin/rpcplugin/io.go +++ b/plugin/rpcplugin/io.go @@ -2,26 +2,169 @@ package rpcplugin import ( "bufio" + "bytes" "encoding/binary" "io" - "os" + "sync" ) +type asyncRead struct { + b []byte + err error +} + +type asyncReadCloser struct { + io.ReadCloser + buffer bytes.Buffer + read chan struct{} + reads chan asyncRead + close chan struct{} + closeOnce sync.Once +} + +// NewAsyncReadCloser returns a ReadCloser that supports Close during Read. +func NewAsyncReadCloser(r io.ReadCloser) io.ReadCloser { + ret := &asyncReadCloser{ + ReadCloser: r, + read: make(chan struct{}), + reads: make(chan asyncRead), + close: make(chan struct{}), + } + go ret.loop() + return ret +} + +func (r *asyncReadCloser) loop() { + buf := make([]byte, 1024*8) + var n int + var err error + for { + select { + case <-r.read: + n = 0 + if err == nil { + n, err = r.ReadCloser.Read(buf) + } + select { + case r.reads <- asyncRead{buf[:n], err}: + case <-r.close: + } + case <-r.close: + r.ReadCloser.Close() + return + } + } +} + +func (r *asyncReadCloser) Read(b []byte) (int, error) { + if r.buffer.Len() > 0 { + return r.buffer.Read(b) + } + select { + case r.read <- struct{}{}: + case <-r.close: + } + select { + case read := <-r.reads: + if read.err != nil { + return 0, read.err + } + n := copy(b, read.b) + if n < len(read.b) { + r.buffer.Write(read.b[n:]) + } + return n, nil + case <-r.close: + return 0, io.EOF + } +} + +func (r *asyncReadCloser) Close() error { + r.closeOnce.Do(func() { + close(r.close) + }) + return nil +} + +type asyncWrite struct { + n int + err error +} + +type asyncWriteCloser struct { + io.WriteCloser + writeBuffer bytes.Buffer + write chan struct{} + writes chan asyncWrite + close chan struct{} + closeOnce sync.Once +} + +// NewAsyncWriteCloser returns a WriteCloser that supports Close during Write. +func NewAsyncWriteCloser(w io.WriteCloser) io.WriteCloser { + ret := &asyncWriteCloser{ + WriteCloser: w, + write: make(chan struct{}), + writes: make(chan asyncWrite), + close: make(chan struct{}), + } + go ret.loop() + return ret +} + +func (w *asyncWriteCloser) loop() { + var n int64 + var err error + for { + select { + case <-w.write: + n = 0 + if err == nil { + n, err = w.writeBuffer.WriteTo(w.WriteCloser) + } + select { + case w.writes <- asyncWrite{int(n), err}: + case <-w.close: + } + case <-w.close: + w.WriteCloser.Close() + return + } + } +} + +func (w *asyncWriteCloser) Write(b []byte) (int, error) { + if n, err := w.writeBuffer.Write(b); err != nil { + return n, err + } + select { + case w.write <- struct{}{}: + case <-w.close: + } + select { + case write := <-w.writes: + return write.n, write.err + case <-w.close: + return 0, io.EOF + } +} + +func (w *asyncWriteCloser) Close() error { + w.closeOnce.Do(func() { + close(w.close) + }) + return nil +} + type rwc struct { io.ReadCloser io.WriteCloser } func (rwc *rwc) Close() (err error) { - if f, ok := rwc.ReadCloser.(*os.File); ok { - // https://groups.google.com/d/topic/golang-nuts/i4w58KJ5-J8/discussion - err = os.NewFile(f.Fd(), "").Close() - } else { - err = rwc.ReadCloser.Close() - } - werr := rwc.WriteCloser.Close() - if err == nil { - err = werr + err = rwc.WriteCloser.Close() + if rerr := rwc.ReadCloser.Close(); err == nil { + err = rerr } return } -- cgit v1.2.3-1-g7c22