summaryrefslogtreecommitdiffstats
path: root/plugin/rpcplugin/io.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/rpcplugin/io.go')
-rw-r--r--plugin/rpcplugin/io.go150
1 files changed, 0 insertions, 150 deletions
diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go
index 21d79ab0b..f4fdd3a1b 100644
--- a/plugin/rpcplugin/io.go
+++ b/plugin/rpcplugin/io.go
@@ -2,160 +2,10 @@ package rpcplugin
import (
"bufio"
- "bytes"
"encoding/binary"
"io"
- "sync"
)
-type asyncRead struct {
- b []byte
- err error
-}
-
-type asyncReadCloser struct {
- io.ReadCloser
- buffer bytes.Buffer
- read chan struct{}
- reads chan asyncRead
- close chan struct{}
- closeOnce sync.Once
-}
-
-// NewAsyncReadCloser returns a ReadCloser that supports Close during Read.
-func NewAsyncReadCloser(r io.ReadCloser) io.ReadCloser {
- ret := &asyncReadCloser{
- ReadCloser: r,
- read: make(chan struct{}),
- reads: make(chan asyncRead),
- close: make(chan struct{}),
- }
- go ret.loop()
- return ret
-}
-
-func (r *asyncReadCloser) loop() {
- buf := make([]byte, 1024*8)
- var n int
- var err error
- for {
- select {
- case <-r.read:
- n = 0
- if err == nil {
- n, err = r.ReadCloser.Read(buf)
- }
- select {
- case r.reads <- asyncRead{buf[:n], err}:
- case <-r.close:
- }
- case <-r.close:
- r.ReadCloser.Close()
- return
- }
- }
-}
-
-func (r *asyncReadCloser) Read(b []byte) (int, error) {
- if r.buffer.Len() > 0 {
- return r.buffer.Read(b)
- }
- select {
- case r.read <- struct{}{}:
- case <-r.close:
- }
- select {
- case read := <-r.reads:
- if read.err != nil {
- return 0, read.err
- }
- n := copy(b, read.b)
- if n < len(read.b) {
- r.buffer.Write(read.b[n:])
- }
- return n, nil
- case <-r.close:
- return 0, io.EOF
- }
-}
-
-func (r *asyncReadCloser) Close() error {
- r.closeOnce.Do(func() {
- close(r.close)
- })
- return nil
-}
-
-type asyncWrite struct {
- n int
- err error
-}
-
-type asyncWriteCloser struct {
- io.WriteCloser
- writeBuffer bytes.Buffer
- write chan struct{}
- writes chan asyncWrite
- close chan struct{}
- closeOnce sync.Once
-}
-
-// NewAsyncWriteCloser returns a WriteCloser that supports Close during Write.
-func NewAsyncWriteCloser(w io.WriteCloser) io.WriteCloser {
- ret := &asyncWriteCloser{
- WriteCloser: w,
- write: make(chan struct{}),
- writes: make(chan asyncWrite),
- close: make(chan struct{}),
- }
- go ret.loop()
- return ret
-}
-
-func (w *asyncWriteCloser) loop() {
- var n int64
- var err error
- for {
- select {
- case <-w.write:
- n = 0
- if err == nil {
- n, err = w.writeBuffer.WriteTo(w.WriteCloser)
- }
- select {
- case w.writes <- asyncWrite{int(n), err}:
- case <-w.close:
- }
- case <-w.close:
- w.WriteCloser.Close()
- return
- }
- }
-}
-
-func (w *asyncWriteCloser) Write(b []byte) (int, error) {
- if n, err := w.writeBuffer.Write(b); err != nil {
- return n, err
- }
- select {
- case w.write <- struct{}{}:
- case <-w.close:
- }
- select {
- case write := <-w.writes:
- return write.n, write.err
- case <-w.close:
- return 0, io.EOF
- }
-}
-
-func (w *asyncWriteCloser) Close() error {
- w.closeOnce.Do(func() {
- close(w.close)
- })
- return nil
-}
-
type rwc struct {
io.ReadCloser
io.WriteCloser