path: root/plugin/rpcplugin
diff options
authorChris <>2017-08-16 17:23:38 -0500
committerGitHub <>2017-08-16 17:23:38 -0500
commitf80d50adbddf55a043dfcab5b47d7c1e22749b7d (patch)
tree5deb606debb6322716c9cdcc6c58be4f68b74223 /plugin/rpcplugin
parent4f85ed985d478ddf6692fa4f7d8d98d2a412d18c (diff)
PLT-7407: Back-end plugin mechanism (#7177)
* begin backend plugin wip * flesh out rpcplugin. everything done except for minor supervisor stubs * done with basic plugin infrastructure * simplify tests * remove unused test lines
Diffstat (limited to 'plugin/rpcplugin')
17 files changed, 1299 insertions, 0 deletions
diff --git a/plugin/rpcplugin/api.go b/plugin/rpcplugin/api.go
new file mode 100644
index 000000000..a807d0837
--- /dev/null
+++ b/plugin/rpcplugin/api.go
@@ -0,0 +1,62 @@
+package rpcplugin
+import (
+ "encoding/json"
+ "io"
+ "net/rpc"
+ ""
+type LocalAPI struct {
+ api plugin.API
+ muxer *Muxer
+func (h *LocalAPI) LoadPluginConfiguration(args struct{}, reply *[]byte) error {
+ var config interface{}
+ if err := h.api.LoadPluginConfiguration(&config); err != nil {
+ return err
+ }
+ b, err := json.Marshal(config)
+ if err != nil {
+ return err
+ }
+ *reply = b
+ return nil
+type RemoteAPI struct {
+ client *rpc.Client
+ muxer *Muxer
+func ServeAPI(api plugin.API, conn io.ReadWriteCloser, muxer *Muxer) {
+ server := rpc.NewServer()
+ server.Register(&LocalAPI{
+ api: api,
+ muxer: muxer,
+ })
+ server.ServeConn(conn)
+var _ plugin.API = (*RemoteAPI)(nil)
+func (h *RemoteAPI) LoadPluginConfiguration(dest interface{}) error {
+ var config []byte
+ if err := h.client.Call("LocalAPI.LoadPluginConfiguration", struct{}{}, &config); err != nil {
+ return err
+ }
+ return json.Unmarshal(config, dest)
+func (h *RemoteAPI) Close() error {
+ return h.client.Close()
+func ConnectAPI(conn io.ReadWriteCloser, muxer *Muxer) *RemoteAPI {
+ return &RemoteAPI{
+ client: rpc.NewClient(conn),
+ muxer: muxer,
+ }
diff --git a/plugin/rpcplugin/api_test.go b/plugin/rpcplugin/api_test.go
new file mode 100644
index 000000000..e55433556
--- /dev/null
+++ b/plugin/rpcplugin/api_test.go
@@ -0,0 +1,57 @@
+package rpcplugin
+import (
+ "encoding/json"
+ "io"
+ "testing"
+ ""
+ ""
+ ""
+ ""
+func testAPIRPC(api plugin.API, f func(plugin.API)) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+ c1 := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer c1.Close()
+ c2 := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer c2.Close()
+ id, server := c1.Serve()
+ go ServeAPI(api, server, c1)
+ remote := ConnectAPI(c2.Connect(id), c2)
+ defer remote.Close()
+ f(remote)
+func TestAPI(t *testing.T) {
+ var api plugintest.API
+ defer api.AssertExpectations(t)
+ type Config struct {
+ Foo string
+ Bar struct {
+ Baz string
+ }
+ }
+ api.On("LoadPluginConfiguration", mock.MatchedBy(func(x interface{}) bool { return true })).Run(func(args mock.Arguments) {
+ dest := args.Get(0).(interface{})
+ json.Unmarshal([]byte(`{"Foo": "foo", "Bar": {"Baz": "baz"}}`), dest)
+ }).Return(nil)
+ testAPIRPC(&api, func(remote plugin.API) {
+ var config Config
+ assert.NoError(t, remote.LoadPluginConfiguration(&config))
+ assert.Equal(t, "foo", config.Foo)
+ assert.Equal(t, "baz", config.Bar.Baz)
+ })
diff --git a/plugin/rpcplugin/hooks.go b/plugin/rpcplugin/hooks.go
new file mode 100644
index 000000000..008730402
--- /dev/null
+++ b/plugin/rpcplugin/hooks.go
@@ -0,0 +1,77 @@
+package rpcplugin
+import (
+ "io"
+ "net/rpc"
+ ""
+type LocalHooks struct {
+ hooks plugin.Hooks
+ muxer *Muxer
+ remoteAPI *RemoteAPI
+func (h *LocalHooks) OnActivate(args int64, reply *struct{}) error {
+ stream := h.muxer.Connect(args)
+ if h.remoteAPI != nil {
+ h.remoteAPI.Close()
+ }
+ h.remoteAPI = ConnectAPI(stream, h.muxer)
+ return h.hooks.OnActivate(h.remoteAPI)
+func (h *LocalHooks) OnDeactivate(args, reply *struct{}) error {
+ err := h.hooks.OnDeactivate()
+ if h.remoteAPI != nil {
+ h.remoteAPI.Close()
+ h.remoteAPI = nil
+ }
+ return err
+type RemoteHooks struct {
+ client *rpc.Client
+ muxer *Muxer
+ apiCloser io.Closer
+func ServeHooks(hooks plugin.Hooks, conn io.ReadWriteCloser, muxer *Muxer) {
+ server := rpc.NewServer()
+ server.Register(&LocalHooks{
+ hooks: hooks,
+ muxer: muxer,
+ })
+ server.ServeConn(conn)
+var _ plugin.Hooks = (*RemoteHooks)(nil)
+func (h *RemoteHooks) OnActivate(api plugin.API) error {
+ id, stream := h.muxer.Serve()
+ if h.apiCloser != nil {
+ h.apiCloser.Close()
+ }
+ h.apiCloser = stream
+ go ServeAPI(api, stream, h.muxer)
+ return h.client.Call("LocalHooks.OnActivate", id, nil)
+func (h *RemoteHooks) OnDeactivate() error {
+ return h.client.Call("LocalHooks.OnDeactivate", struct{}{}, nil)
+func (h *RemoteHooks) Close() error {
+ if h.apiCloser != nil {
+ h.apiCloser.Close()
+ }
+ return h.client.Close()
+func ConnectHooks(conn io.ReadWriteCloser, muxer *Muxer) *RemoteHooks {
+ return &RemoteHooks{
+ client: rpc.NewClient(conn),
+ muxer: muxer,
+ }
diff --git a/plugin/rpcplugin/hooks_test.go b/plugin/rpcplugin/hooks_test.go
new file mode 100644
index 000000000..fbbbbedeb
--- /dev/null
+++ b/plugin/rpcplugin/hooks_test.go
@@ -0,0 +1,58 @@
+package rpcplugin
+import (
+ "io"
+ "testing"
+ ""
+ ""
+ ""
+ ""
+func testHooksRPC(hooks plugin.Hooks, f func(plugin.Hooks)) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+ c1 := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer c1.Close()
+ c2 := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer c2.Close()
+ id, server := c1.Serve()
+ go ServeHooks(hooks, server, c1)
+ remote := ConnectHooks(c2.Connect(id), c2)
+ defer remote.Close()
+ f(remote)
+func TestHooks(t *testing.T) {
+ var api plugintest.API
+ var hooks plugintest.Hooks
+ defer hooks.AssertExpectations(t)
+ testHooksRPC(&hooks, func(remote plugin.Hooks) {
+ hooks.On("OnActivate", mock.AnythingOfType("*rpcplugin.RemoteAPI")).Return(nil)
+ assert.NoError(t, remote.OnActivate(&api))
+ hooks.On("OnDeactivate").Return(nil)
+ assert.NoError(t, remote.OnDeactivate())
+ })
+func BenchmarkOnDeactivate(b *testing.B) {
+ var hooks plugintest.Hooks
+ hooks.On("OnDeactivate").Return(nil)
+ testHooksRPC(&hooks, func(remote plugin.Hooks) {
+ b.ResetTimer()
+ for n := 0; n < b.N; n++ {
+ remote.OnDeactivate()
+ }
+ b.StopTimer()
+ })
diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go
new file mode 100644
index 000000000..f1b2f3c35
--- /dev/null
+++ b/plugin/rpcplugin/io.go
@@ -0,0 +1,23 @@
+package rpcplugin
+import (
+ "io"
+type rwc struct {
+ io.ReadCloser
+ io.WriteCloser
+func (rwc *rwc) Close() error {
+ rerr := rwc.ReadCloser.Close()
+ werr := rwc.WriteCloser.Close()
+ if rerr != nil {
+ return rerr
+ }
+ return werr
+func NewReadWriteCloser(r io.ReadCloser, w io.WriteCloser) io.ReadWriteCloser {
+ return &rwc{r, w}
diff --git a/plugin/rpcplugin/ipc.go b/plugin/rpcplugin/ipc.go
new file mode 100644
index 000000000..3e6c89c4f
--- /dev/null
+++ b/plugin/rpcplugin/ipc.go
@@ -0,0 +1,28 @@
+package rpcplugin
+import (
+ "io"
+ "os"
+// Returns a new IPC for the parent process and a set of files to pass on to the child.
+// The returned files must be closed after the child process is started.
+func NewIPC() (io.ReadWriteCloser, []*os.File, error) {
+ parentReader, childWriter, err := os.Pipe()
+ if err != nil {
+ return nil, nil, err
+ }
+ childReader, parentWriter, err := os.Pipe()
+ if err != nil {
+ parentReader.Close()
+ childWriter.Close()
+ return nil, nil, err
+ }
+ return NewReadWriteCloser(parentReader, parentWriter), []*os.File{childReader, childWriter}, nil
+// Returns the IPC instance inherited by the process from its parent.
+func InheritedIPC(fd0, fd1 uintptr) (io.ReadWriteCloser, error) {
+ return NewReadWriteCloser(os.NewFile(fd0, ""), os.NewFile(fd1, "")), nil
diff --git a/plugin/rpcplugin/ipc_test.go b/plugin/rpcplugin/ipc_test.go
new file mode 100644
index 000000000..bf4df3017
--- /dev/null
+++ b/plugin/rpcplugin/ipc_test.go
@@ -0,0 +1,61 @@
+package rpcplugin
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+ ""
+ ""
+func TestIPC(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ pingpong := filepath.Join(dir, "pingpong")
+ compileGo(t, `
+ package main
+ import (
+ "log"
+ ""
+ )
+ func main() {
+ ipc, err := rpcplugin.InheritedProcessIPC()
+ if err != nil {
+ log.Fatal("unable to get inherited ipc")
+ }
+ defer ipc.Close()
+ _, err = ipc.Write([]byte("ping"))
+ if err != nil {
+ log.Fatal("unable to write to ipc")
+ }
+ b := make([]byte, 10)
+ n, err := ipc.Read(b)
+ if err != nil {
+ log.Fatal("unable to read from ipc")
+ }
+ if n != 4 || string(b[:4]) != "pong" {
+ log.Fatal("unexpected response")
+ }
+ }
+ `, pingpong)
+ p, ipc, err := NewProcess(context.Background(), pingpong)
+ require.NoError(t, err)
+ defer ipc.Close()
+ b := make([]byte, 10)
+ n, err := ipc.Read(b)
+ require.NoError(t, err)
+ assert.Equal(t, 4, n)
+ assert.Equal(t, "ping", string(b[:4]))
+ _, err = ipc.Write([]byte("pong"))
+ require.NoError(t, err)
+ require.NoError(t, p.Wait())
diff --git a/plugin/rpcplugin/main.go b/plugin/rpcplugin/main.go
new file mode 100644
index 000000000..36177954b
--- /dev/null
+++ b/plugin/rpcplugin/main.go
@@ -0,0 +1,46 @@
+package rpcplugin
+import (
+ "bufio"
+ "encoding/binary"
+ "fmt"
+ "log"
+ "os"
+ ""
+// Makes a set of hooks available via RPC. This function never returns.
+func Main(hooks plugin.Hooks) {
+ ipc, err := InheritedProcessIPC()
+ if err != nil {
+ log.Fatal(err.Error())
+ }
+ muxer := NewMuxer(ipc, true)
+ id, conn := muxer.Serve()
+ buf := make([]byte, 11)
+ buf[0] = 0
+ n := binary.PutVarint(buf[1:], id)
+ if _, err := muxer.Write(buf[:1+n]); err != nil {
+ log.Fatal(err.Error())
+ }
+ ServeHooks(hooks, conn, muxer)
+ os.Exit(0)
+// Returns the hooks being served by a call to Main.
+func ConnectMain(muxer *Muxer) (*RemoteHooks, error) {
+ buf := make([]byte, 1)
+ if _, err := muxer.Read(buf); err != nil {
+ return nil, err
+ } else if buf[0] != 0 {
+ return nil, fmt.Errorf("unexpected control byte")
+ }
+ reader := bufio.NewReader(muxer)
+ id, err := binary.ReadVarint(reader)
+ if err != nil {
+ return nil, err
+ }
+ return ConnectHooks(muxer.Connect(id), muxer), nil
diff --git a/plugin/rpcplugin/main_test.go b/plugin/rpcplugin/main_test.go
new file mode 100644
index 000000000..b54364bad
--- /dev/null
+++ b/plugin/rpcplugin/main_test.go
@@ -0,0 +1,58 @@
+package rpcplugin
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+ ""
+ ""
+ ""
+func TestMain(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ plugin := filepath.Join(dir, "plugin")
+ compileGo(t, `
+ package main
+ import (
+ ""
+ ""
+ )
+ type MyPlugin struct {}
+ func (p *MyPlugin) OnActivate(api plugin.API) error {
+ return nil
+ }
+ func (p *MyPlugin) OnDeactivate() error {
+ return nil
+ }
+ func main() {
+ rpcplugin.Main(&MyPlugin{})
+ }
+ `, plugin)
+ p, ipc, err := NewProcess(context.Background(), plugin)
+ require.NoError(t, err)
+ defer p.Wait()
+ muxer := NewMuxer(ipc, false)
+ defer muxer.Close()
+ var api plugintest.API
+ hooks, err := ConnectMain(muxer)
+ require.NoError(t, err)
+ assert.NoError(t, hooks.OnActivate(&api))
+ assert.NoError(t, hooks.OnDeactivate())
diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go
new file mode 100644
index 000000000..a2bfbf8b6
--- /dev/null
+++ b/plugin/rpcplugin/muxer.go
@@ -0,0 +1,253 @@
+package rpcplugin
+import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "sync"
+ "sync/atomic"
+// Muxer allows multiple bidirectional streams to be transmitted over a single connection.
+// Muxer is safe for use by multiple goroutines.
+// Streams opened on the muxer must be periodically drained in order to reclaim read buffer memory.
+// In other words, readers must consume incoming data as it comes in.
+type Muxer struct {
+ // writeMutex guards conn writes
+ writeMutex sync.Mutex
+ conn io.ReadWriteCloser
+ // didCloseConn is a boolean (0 or 1) used from multiple goroutines via atomic operations
+ didCloseConn int32
+ // streamsMutex guards streams and nextId
+ streamsMutex sync.Mutex
+ nextId int64
+ streams map[int64]*muxerStream
+ stream0Reader *io.PipeReader
+ stream0Writer *io.PipeWriter
+ result chan error
+// Creates a new Muxer.
+// conn must be safe for simultaneous reads by one goroutine and writes by another.
+// For two muxers communicating with each other via a connection, parity must be true for exactly
+// one of them.
+func NewMuxer(conn io.ReadWriteCloser, parity bool) *Muxer {
+ s0r, s0w := io.Pipe()
+ muxer := &Muxer{
+ conn: conn,
+ streams: make(map[int64]*muxerStream),
+ result: make(chan error, 1),
+ nextId: 1,
+ stream0Reader: s0r,
+ stream0Writer: s0w,
+ }
+ if parity {
+ muxer.nextId = 2
+ }
+ go
+ return muxer
+// Opens a new stream with a unique id.
+// Writes made to the stream before the other end calls Connect will be discarded.
+func (m *Muxer) Serve() (int64, io.ReadWriteCloser) {
+ m.streamsMutex.Lock()
+ id := m.nextId
+ m.nextId += 2
+ m.streamsMutex.Unlock()
+ return id, m.Connect(id)
+// Opens a remotely opened stream.
+func (m *Muxer) Connect(id int64) io.ReadWriteCloser {
+ m.streamsMutex.Lock()
+ defer m.streamsMutex.Unlock()
+ mutex := &sync.Mutex{}
+ stream := &muxerStream{
+ id: id,
+ muxer: m,
+ mutex: mutex,
+ readWake: sync.NewCond(mutex),
+ }
+ m.streams[id] = stream
+ return stream
+// Calling Read on the muxer directly performs a read on a dedicated, always-open channel.
+func (m *Muxer) Read(p []byte) (int, error) {
+ return m.stream0Reader.Read(p)
+// Calling Write on the muxer directly performs a write on a dedicated, always-open channel.
+func (m *Muxer) Write(p []byte) (int, error) {
+ return m.write(p, 0)
+// Closes the muxer.
+func (m *Muxer) Close() error {
+ if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) {
+ m.conn.Close()
+ }
+ m.stream0Reader.Close()
+ m.stream0Writer.Close()
+ <-m.result
+ return nil
+func (m *Muxer) IsClosed() bool {
+ return atomic.LoadInt32(&m.didCloseConn) > 0
+func (m *Muxer) write(p []byte, sid int64) (int, error) {
+ m.writeMutex.Lock()
+ defer m.writeMutex.Unlock()
+ if m.IsClosed() {
+ return 0, fmt.Errorf("muxer closed")
+ }
+ buf := make([]byte, 10)
+ n := binary.PutVarint(buf, sid)
+ if _, err := m.conn.Write(buf[:n]); err != nil {
+ m.shutdown(err)
+ return 0, err
+ }
+ n = binary.PutVarint(buf, int64(len(p)))
+ if _, err := m.conn.Write(buf[:n]); err != nil {
+ m.shutdown(err)
+ return 0, err
+ }
+ if _, err := m.conn.Write(p); err != nil {
+ m.shutdown(err)
+ return 0, err
+ }
+ return len(p), nil
+func (m *Muxer) rm(sid int64) {
+ m.streamsMutex.Lock()
+ defer m.streamsMutex.Unlock()
+ delete(m.streams, sid)
+func (m *Muxer) run() {
+ m.shutdown(m.loop())
+func (m *Muxer) loop() error {
+ reader := bufio.NewReader(m.conn)
+ for {
+ sid, err := binary.ReadVarint(reader)
+ if err != nil {
+ return err
+ }
+ len, err := binary.ReadVarint(reader)
+ if err != nil {
+ return err
+ }
+ if sid == 0 {
+ if _, err := io.CopyN(m.stream0Writer, reader, len); err != nil {
+ return err
+ }
+ continue
+ }
+ m.streamsMutex.Lock()
+ stream, ok := m.streams[sid]
+ m.streamsMutex.Unlock()
+ if !ok {
+ if _, err := reader.Discard(int(len)); err != nil {
+ return err
+ }
+ continue
+ }
+ stream.mutex.Lock()
+ if stream.isClosed {
+ stream.mutex.Unlock()
+ if _, err := reader.Discard(int(len)); err != nil {
+ return err
+ }
+ continue
+ }
+ _, err = io.CopyN(&stream.readBuf, reader, len)
+ stream.mutex.Unlock()
+ if err != nil {
+ return err
+ }
+ stream.readWake.Signal()
+ }
+func (m *Muxer) shutdown(err error) {
+ if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) {
+ m.conn.Close()
+ }
+ go func() {
+ m.streamsMutex.Lock()
+ for _, stream := range m.streams {
+ stream.mutex.Lock()
+ stream.readWake.Signal()
+ stream.mutex.Unlock()
+ }
+ m.streams = make(map[int64]*muxerStream)
+ m.streamsMutex.Unlock()
+ }()
+ m.result <- err
+type muxerStream struct {
+ id int64
+ muxer *Muxer
+ readBuf bytes.Buffer
+ mutex *sync.Mutex
+ readWake *sync.Cond
+ isClosed bool
+ closeErr error
+func (s *muxerStream) Read(p []byte) (int, error) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ for {
+ if s.muxer.IsClosed() {
+ return 0, fmt.Errorf("muxer closed")
+ } else if s.isClosed {
+ return 0, io.EOF
+ } else if s.readBuf.Len() > 0 {
+ n, err := s.readBuf.Read(p)
+ return n, err
+ }
+ s.readWake.Wait()
+ }
+func (s *muxerStream) Write(p []byte) (int, error) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if s.isClosed {
+ return 0, fmt.Errorf("stream closed")
+ }
+ return s.muxer.write(p,
+func (s *muxerStream) Close() error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if !s.isClosed {
+ s.isClosed = true
+ s.muxer.rm(
+ }
+ s.readWake.Signal()
+ return nil
diff --git a/plugin/rpcplugin/muxer_test.go b/plugin/rpcplugin/muxer_test.go
new file mode 100644
index 000000000..7bb63d4f8
--- /dev/null
+++ b/plugin/rpcplugin/muxer_test.go
@@ -0,0 +1,169 @@
+package rpcplugin
+import (
+ "io"
+ "testing"
+ ""
+ ""
+func TestMuxer(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer func() { assert.NoError(t, alice.Close()) }()
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+ id1, alice1 := alice.Serve()
+ defer func() { assert.NoError(t, alice1.Close()) }()
+ id2, bob2 := bob.Serve()
+ defer func() { assert.NoError(t, bob2.Close()) }()
+ done1 := make(chan bool)
+ done2 := make(chan bool)
+ go func() {
+ bob1 := bob.Connect(id1)
+ defer func() { assert.NoError(t, bob1.Close()) }()
+ n, err := bob1.Write([]byte("ping1.0"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ n, err = bob1.Write([]byte("ping1.1"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ }()
+ go func() {
+ alice2 := alice.Connect(id2)
+ defer func() { assert.NoError(t, alice2.Close()) }()
+ n, err := alice2.Write([]byte("ping2.0"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ buf := make([]byte, 20)
+ n, err = alice2.Read(buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("pong2.0"), buf[:n])
+ done2 <- true
+ }()
+ go func() {
+ buf := make([]byte, 7)
+ n, err := io.ReadFull(alice1, buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("ping1.0"), buf[:n])
+ n, err = alice1.Read(buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("ping1.1"), buf[:n])
+ done1 <- true
+ }()
+ go func() {
+ buf := make([]byte, 20)
+ n, err := bob2.Read(buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("ping2.0"), buf[:n])
+ n, err = bob2.Write([]byte("pong2.0"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ }()
+ <-done1
+ <-done2
+// Closing a muxer during a read should unblock, but return an error.
+func TestMuxer_CloseDuringRead(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+ _, s := alice.Serve()
+ go alice.Close()
+ buf := make([]byte, 20)
+ n, err := s.Read(buf)
+ assert.Equal(t, 0, n)
+ assert.NotNil(t, err)
+ assert.NotEqual(t, io.EOF, err)
+// Closing a stream during a read should unblock and return io.EOF since this is the way to
+// gracefully close a connection.
+func TestMuxer_StreamCloseDuringRead(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer func() { assert.NoError(t, alice.Close()) }()
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+ _, s := alice.Serve()
+ go s.Close()
+ buf := make([]byte, 20)
+ n, err := s.Read(buf)
+ assert.Equal(t, 0, n)
+ assert.Equal(t, io.EOF, err)
+// Closing a muxer during a write should unblock, but return an error.
+func TestMuxer_CloseDuringWrite(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ // Don't connect bob to let writes will block forever.
+ defer r2.Close()
+ defer w1.Close()
+ _, s := alice.Serve()
+ go alice.Close()
+ buf := make([]byte, 20)
+ n, err := s.Write(buf)
+ assert.Equal(t, 0, n)
+ assert.NotNil(t, err)
+ assert.NotEqual(t, io.EOF, err)
+func TestMuxer_ReadWrite(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer func() { assert.NoError(t, alice.Close()) }()
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+ go alice.Write([]byte("hello"))
+ buf := make([]byte, 20)
+ n, err := bob.Read(buf)
+ assert.Equal(t, 5, n)
+ assert.Nil(t, err)
+ assert.Equal(t, []byte("hello"), buf[:n])
diff --git a/plugin/rpcplugin/process.go b/plugin/rpcplugin/process.go
new file mode 100644
index 000000000..4b3362d68
--- /dev/null
+++ b/plugin/rpcplugin/process.go
@@ -0,0 +1,23 @@
+package rpcplugin
+import (
+ "context"
+ "io"
+type Process interface {
+ // Waits for the process to exit and returns an error if a problem occurred or the process exited
+ // with a non-zero status.
+ Wait() error
+// NewProcess launches an RPC executable in a new process and returns an IPC that can be used to
+// communicate with it.
+func NewProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) {
+ return newProcess(ctx, path)
+// When called on a process launched with NewProcess, returns the inherited IPC.
+func InheritedProcessIPC() (io.ReadWriteCloser, error) {
+ return inheritedProcessIPC()
diff --git a/plugin/rpcplugin/process_test.go b/plugin/rpcplugin/process_test.go
new file mode 100644
index 000000000..b7984ad0a
--- /dev/null
+++ b/plugin/rpcplugin/process_test.go
@@ -0,0 +1,64 @@
+package rpcplugin
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "testing"
+ ""
+ ""
+func compileGo(t *testing.T, sourceCode, outputPath string) {
+ dir, err := ioutil.TempDir(".", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "main.go"), []byte(sourceCode), 0600))
+ cmd := exec.Command("go", "build", "-o", outputPath, "main.go")
+ cmd.Dir = dir
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ require.NoError(t, cmd.Run())
+func TestProcess(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ ping := filepath.Join(dir, "ping")
+ compileGo(t, `
+ package main
+ import (
+ "log"
+ ""
+ )
+ func main() {
+ ipc, err := rpcplugin.InheritedProcessIPC()
+ if err != nil {
+ log.Fatal("unable to get inherited ipc")
+ }
+ defer ipc.Close()
+ _, err = ipc.Write([]byte("ping"))
+ if err != nil {
+ log.Fatal("unable to write to ipc")
+ }
+ }
+ `, ping)
+ p, ipc, err := NewProcess(context.Background(), ping)
+ require.NoError(t, err)
+ defer ipc.Close()
+ b := make([]byte, 10)
+ n, err := ipc.Read(b)
+ require.NoError(t, err)
+ assert.Equal(t, 4, n)
+ assert.Equal(t, "ping", string(b[:4]))
+ require.NoError(t, p.Wait())
diff --git a/plugin/rpcplugin/process_unix.go b/plugin/rpcplugin/process_unix.go
new file mode 100644
index 000000000..f196e34f8
--- /dev/null
+++ b/plugin/rpcplugin/process_unix.go
@@ -0,0 +1,45 @@
+// +build !windows
+package rpcplugin
+import (
+ "context"
+ "io"
+ "os"
+ "os/exec"
+type process struct {
+ command *exec.Cmd
+func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) {
+ ipc, childFiles, err := NewIPC()
+ if err != nil {
+ return nil, nil, err
+ }
+ defer childFiles[0].Close()
+ defer childFiles[1].Close()
+ cmd := exec.CommandContext(ctx, path)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.ExtraFiles = childFiles
+ err = cmd.Start()
+ if err != nil {
+ ipc.Close()
+ return nil, nil, err
+ }
+ return &process{
+ command: cmd,
+ }, ipc, nil
+func (p *process) Wait() error {
+ return p.command.Wait()
+func inheritedProcessIPC() (io.ReadWriteCloser, error) {
+ return InheritedIPC(3, 4)
diff --git a/plugin/rpcplugin/process_windows.go b/plugin/rpcplugin/process_windows.go
new file mode 100644
index 000000000..7be03cacd
--- /dev/null
+++ b/plugin/rpcplugin/process_windows.go
@@ -0,0 +1,17 @@
+package rpcplugin
+import (
+ "context"
+ "fmt"
+ "io"
+func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) {
+ // TODO
+ return nil, nil, fmt.Errorf("not yet supported")
+func inheritedProcessIPC() (*IPC, error) {
+ // TODO
+ return nil, fmt.Errorf("not yet supported")
diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go
new file mode 100644
index 000000000..9316d7186
--- /dev/null
+++ b/plugin/rpcplugin/supervisor.go
@@ -0,0 +1,128 @@
+package rpcplugin
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "sync/atomic"
+ "time"
+ ""
+// Supervisor implements a plugin.Supervisor that launches the plugin in a separate process and
+// communicates via RPC.
+// If the plugin unexpectedly exists, the supervisor will relaunch it after a short delay.
+type Supervisor struct {
+ executable string
+ hooks atomic.Value
+ done chan bool
+ cancel context.CancelFunc
+var _ plugin.Supervisor = (*Supervisor)(nil)
+// Starts the plugin. This method will block until the plugin is successfully launched for the first
+// time and will return an error if the plugin cannot be launched at all.
+func (s *Supervisor) Start() error {
+ ctx, cancel := context.WithCancel(context.Background())
+ s.done = make(chan bool, 1)
+ start := make(chan error, 1)
+ go, start)
+ select {
+ case <-time.After(time.Second * 3):
+ cancel()
+ <-s.done
+ return fmt.Errorf("timed out waiting for plugin")
+ case err := <-start:
+ s.cancel = cancel
+ return err
+ }
+// Stops the plugin.
+func (s *Supervisor) Stop() error {
+ s.cancel()
+ <-s.done
+ return nil
+// Returns the hooks used to communicate with the plugin. The hooks may change if the plugin is
+// restarted, so the return value should not be cached.
+func (s *Supervisor) Hooks() plugin.Hooks {
+ return s.hooks.Load().(plugin.Hooks)
+func (s *Supervisor) run(ctx context.Context, start chan<- error) {
+ defer func() {
+ s.done <- true
+ }()
+ done := ctx.Done()
+ for {
+ s.runPlugin(ctx, start)
+ select {
+ case <-done:
+ return
+ default:
+ start = nil
+ time.Sleep(time.Second)
+ }
+ }
+func (s *Supervisor) runPlugin(ctx context.Context, start chan<- error) error {
+ p, ipc, err := NewProcess(ctx, s.executable)
+ if err != nil {
+ if start != nil {
+ start <- err
+ }
+ return err
+ }
+ muxer := NewMuxer(ipc, false)
+ closeMuxer := make(chan bool, 1)
+ muxerClosed := make(chan error, 1)
+ go func() {
+ select {
+ case <-ctx.Done():
+ break
+ case <-closeMuxer:
+ break
+ }
+ muxerClosed <- muxer.Close()
+ }()
+ hooks, err := ConnectMain(muxer)
+ if err != nil {
+ if start != nil {
+ start <- err
+ }
+ closeMuxer <- true
+ <-muxerClosed
+ p.Wait()
+ return err
+ }
+ s.hooks.Store(hooks)
+ if start != nil {
+ start <- nil
+ }
+ p.Wait()
+ closeMuxer <- true
+ <-muxerClosed
+ return nil
+func SupervisorProvider(bundle *plugin.BundleInfo) (plugin.Supervisor, error) {
+ if bundle.Manifest == nil {
+ return nil, fmt.Errorf("no manifest available")
+ } else if bundle.Manifest.Backend == nil || bundle.Manifest.Backend.Executable == "" {
+ return nil, fmt.Errorf("no backend executable specified")
+ }
+ return &Supervisor{
+ executable: filepath.Join(bundle.Path, bundle.Manifest.Backend.Executable),
+ }, nil
diff --git a/plugin/rpcplugin/supervisor_test.go b/plugin/rpcplugin/supervisor_test.go
new file mode 100644
index 000000000..1d046bf82
--- /dev/null
+++ b/plugin/rpcplugin/supervisor_test.go
@@ -0,0 +1,130 @@
+package rpcplugin
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+ ""
+ ""
+ ""
+func TestSupervisor(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ backend := filepath.Join(dir, "backend")
+ compileGo(t, `
+ package main
+ import (
+ ""
+ ""
+ )
+ type MyPlugin struct {}
+ func (p *MyPlugin) OnActivate(api plugin.API) error {
+ return nil
+ }
+ func (p *MyPlugin) OnDeactivate() error {
+ return nil
+ }
+ func main() {
+ rpcplugin.Main(&MyPlugin{})
+ }
+ `, backend)
+ ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600)
+ bundle := plugin.BundleInfoForPath(dir)
+ supervisor, err := SupervisorProvider(bundle)
+ require.NoError(t, err)
+ require.NoError(t, supervisor.Start())
+ require.NoError(t, supervisor.Hooks().OnActivate(nil))
+ require.NoError(t, supervisor.Stop())
+// If plugin development goes really wrong, let's make sure plugin activation won't block forever.
+func TestSupervisor_StartTimeout(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ backend := filepath.Join(dir, "backend")
+ compileGo(t, `
+ package main
+ func main() {
+ for {
+ }
+ }
+ `, backend)
+ ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600)
+ bundle := plugin.BundleInfoForPath(dir)
+ supervisor, err := SupervisorProvider(bundle)
+ require.NoError(t, err)
+ require.Error(t, supervisor.Start())
+// Crashed plugins should be relaunched.
+func TestSupervisor_PluginCrash(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ backend := filepath.Join(dir, "backend")
+ compileGo(t, `
+ package main
+ import (
+ "os"
+ ""
+ ""
+ )
+ type MyPlugin struct {}
+ func (p *MyPlugin) OnActivate(api plugin.API) error {
+ os.Exit(1)
+ return nil
+ }
+ func (p *MyPlugin) OnDeactivate() error {
+ return nil
+ }
+ func main() {
+ rpcplugin.Main(&MyPlugin{})
+ }
+ `, backend)
+ ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600)
+ bundle := plugin.BundleInfoForPath(dir)
+ supervisor, err := SupervisorProvider(bundle)
+ require.NoError(t, err)
+ require.NoError(t, supervisor.Start())
+ require.Error(t, supervisor.Hooks().OnActivate(nil))
+ recovered := false
+ for i := 0; i < 30; i++ {
+ if supervisor.Hooks().OnDeactivate() == nil {
+ recovered = true
+ break
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ assert.True(t, recovered)
+ require.NoError(t, supervisor.Stop())