From f80d50adbddf55a043dfcab5b47d7c1e22749b7d Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 16 Aug 2017 17:23:38 -0500 Subject: PLT-7407: Back-end plugin mechanism (#7177) * begin backend plugin wip * flesh out rpcplugin. everything done except for minor supervisor stubs * done with basic plugin infrastructure * simplify tests * remove unused test lines --- plugin/rpcplugin/api.go | 62 +++++++++ plugin/rpcplugin/api_test.go | 57 ++++++++ plugin/rpcplugin/hooks.go | 77 +++++++++++ plugin/rpcplugin/hooks_test.go | 58 +++++++++ plugin/rpcplugin/io.go | 23 ++++ plugin/rpcplugin/ipc.go | 28 ++++ plugin/rpcplugin/ipc_test.go | 61 +++++++++ plugin/rpcplugin/main.go | 46 +++++++ plugin/rpcplugin/main_test.go | 58 +++++++++ plugin/rpcplugin/muxer.go | 253 ++++++++++++++++++++++++++++++++++++ plugin/rpcplugin/muxer_test.go | 169 ++++++++++++++++++++++++ plugin/rpcplugin/process.go | 23 ++++ plugin/rpcplugin/process_test.go | 64 +++++++++ plugin/rpcplugin/process_unix.go | 45 +++++++ plugin/rpcplugin/process_windows.go | 17 +++ plugin/rpcplugin/supervisor.go | 128 ++++++++++++++++++ plugin/rpcplugin/supervisor_test.go | 130 ++++++++++++++++++ 17 files changed, 1299 insertions(+) create mode 100644 plugin/rpcplugin/api.go create mode 100644 plugin/rpcplugin/api_test.go create mode 100644 plugin/rpcplugin/hooks.go create mode 100644 plugin/rpcplugin/hooks_test.go create mode 100644 plugin/rpcplugin/io.go create mode 100644 plugin/rpcplugin/ipc.go create mode 100644 plugin/rpcplugin/ipc_test.go create mode 100644 plugin/rpcplugin/main.go create mode 100644 plugin/rpcplugin/main_test.go create mode 100644 plugin/rpcplugin/muxer.go create mode 100644 plugin/rpcplugin/muxer_test.go create mode 100644 plugin/rpcplugin/process.go create mode 100644 plugin/rpcplugin/process_test.go create mode 100644 plugin/rpcplugin/process_unix.go create mode 100644 plugin/rpcplugin/process_windows.go create mode 100644 plugin/rpcplugin/supervisor.go create mode 100644 plugin/rpcplugin/supervisor_test.go (limited to 'plugin/rpcplugin') diff --git a/plugin/rpcplugin/api.go b/plugin/rpcplugin/api.go new file mode 100644 index 000000000..a807d0837 --- /dev/null +++ b/plugin/rpcplugin/api.go @@ -0,0 +1,62 @@ +package rpcplugin + +import ( + "encoding/json" + "io" + "net/rpc" + + "github.com/mattermost/platform/plugin" +) + +type LocalAPI struct { + api plugin.API + muxer *Muxer +} + +func (h *LocalAPI) LoadPluginConfiguration(args struct{}, reply *[]byte) error { + var config interface{} + if err := h.api.LoadPluginConfiguration(&config); err != nil { + return err + } + b, err := json.Marshal(config) + if err != nil { + return err + } + *reply = b + return nil +} + +type RemoteAPI struct { + client *rpc.Client + muxer *Muxer +} + +func ServeAPI(api plugin.API, conn io.ReadWriteCloser, muxer *Muxer) { + server := rpc.NewServer() + server.Register(&LocalAPI{ + api: api, + muxer: muxer, + }) + server.ServeConn(conn) +} + +var _ plugin.API = (*RemoteAPI)(nil) + +func (h *RemoteAPI) LoadPluginConfiguration(dest interface{}) error { + var config []byte + if err := h.client.Call("LocalAPI.LoadPluginConfiguration", struct{}{}, &config); err != nil { + return err + } + return json.Unmarshal(config, dest) +} + +func (h *RemoteAPI) Close() error { + return h.client.Close() +} + +func ConnectAPI(conn io.ReadWriteCloser, muxer *Muxer) *RemoteAPI { + return &RemoteAPI{ + client: rpc.NewClient(conn), + muxer: muxer, + } +} diff --git a/plugin/rpcplugin/api_test.go b/plugin/rpcplugin/api_test.go new file mode 100644 index 000000000..e55433556 --- /dev/null +++ b/plugin/rpcplugin/api_test.go @@ -0,0 +1,57 @@ +package rpcplugin + +import ( + "encoding/json" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/plugintest" +) + +func testAPIRPC(api plugin.API, f func(plugin.API)) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + c1 := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer c1.Close() + + c2 := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer c2.Close() + + id, server := c1.Serve() + go ServeAPI(api, server, c1) + + remote := ConnectAPI(c2.Connect(id), c2) + defer remote.Close() + + f(remote) +} + +func TestAPI(t *testing.T) { + var api plugintest.API + defer api.AssertExpectations(t) + + type Config struct { + Foo string + Bar struct { + Baz string + } + } + + api.On("LoadPluginConfiguration", mock.MatchedBy(func(x interface{}) bool { return true })).Run(func(args mock.Arguments) { + dest := args.Get(0).(interface{}) + json.Unmarshal([]byte(`{"Foo": "foo", "Bar": {"Baz": "baz"}}`), dest) + }).Return(nil) + + testAPIRPC(&api, func(remote plugin.API) { + var config Config + assert.NoError(t, remote.LoadPluginConfiguration(&config)) + + assert.Equal(t, "foo", config.Foo) + assert.Equal(t, "baz", config.Bar.Baz) + }) +} diff --git a/plugin/rpcplugin/hooks.go b/plugin/rpcplugin/hooks.go new file mode 100644 index 000000000..008730402 --- /dev/null +++ b/plugin/rpcplugin/hooks.go @@ -0,0 +1,77 @@ +package rpcplugin + +import ( + "io" + "net/rpc" + + "github.com/mattermost/platform/plugin" +) + +type LocalHooks struct { + hooks plugin.Hooks + muxer *Muxer + remoteAPI *RemoteAPI +} + +func (h *LocalHooks) OnActivate(args int64, reply *struct{}) error { + stream := h.muxer.Connect(args) + if h.remoteAPI != nil { + h.remoteAPI.Close() + } + h.remoteAPI = ConnectAPI(stream, h.muxer) + return h.hooks.OnActivate(h.remoteAPI) +} + +func (h *LocalHooks) OnDeactivate(args, reply *struct{}) error { + err := h.hooks.OnDeactivate() + if h.remoteAPI != nil { + h.remoteAPI.Close() + h.remoteAPI = nil + } + return err +} + +type RemoteHooks struct { + client *rpc.Client + muxer *Muxer + apiCloser io.Closer +} + +func ServeHooks(hooks plugin.Hooks, conn io.ReadWriteCloser, muxer *Muxer) { + server := rpc.NewServer() + server.Register(&LocalHooks{ + hooks: hooks, + muxer: muxer, + }) + server.ServeConn(conn) +} + +var _ plugin.Hooks = (*RemoteHooks)(nil) + +func (h *RemoteHooks) OnActivate(api plugin.API) error { + id, stream := h.muxer.Serve() + if h.apiCloser != nil { + h.apiCloser.Close() + } + h.apiCloser = stream + go ServeAPI(api, stream, h.muxer) + return h.client.Call("LocalHooks.OnActivate", id, nil) +} + +func (h *RemoteHooks) OnDeactivate() error { + return h.client.Call("LocalHooks.OnDeactivate", struct{}{}, nil) +} + +func (h *RemoteHooks) Close() error { + if h.apiCloser != nil { + h.apiCloser.Close() + } + return h.client.Close() +} + +func ConnectHooks(conn io.ReadWriteCloser, muxer *Muxer) *RemoteHooks { + return &RemoteHooks{ + client: rpc.NewClient(conn), + muxer: muxer, + } +} diff --git a/plugin/rpcplugin/hooks_test.go b/plugin/rpcplugin/hooks_test.go new file mode 100644 index 000000000..fbbbbedeb --- /dev/null +++ b/plugin/rpcplugin/hooks_test.go @@ -0,0 +1,58 @@ +package rpcplugin + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/plugintest" +) + +func testHooksRPC(hooks plugin.Hooks, f func(plugin.Hooks)) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + c1 := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer c1.Close() + + c2 := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer c2.Close() + + id, server := c1.Serve() + go ServeHooks(hooks, server, c1) + + remote := ConnectHooks(c2.Connect(id), c2) + defer remote.Close() + + f(remote) +} + +func TestHooks(t *testing.T) { + var api plugintest.API + var hooks plugintest.Hooks + defer hooks.AssertExpectations(t) + + testHooksRPC(&hooks, func(remote plugin.Hooks) { + hooks.On("OnActivate", mock.AnythingOfType("*rpcplugin.RemoteAPI")).Return(nil) + assert.NoError(t, remote.OnActivate(&api)) + + hooks.On("OnDeactivate").Return(nil) + assert.NoError(t, remote.OnDeactivate()) + }) +} + +func BenchmarkOnDeactivate(b *testing.B) { + var hooks plugintest.Hooks + hooks.On("OnDeactivate").Return(nil) + + testHooksRPC(&hooks, func(remote plugin.Hooks) { + b.ResetTimer() + for n := 0; n < b.N; n++ { + remote.OnDeactivate() + } + b.StopTimer() + }) +} diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go new file mode 100644 index 000000000..f1b2f3c35 --- /dev/null +++ b/plugin/rpcplugin/io.go @@ -0,0 +1,23 @@ +package rpcplugin + +import ( + "io" +) + +type rwc struct { + io.ReadCloser + io.WriteCloser +} + +func (rwc *rwc) Close() error { + rerr := rwc.ReadCloser.Close() + werr := rwc.WriteCloser.Close() + if rerr != nil { + return rerr + } + return werr +} + +func NewReadWriteCloser(r io.ReadCloser, w io.WriteCloser) io.ReadWriteCloser { + return &rwc{r, w} +} diff --git a/plugin/rpcplugin/ipc.go b/plugin/rpcplugin/ipc.go new file mode 100644 index 000000000..3e6c89c4f --- /dev/null +++ b/plugin/rpcplugin/ipc.go @@ -0,0 +1,28 @@ +package rpcplugin + +import ( + "io" + "os" +) + +// Returns a new IPC for the parent process and a set of files to pass on to the child. +// +// The returned files must be closed after the child process is started. +func NewIPC() (io.ReadWriteCloser, []*os.File, error) { + parentReader, childWriter, err := os.Pipe() + if err != nil { + return nil, nil, err + } + childReader, parentWriter, err := os.Pipe() + if err != nil { + parentReader.Close() + childWriter.Close() + return nil, nil, err + } + return NewReadWriteCloser(parentReader, parentWriter), []*os.File{childReader, childWriter}, nil +} + +// Returns the IPC instance inherited by the process from its parent. +func InheritedIPC(fd0, fd1 uintptr) (io.ReadWriteCloser, error) { + return NewReadWriteCloser(os.NewFile(fd0, ""), os.NewFile(fd1, "")), nil +} diff --git a/plugin/rpcplugin/ipc_test.go b/plugin/rpcplugin/ipc_test.go new file mode 100644 index 000000000..bf4df3017 --- /dev/null +++ b/plugin/rpcplugin/ipc_test.go @@ -0,0 +1,61 @@ +package rpcplugin + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIPC(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + pingpong := filepath.Join(dir, "pingpong") + compileGo(t, ` + package main + + import ( + "log" + + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + func main() { + ipc, err := rpcplugin.InheritedProcessIPC() + if err != nil { + log.Fatal("unable to get inherited ipc") + } + defer ipc.Close() + _, err = ipc.Write([]byte("ping")) + if err != nil { + log.Fatal("unable to write to ipc") + } + b := make([]byte, 10) + n, err := ipc.Read(b) + if err != nil { + log.Fatal("unable to read from ipc") + } + if n != 4 || string(b[:4]) != "pong" { + log.Fatal("unexpected response") + } + } + `, pingpong) + + p, ipc, err := NewProcess(context.Background(), pingpong) + require.NoError(t, err) + defer ipc.Close() + b := make([]byte, 10) + n, err := ipc.Read(b) + require.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "ping", string(b[:4])) + _, err = ipc.Write([]byte("pong")) + require.NoError(t, err) + require.NoError(t, p.Wait()) +} diff --git a/plugin/rpcplugin/main.go b/plugin/rpcplugin/main.go new file mode 100644 index 000000000..36177954b --- /dev/null +++ b/plugin/rpcplugin/main.go @@ -0,0 +1,46 @@ +package rpcplugin + +import ( + "bufio" + "encoding/binary" + "fmt" + "log" + "os" + + "github.com/mattermost/platform/plugin" +) + +// Makes a set of hooks available via RPC. This function never returns. +func Main(hooks plugin.Hooks) { + ipc, err := InheritedProcessIPC() + if err != nil { + log.Fatal(err.Error()) + } + muxer := NewMuxer(ipc, true) + id, conn := muxer.Serve() + buf := make([]byte, 11) + buf[0] = 0 + n := binary.PutVarint(buf[1:], id) + if _, err := muxer.Write(buf[:1+n]); err != nil { + log.Fatal(err.Error()) + } + ServeHooks(hooks, conn, muxer) + os.Exit(0) +} + +// Returns the hooks being served by a call to Main. +func ConnectMain(muxer *Muxer) (*RemoteHooks, error) { + buf := make([]byte, 1) + if _, err := muxer.Read(buf); err != nil { + return nil, err + } else if buf[0] != 0 { + return nil, fmt.Errorf("unexpected control byte") + } + reader := bufio.NewReader(muxer) + id, err := binary.ReadVarint(reader) + if err != nil { + return nil, err + } + + return ConnectHooks(muxer.Connect(id), muxer), nil +} diff --git a/plugin/rpcplugin/main_test.go b/plugin/rpcplugin/main_test.go new file mode 100644 index 000000000..b54364bad --- /dev/null +++ b/plugin/rpcplugin/main_test.go @@ -0,0 +1,58 @@ +package rpcplugin + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/platform/plugin/plugintest" +) + +func TestMain(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + plugin := filepath.Join(dir, "plugin") + compileGo(t, ` + package main + + import ( + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + type MyPlugin struct {} + + func (p *MyPlugin) OnActivate(api plugin.API) error { + return nil + } + + func (p *MyPlugin) OnDeactivate() error { + return nil + } + + func main() { + rpcplugin.Main(&MyPlugin{}) + } + `, plugin) + + p, ipc, err := NewProcess(context.Background(), plugin) + require.NoError(t, err) + defer p.Wait() + + muxer := NewMuxer(ipc, false) + defer muxer.Close() + + var api plugintest.API + + hooks, err := ConnectMain(muxer) + require.NoError(t, err) + assert.NoError(t, hooks.OnActivate(&api)) + assert.NoError(t, hooks.OnDeactivate()) +} diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go new file mode 100644 index 000000000..a2bfbf8b6 --- /dev/null +++ b/plugin/rpcplugin/muxer.go @@ -0,0 +1,253 @@ +package rpcplugin + +import ( + "bufio" + "bytes" + "encoding/binary" + "fmt" + "io" + "sync" + "sync/atomic" +) + +// Muxer allows multiple bidirectional streams to be transmitted over a single connection. +// +// Muxer is safe for use by multiple goroutines. +// +// Streams opened on the muxer must be periodically drained in order to reclaim read buffer memory. +// In other words, readers must consume incoming data as it comes in. +type Muxer struct { + // writeMutex guards conn writes + writeMutex sync.Mutex + conn io.ReadWriteCloser + + // didCloseConn is a boolean (0 or 1) used from multiple goroutines via atomic operations + didCloseConn int32 + + // streamsMutex guards streams and nextId + streamsMutex sync.Mutex + nextId int64 + streams map[int64]*muxerStream + + stream0Reader *io.PipeReader + stream0Writer *io.PipeWriter + result chan error +} + +// Creates a new Muxer. +// +// conn must be safe for simultaneous reads by one goroutine and writes by another. +// +// For two muxers communicating with each other via a connection, parity must be true for exactly +// one of them. +func NewMuxer(conn io.ReadWriteCloser, parity bool) *Muxer { + s0r, s0w := io.Pipe() + muxer := &Muxer{ + conn: conn, + streams: make(map[int64]*muxerStream), + result: make(chan error, 1), + nextId: 1, + stream0Reader: s0r, + stream0Writer: s0w, + } + if parity { + muxer.nextId = 2 + } + go muxer.run() + return muxer +} + +// Opens a new stream with a unique id. +// +// Writes made to the stream before the other end calls Connect will be discarded. +func (m *Muxer) Serve() (int64, io.ReadWriteCloser) { + m.streamsMutex.Lock() + id := m.nextId + m.nextId += 2 + m.streamsMutex.Unlock() + return id, m.Connect(id) +} + +// Opens a remotely opened stream. +func (m *Muxer) Connect(id int64) io.ReadWriteCloser { + m.streamsMutex.Lock() + defer m.streamsMutex.Unlock() + mutex := &sync.Mutex{} + stream := &muxerStream{ + id: id, + muxer: m, + mutex: mutex, + readWake: sync.NewCond(mutex), + } + m.streams[id] = stream + return stream +} + +// Calling Read on the muxer directly performs a read on a dedicated, always-open channel. +func (m *Muxer) Read(p []byte) (int, error) { + return m.stream0Reader.Read(p) +} + +// Calling Write on the muxer directly performs a write on a dedicated, always-open channel. +func (m *Muxer) Write(p []byte) (int, error) { + return m.write(p, 0) +} + +// Closes the muxer. +func (m *Muxer) Close() error { + if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) { + m.conn.Close() + } + m.stream0Reader.Close() + m.stream0Writer.Close() + <-m.result + return nil +} + +func (m *Muxer) IsClosed() bool { + return atomic.LoadInt32(&m.didCloseConn) > 0 +} + +func (m *Muxer) write(p []byte, sid int64) (int, error) { + m.writeMutex.Lock() + defer m.writeMutex.Unlock() + if m.IsClosed() { + return 0, fmt.Errorf("muxer closed") + } + buf := make([]byte, 10) + n := binary.PutVarint(buf, sid) + if _, err := m.conn.Write(buf[:n]); err != nil { + m.shutdown(err) + return 0, err + } + n = binary.PutVarint(buf, int64(len(p))) + if _, err := m.conn.Write(buf[:n]); err != nil { + m.shutdown(err) + return 0, err + } + if _, err := m.conn.Write(p); err != nil { + m.shutdown(err) + return 0, err + } + return len(p), nil +} + +func (m *Muxer) rm(sid int64) { + m.streamsMutex.Lock() + defer m.streamsMutex.Unlock() + delete(m.streams, sid) +} + +func (m *Muxer) run() { + m.shutdown(m.loop()) +} + +func (m *Muxer) loop() error { + reader := bufio.NewReader(m.conn) + + for { + sid, err := binary.ReadVarint(reader) + if err != nil { + return err + } + len, err := binary.ReadVarint(reader) + if err != nil { + return err + } + + if sid == 0 { + if _, err := io.CopyN(m.stream0Writer, reader, len); err != nil { + return err + } + continue + } + + m.streamsMutex.Lock() + stream, ok := m.streams[sid] + m.streamsMutex.Unlock() + if !ok { + if _, err := reader.Discard(int(len)); err != nil { + return err + } + continue + } + + stream.mutex.Lock() + if stream.isClosed { + stream.mutex.Unlock() + if _, err := reader.Discard(int(len)); err != nil { + return err + } + continue + } + _, err = io.CopyN(&stream.readBuf, reader, len) + stream.mutex.Unlock() + if err != nil { + return err + } + stream.readWake.Signal() + } +} + +func (m *Muxer) shutdown(err error) { + if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) { + m.conn.Close() + } + go func() { + m.streamsMutex.Lock() + for _, stream := range m.streams { + stream.mutex.Lock() + stream.readWake.Signal() + stream.mutex.Unlock() + } + m.streams = make(map[int64]*muxerStream) + m.streamsMutex.Unlock() + }() + m.result <- err +} + +type muxerStream struct { + id int64 + muxer *Muxer + readBuf bytes.Buffer + mutex *sync.Mutex + readWake *sync.Cond + isClosed bool + closeErr error +} + +func (s *muxerStream) Read(p []byte) (int, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + for { + if s.muxer.IsClosed() { + return 0, fmt.Errorf("muxer closed") + } else if s.isClosed { + return 0, io.EOF + } else if s.readBuf.Len() > 0 { + n, err := s.readBuf.Read(p) + return n, err + } + s.readWake.Wait() + } +} + +func (s *muxerStream) Write(p []byte) (int, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.isClosed { + return 0, fmt.Errorf("stream closed") + } + return s.muxer.write(p, s.id) +} + +func (s *muxerStream) Close() error { + s.mutex.Lock() + defer s.mutex.Unlock() + if !s.isClosed { + s.isClosed = true + s.muxer.rm(s.id) + } + s.readWake.Signal() + return nil +} diff --git a/plugin/rpcplugin/muxer_test.go b/plugin/rpcplugin/muxer_test.go new file mode 100644 index 000000000..7bb63d4f8 --- /dev/null +++ b/plugin/rpcplugin/muxer_test.go @@ -0,0 +1,169 @@ +package rpcplugin + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMuxer(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + id1, alice1 := alice.Serve() + defer func() { assert.NoError(t, alice1.Close()) }() + + id2, bob2 := bob.Serve() + defer func() { assert.NoError(t, bob2.Close()) }() + + done1 := make(chan bool) + done2 := make(chan bool) + + go func() { + bob1 := bob.Connect(id1) + defer func() { assert.NoError(t, bob1.Close()) }() + + n, err := bob1.Write([]byte("ping1.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + n, err = bob1.Write([]byte("ping1.1")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + go func() { + alice2 := alice.Connect(id2) + defer func() { assert.NoError(t, alice2.Close()) }() + + n, err := alice2.Write([]byte("ping2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + buf := make([]byte, 20) + n, err = alice2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("pong2.0"), buf[:n]) + + done2 <- true + }() + + go func() { + buf := make([]byte, 7) + n, err := io.ReadFull(alice1, buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.0"), buf[:n]) + + n, err = alice1.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.1"), buf[:n]) + + done1 <- true + }() + + go func() { + buf := make([]byte, 20) + n, err := bob2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping2.0"), buf[:n]) + + n, err = bob2.Write([]byte("pong2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + <-done1 + <-done2 +} + +// Closing a muxer during a read should unblock, but return an error. +func TestMuxer_CloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +// Closing a stream during a read should unblock and return io.EOF since this is the way to +// gracefully close a connection. +func TestMuxer_StreamCloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go s.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.Equal(t, io.EOF, err) +} + +// Closing a muxer during a write should unblock, but return an error. +func TestMuxer_CloseDuringWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + // Don't connect bob to let writes will block forever. + defer r2.Close() + defer w1.Close() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Write(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +func TestMuxer_ReadWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + go alice.Write([]byte("hello")) + buf := make([]byte, 20) + n, err := bob.Read(buf) + assert.Equal(t, 5, n) + assert.Nil(t, err) + assert.Equal(t, []byte("hello"), buf[:n]) +} diff --git a/plugin/rpcplugin/process.go b/plugin/rpcplugin/process.go new file mode 100644 index 000000000..4b3362d68 --- /dev/null +++ b/plugin/rpcplugin/process.go @@ -0,0 +1,23 @@ +package rpcplugin + +import ( + "context" + "io" +) + +type Process interface { + // Waits for the process to exit and returns an error if a problem occurred or the process exited + // with a non-zero status. + Wait() error +} + +// NewProcess launches an RPC executable in a new process and returns an IPC that can be used to +// communicate with it. +func NewProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) { + return newProcess(ctx, path) +} + +// When called on a process launched with NewProcess, returns the inherited IPC. +func InheritedProcessIPC() (io.ReadWriteCloser, error) { + return inheritedProcessIPC() +} diff --git a/plugin/rpcplugin/process_test.go b/plugin/rpcplugin/process_test.go new file mode 100644 index 000000000..b7984ad0a --- /dev/null +++ b/plugin/rpcplugin/process_test.go @@ -0,0 +1,64 @@ +package rpcplugin + +import ( + "context" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func compileGo(t *testing.T, sourceCode, outputPath string) { + dir, err := ioutil.TempDir(".", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "main.go"), []byte(sourceCode), 0600)) + cmd := exec.Command("go", "build", "-o", outputPath, "main.go") + cmd.Dir = dir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + require.NoError(t, cmd.Run()) +} + +func TestProcess(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + ping := filepath.Join(dir, "ping") + compileGo(t, ` + package main + + import ( + "log" + + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + func main() { + ipc, err := rpcplugin.InheritedProcessIPC() + if err != nil { + log.Fatal("unable to get inherited ipc") + } + defer ipc.Close() + _, err = ipc.Write([]byte("ping")) + if err != nil { + log.Fatal("unable to write to ipc") + } + } + `, ping) + + p, ipc, err := NewProcess(context.Background(), ping) + require.NoError(t, err) + defer ipc.Close() + b := make([]byte, 10) + n, err := ipc.Read(b) + require.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "ping", string(b[:4])) + require.NoError(t, p.Wait()) +} diff --git a/plugin/rpcplugin/process_unix.go b/plugin/rpcplugin/process_unix.go new file mode 100644 index 000000000..f196e34f8 --- /dev/null +++ b/plugin/rpcplugin/process_unix.go @@ -0,0 +1,45 @@ +// +build !windows + +package rpcplugin + +import ( + "context" + "io" + "os" + "os/exec" +) + +type process struct { + command *exec.Cmd +} + +func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) { + ipc, childFiles, err := NewIPC() + if err != nil { + return nil, nil, err + } + defer childFiles[0].Close() + defer childFiles[1].Close() + + cmd := exec.CommandContext(ctx, path) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.ExtraFiles = childFiles + err = cmd.Start() + if err != nil { + ipc.Close() + return nil, nil, err + } + + return &process{ + command: cmd, + }, ipc, nil +} + +func (p *process) Wait() error { + return p.command.Wait() +} + +func inheritedProcessIPC() (io.ReadWriteCloser, error) { + return InheritedIPC(3, 4) +} diff --git a/plugin/rpcplugin/process_windows.go b/plugin/rpcplugin/process_windows.go new file mode 100644 index 000000000..7be03cacd --- /dev/null +++ b/plugin/rpcplugin/process_windows.go @@ -0,0 +1,17 @@ +package rpcplugin + +import ( + "context" + "fmt" + "io" +) + +func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) { + // TODO + return nil, nil, fmt.Errorf("not yet supported") +} + +func inheritedProcessIPC() (*IPC, error) { + // TODO + return nil, fmt.Errorf("not yet supported") +} diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go new file mode 100644 index 000000000..9316d7186 --- /dev/null +++ b/plugin/rpcplugin/supervisor.go @@ -0,0 +1,128 @@ +package rpcplugin + +import ( + "context" + "fmt" + "path/filepath" + "sync/atomic" + "time" + + "github.com/mattermost/platform/plugin" +) + +// Supervisor implements a plugin.Supervisor that launches the plugin in a separate process and +// communicates via RPC. +// +// If the plugin unexpectedly exists, the supervisor will relaunch it after a short delay. +type Supervisor struct { + executable string + hooks atomic.Value + done chan bool + cancel context.CancelFunc +} + +var _ plugin.Supervisor = (*Supervisor)(nil) + +// Starts the plugin. This method will block until the plugin is successfully launched for the first +// time and will return an error if the plugin cannot be launched at all. +func (s *Supervisor) Start() error { + ctx, cancel := context.WithCancel(context.Background()) + s.done = make(chan bool, 1) + start := make(chan error, 1) + go s.run(ctx, start) + + select { + case <-time.After(time.Second * 3): + cancel() + <-s.done + return fmt.Errorf("timed out waiting for plugin") + case err := <-start: + s.cancel = cancel + return err + } +} + +// Stops the plugin. +func (s *Supervisor) Stop() error { + s.cancel() + <-s.done + return nil +} + +// Returns the hooks used to communicate with the plugin. The hooks may change if the plugin is +// restarted, so the return value should not be cached. +func (s *Supervisor) Hooks() plugin.Hooks { + return s.hooks.Load().(plugin.Hooks) +} + +func (s *Supervisor) run(ctx context.Context, start chan<- error) { + defer func() { + s.done <- true + }() + done := ctx.Done() + for { + s.runPlugin(ctx, start) + select { + case <-done: + return + default: + start = nil + time.Sleep(time.Second) + } + } +} + +func (s *Supervisor) runPlugin(ctx context.Context, start chan<- error) error { + p, ipc, err := NewProcess(ctx, s.executable) + if err != nil { + if start != nil { + start <- err + } + return err + } + + muxer := NewMuxer(ipc, false) + closeMuxer := make(chan bool, 1) + muxerClosed := make(chan error, 1) + go func() { + select { + case <-ctx.Done(): + break + case <-closeMuxer: + break + } + muxerClosed <- muxer.Close() + }() + + hooks, err := ConnectMain(muxer) + if err != nil { + if start != nil { + start <- err + } + closeMuxer <- true + <-muxerClosed + p.Wait() + return err + } + + s.hooks.Store(hooks) + if start != nil { + start <- nil + } + p.Wait() + closeMuxer <- true + <-muxerClosed + + return nil +} + +func SupervisorProvider(bundle *plugin.BundleInfo) (plugin.Supervisor, error) { + if bundle.Manifest == nil { + return nil, fmt.Errorf("no manifest available") + } else if bundle.Manifest.Backend == nil || bundle.Manifest.Backend.Executable == "" { + return nil, fmt.Errorf("no backend executable specified") + } + return &Supervisor{ + executable: filepath.Join(bundle.Path, bundle.Manifest.Backend.Executable), + }, nil +} diff --git a/plugin/rpcplugin/supervisor_test.go b/plugin/rpcplugin/supervisor_test.go new file mode 100644 index 000000000..1d046bf82 --- /dev/null +++ b/plugin/rpcplugin/supervisor_test.go @@ -0,0 +1,130 @@ +package rpcplugin + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/platform/plugin" +) + +func TestSupervisor(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + backend := filepath.Join(dir, "backend") + compileGo(t, ` + package main + + import ( + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + type MyPlugin struct {} + + func (p *MyPlugin) OnActivate(api plugin.API) error { + return nil + } + + func (p *MyPlugin) OnDeactivate() error { + return nil + } + + func main() { + rpcplugin.Main(&MyPlugin{}) + } + `, backend) + + ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600) + + bundle := plugin.BundleInfoForPath(dir) + supervisor, err := SupervisorProvider(bundle) + require.NoError(t, err) + require.NoError(t, supervisor.Start()) + require.NoError(t, supervisor.Hooks().OnActivate(nil)) + require.NoError(t, supervisor.Stop()) +} + +// If plugin development goes really wrong, let's make sure plugin activation won't block forever. +func TestSupervisor_StartTimeout(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + backend := filepath.Join(dir, "backend") + compileGo(t, ` + package main + + func main() { + for { + } + } + `, backend) + + ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600) + + bundle := plugin.BundleInfoForPath(dir) + supervisor, err := SupervisorProvider(bundle) + require.NoError(t, err) + require.Error(t, supervisor.Start()) +} + +// Crashed plugins should be relaunched. +func TestSupervisor_PluginCrash(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + backend := filepath.Join(dir, "backend") + compileGo(t, ` + package main + + import ( + "os" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + type MyPlugin struct {} + + func (p *MyPlugin) OnActivate(api plugin.API) error { + os.Exit(1) + return nil + } + + func (p *MyPlugin) OnDeactivate() error { + return nil + } + + func main() { + rpcplugin.Main(&MyPlugin{}) + } + `, backend) + + ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600) + + bundle := plugin.BundleInfoForPath(dir) + supervisor, err := SupervisorProvider(bundle) + require.NoError(t, err) + require.NoError(t, supervisor.Start()) + require.Error(t, supervisor.Hooks().OnActivate(nil)) + + recovered := false + for i := 0; i < 30; i++ { + if supervisor.Hooks().OnDeactivate() == nil { + recovered = true + break + } + time.Sleep(time.Millisecond * 100) + } + assert.True(t, recovered) + require.NoError(t, supervisor.Stop()) +} -- cgit v1.2.3-1-g7c22