From f80d50adbddf55a043dfcab5b47d7c1e22749b7d Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 16 Aug 2017 17:23:38 -0500 Subject: PLT-7407: Back-end plugin mechanism (#7177) * begin backend plugin wip * flesh out rpcplugin. everything done except for minor supervisor stubs * done with basic plugin infrastructure * simplify tests * remove unused test lines --- plugin/rpcplugin/muxer_test.go | 169 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 plugin/rpcplugin/muxer_test.go (limited to 'plugin/rpcplugin/muxer_test.go') diff --git a/plugin/rpcplugin/muxer_test.go b/plugin/rpcplugin/muxer_test.go new file mode 100644 index 000000000..7bb63d4f8 --- /dev/null +++ b/plugin/rpcplugin/muxer_test.go @@ -0,0 +1,169 @@ +package rpcplugin + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMuxer(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + id1, alice1 := alice.Serve() + defer func() { assert.NoError(t, alice1.Close()) }() + + id2, bob2 := bob.Serve() + defer func() { assert.NoError(t, bob2.Close()) }() + + done1 := make(chan bool) + done2 := make(chan bool) + + go func() { + bob1 := bob.Connect(id1) + defer func() { assert.NoError(t, bob1.Close()) }() + + n, err := bob1.Write([]byte("ping1.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + n, err = bob1.Write([]byte("ping1.1")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + go func() { + alice2 := alice.Connect(id2) + defer func() { assert.NoError(t, alice2.Close()) }() + + n, err := alice2.Write([]byte("ping2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + buf := make([]byte, 20) + n, err = alice2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("pong2.0"), buf[:n]) + + done2 <- true + }() + + go func() { + buf := make([]byte, 7) + n, err := io.ReadFull(alice1, buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.0"), buf[:n]) + + n, err = alice1.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.1"), buf[:n]) + + done1 <- true + }() + + go func() { + buf := make([]byte, 20) + n, err := bob2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping2.0"), buf[:n]) + + n, err = bob2.Write([]byte("pong2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + <-done1 + <-done2 +} + +// Closing a muxer during a read should unblock, but return an error. +func TestMuxer_CloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +// Closing a stream during a read should unblock and return io.EOF since this is the way to +// gracefully close a connection. +func TestMuxer_StreamCloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go s.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.Equal(t, io.EOF, err) +} + +// Closing a muxer during a write should unblock, but return an error. +func TestMuxer_CloseDuringWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + // Don't connect bob to let writes will block forever. + defer r2.Close() + defer w1.Close() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Write(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +func TestMuxer_ReadWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + go alice.Write([]byte("hello")) + buf := make([]byte, 20) + n, err := bob.Read(buf) + assert.Equal(t, 5, n) + assert.Nil(t, err) + assert.Equal(t, []byte("hello"), buf[:n]) +} -- cgit v1.2.3-1-g7c22