From 66b07a72ce45ce12f1840e0f74f26ea2a9b8105f Mon Sep 17 00:00:00 2001 From: Chris Date: Tue, 10 Oct 2017 14:50:45 -0700 Subject: remove go 1.8 ipc workaround (#7604) --- plugin/rpcplugin/io.go | 150 -------------------------------------------- plugin/rpcplugin/io_test.go | 73 --------------------- plugin/rpcplugin/ipc.go | 2 +- 3 files changed, 1 insertion(+), 224 deletions(-) delete mode 100644 plugin/rpcplugin/io_test.go (limited to 'plugin') diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go index 21d79ab0b..f4fdd3a1b 100644 --- a/plugin/rpcplugin/io.go +++ b/plugin/rpcplugin/io.go @@ -2,160 +2,10 @@ package rpcplugin import ( "bufio" - "bytes" "encoding/binary" "io" - "sync" ) -type asyncRead struct { - b []byte - err error -} - -type asyncReadCloser struct { - io.ReadCloser - buffer bytes.Buffer - read chan struct{} - reads chan asyncRead - close chan struct{} - closeOnce sync.Once -} - -// NewAsyncReadCloser returns a ReadCloser that supports Close during Read. -func NewAsyncReadCloser(r io.ReadCloser) io.ReadCloser { - ret := &asyncReadCloser{ - ReadCloser: r, - read: make(chan struct{}), - reads: make(chan asyncRead), - close: make(chan struct{}), - } - go ret.loop() - return ret -} - -func (r *asyncReadCloser) loop() { - buf := make([]byte, 1024*8) - var n int - var err error - for { - select { - case <-r.read: - n = 0 - if err == nil { - n, err = r.ReadCloser.Read(buf) - } - select { - case r.reads <- asyncRead{buf[:n], err}: - case <-r.close: - } - case <-r.close: - r.ReadCloser.Close() - return - } - } -} - -func (r *asyncReadCloser) Read(b []byte) (int, error) { - if r.buffer.Len() > 0 { - return r.buffer.Read(b) - } - select { - case r.read <- struct{}{}: - case <-r.close: - } - select { - case read := <-r.reads: - if read.err != nil { - return 0, read.err - } - n := copy(b, read.b) - if n < len(read.b) { - r.buffer.Write(read.b[n:]) - } - return n, nil - case <-r.close: - return 0, io.EOF - } -} - -func (r *asyncReadCloser) Close() error { - r.closeOnce.Do(func() { - close(r.close) - }) - return nil -} - -type asyncWrite struct { - n int - err error -} - -type asyncWriteCloser struct { - io.WriteCloser - writeBuffer bytes.Buffer - write chan struct{} - writes chan asyncWrite - close chan struct{} - closeOnce sync.Once -} - -// NewAsyncWriteCloser returns a WriteCloser that supports Close during Write. -func NewAsyncWriteCloser(w io.WriteCloser) io.WriteCloser { - ret := &asyncWriteCloser{ - WriteCloser: w, - write: make(chan struct{}), - writes: make(chan asyncWrite), - close: make(chan struct{}), - } - go ret.loop() - return ret -} - -func (w *asyncWriteCloser) loop() { - var n int64 - var err error - for { - select { - case <-w.write: - n = 0 - if err == nil { - n, err = w.writeBuffer.WriteTo(w.WriteCloser) - } - select { - case w.writes <- asyncWrite{int(n), err}: - case <-w.close: - } - case <-w.close: - w.WriteCloser.Close() - return - } - } -} - -func (w *asyncWriteCloser) Write(b []byte) (int, error) { - if n, err := w.writeBuffer.Write(b); err != nil { - return n, err - } - select { - case w.write <- struct{}{}: - case <-w.close: - } - select { - case write := <-w.writes: - return write.n, write.err - case <-w.close: - return 0, io.EOF - } -} - -func (w *asyncWriteCloser) Close() error { - w.closeOnce.Do(func() { - close(w.close) - }) - return nil -} - type rwc struct { io.ReadCloser io.WriteCloser diff --git a/plugin/rpcplugin/io_test.go b/plugin/rpcplugin/io_test.go deleted file mode 100644 index cb31b23b3..000000000 --- a/plugin/rpcplugin/io_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package rpcplugin - -import ( - "io/ioutil" - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestNewAsyncReadCloser(t *testing.T) { - rf, w, err := os.Pipe() - require.NoError(t, err) - r := NewAsyncReadCloser(rf) - defer r.Close() - - go func() { - w.Write([]byte("foo")) - w.Close() - }() - - foo, err := ioutil.ReadAll(r) - require.NoError(t, err) - assert.Equal(t, "foo", string(foo)) -} - -func TestNewAsyncReadCloser_CloseDuringRead(t *testing.T) { - rf, w, err := os.Pipe() - require.NoError(t, err) - defer w.Close() - - r := NewAsyncReadCloser(rf) - - go func() { - time.Sleep(time.Millisecond * 200) - r.Close() - }() - r.Read(make([]byte, 10)) -} - -func TestNewAsyncWriteCloser(t *testing.T) { - r, wf, err := os.Pipe() - require.NoError(t, err) - w := NewAsyncWriteCloser(wf) - defer w.Close() - - go func() { - foo, err := ioutil.ReadAll(r) - require.NoError(t, err) - assert.Equal(t, "foo", string(foo)) - r.Close() - }() - - n, err := w.Write([]byte("foo")) - require.NoError(t, err) - assert.Equal(t, 3, n) -} - -func TestNewAsyncWriteCloser_CloseDuringWrite(t *testing.T) { - r, wf, err := os.Pipe() - require.NoError(t, err) - defer r.Close() - - w := NewAsyncWriteCloser(wf) - - go func() { - time.Sleep(time.Millisecond * 200) - w.Close() - }() - w.Write(make([]byte, 10)) -} diff --git a/plugin/rpcplugin/ipc.go b/plugin/rpcplugin/ipc.go index bbb3db06e..3e6c89c4f 100644 --- a/plugin/rpcplugin/ipc.go +++ b/plugin/rpcplugin/ipc.go @@ -19,7 +19,7 @@ func NewIPC() (io.ReadWriteCloser, []*os.File, error) { childWriter.Close() return nil, nil, err } - return NewReadWriteCloser(NewAsyncReadCloser(parentReader), NewAsyncWriteCloser(parentWriter)), []*os.File{childReader, childWriter}, nil + return NewReadWriteCloser(parentReader, parentWriter), []*os.File{childReader, childWriter}, nil } // Returns the IPC instance inherited by the process from its parent. -- cgit v1.2.3-1-g7c22