From 402491b7e52c4d836c1274976cdb387852cfd17b Mon Sep 17 00:00:00 2001 From: Chris Date: Mon, 11 Sep 2017 10:02:02 -0500 Subject: PLT-7407: Back-end plugins (#7409) * tie back-end plugins together * fix comment typo * add tests and a bit of polish * tests and polish * add test, don't let backend executable paths escape the plugin directory --- plugin/rpcplugin/hooks.go | 29 ++++++- plugin/rpcplugin/hooks_test.go | 44 ++++++++++ plugin/rpcplugin/io.go | 163 +++++++++++++++++++++++++++++++++--- plugin/rpcplugin/io_test.go | 73 ++++++++++++++++ plugin/rpcplugin/ipc.go | 2 +- plugin/rpcplugin/supervisor.go | 7 +- plugin/rpcplugin/supervisor_test.go | 13 +++ 7 files changed, 315 insertions(+), 16 deletions(-) create mode 100644 plugin/rpcplugin/io_test.go (limited to 'plugin/rpcplugin') diff --git a/plugin/rpcplugin/hooks.go b/plugin/rpcplugin/hooks.go index 68bce41eb..18e4a6672 100644 --- a/plugin/rpcplugin/hooks.go +++ b/plugin/rpcplugin/hooks.go @@ -86,6 +86,15 @@ func (h *LocalHooks) OnDeactivate(args, reply *struct{}) (err error) { return } +func (h *LocalHooks) OnConfigurationChange(args, reply *struct{}) error { + if hook, ok := h.hooks.(interface { + OnConfigurationChange() error + }); ok { + return hook.OnConfigurationChange() + } + return nil +} + type ServeHTTPArgs struct { ResponseWriterStream int64 Request *http.Request @@ -122,11 +131,14 @@ func ServeHooks(hooks interface{}, conn io.ReadWriteCloser, muxer *Muxer) { server.ServeConn(conn) } +// These assignments are part of the wire protocol. You can add more, but should not change existing +// assignments. const ( - remoteOnActivate = iota - remoteOnDeactivate - remoteServeHTTP - maxRemoteHookCount + remoteOnActivate = 0 + remoteOnDeactivate = 1 + remoteServeHTTP = 2 + remoteOnConfigurationChange = 3 + maxRemoteHookCount = iota ) type RemoteHooks struct { @@ -164,6 +176,13 @@ func (h *RemoteHooks) OnDeactivate() error { return h.client.Call("LocalHooks.OnDeactivate", struct{}{}, nil) } +func (h *RemoteHooks) OnConfigurationChange() error { + if !h.implemented[remoteOnConfigurationChange] { + return nil + } + return h.client.Call("LocalHooks.OnConfigurationChange", struct{}{}, nil) +} + func (h *RemoteHooks) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !h.implemented[remoteServeHTTP] { http.NotFound(w, r) @@ -227,6 +246,8 @@ func ConnectHooks(conn io.ReadWriteCloser, muxer *Muxer) (*RemoteHooks, error) { remote.implemented[remoteOnActivate] = true case "OnDeactivate": remote.implemented[remoteOnDeactivate] = true + case "OnConfigurationChange": + remote.implemented[remoteOnConfigurationChange] = true case "ServeHTTP": remote.implemented[remoteServeHTTP] = true } diff --git a/plugin/rpcplugin/hooks_test.go b/plugin/rpcplugin/hooks_test.go index c3c6c8448..37c529510 100644 --- a/plugin/rpcplugin/hooks_test.go +++ b/plugin/rpcplugin/hooks_test.go @@ -6,10 +6,12 @@ import ( "net/http" "net/http/httptest" "strings" + "sync" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/mattermost/mattermost-server/plugin" "github.com/mattermost/mattermost-server/plugin/plugintest" @@ -50,6 +52,9 @@ func TestHooks(t *testing.T) { hooks.On("OnDeactivate").Return(nil) assert.NoError(t, remote.OnDeactivate()) + hooks.On("OnConfigurationChange").Return(nil) + assert.NoError(t, remote.OnConfigurationChange()) + hooks.On("ServeHTTP", mock.AnythingOfType("*rpcplugin.RemoteHTTPResponseWriter"), mock.AnythingOfType("*http.Request")).Run(func(args mock.Arguments) { w := args.Get(0).(http.ResponseWriter) r := args.Get(1).(*http.Request) @@ -77,6 +82,45 @@ func TestHooks(t *testing.T) { })) } +func TestHooks_Concurrency(t *testing.T) { + var hooks plugintest.Hooks + defer hooks.AssertExpectations(t) + + assert.NoError(t, testHooksRPC(&hooks, func(remote *RemoteHooks) { + ch := make(chan bool) + + hooks.On("ServeHTTP", mock.AnythingOfType("*rpcplugin.RemoteHTTPResponseWriter"), mock.AnythingOfType("*http.Request")).Run(func(args mock.Arguments) { + r := args.Get(1).(*http.Request) + if r.URL.Path == "/1" { + <-ch + } else { + ch <- true + } + }) + + rec := httptest.NewRecorder() + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + req, err := http.NewRequest("GET", "/1", nil) + require.NoError(t, err) + remote.ServeHTTP(rec, req) + wg.Done() + }() + + go func() { + req, err := http.NewRequest("GET", "/2", nil) + require.NoError(t, err) + remote.ServeHTTP(rec, req) + wg.Done() + }() + + wg.Wait() + })) +} + type testHooks struct { mock.Mock } diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go index 38229d868..21d79ab0b 100644 --- a/plugin/rpcplugin/io.go +++ b/plugin/rpcplugin/io.go @@ -2,26 +2,169 @@ package rpcplugin import ( "bufio" + "bytes" "encoding/binary" "io" - "os" + "sync" ) +type asyncRead struct { + b []byte + err error +} + +type asyncReadCloser struct { + io.ReadCloser + buffer bytes.Buffer + read chan struct{} + reads chan asyncRead + close chan struct{} + closeOnce sync.Once +} + +// NewAsyncReadCloser returns a ReadCloser that supports Close during Read. +func NewAsyncReadCloser(r io.ReadCloser) io.ReadCloser { + ret := &asyncReadCloser{ + ReadCloser: r, + read: make(chan struct{}), + reads: make(chan asyncRead), + close: make(chan struct{}), + } + go ret.loop() + return ret +} + +func (r *asyncReadCloser) loop() { + buf := make([]byte, 1024*8) + var n int + var err error + for { + select { + case <-r.read: + n = 0 + if err == nil { + n, err = r.ReadCloser.Read(buf) + } + select { + case r.reads <- asyncRead{buf[:n], err}: + case <-r.close: + } + case <-r.close: + r.ReadCloser.Close() + return + } + } +} + +func (r *asyncReadCloser) Read(b []byte) (int, error) { + if r.buffer.Len() > 0 { + return r.buffer.Read(b) + } + select { + case r.read <- struct{}{}: + case <-r.close: + } + select { + case read := <-r.reads: + if read.err != nil { + return 0, read.err + } + n := copy(b, read.b) + if n < len(read.b) { + r.buffer.Write(read.b[n:]) + } + return n, nil + case <-r.close: + return 0, io.EOF + } +} + +func (r *asyncReadCloser) Close() error { + r.closeOnce.Do(func() { + close(r.close) + }) + return nil +} + +type asyncWrite struct { + n int + err error +} + +type asyncWriteCloser struct { + io.WriteCloser + writeBuffer bytes.Buffer + write chan struct{} + writes chan asyncWrite + close chan struct{} + closeOnce sync.Once +} + +// NewAsyncWriteCloser returns a WriteCloser that supports Close during Write. +func NewAsyncWriteCloser(w io.WriteCloser) io.WriteCloser { + ret := &asyncWriteCloser{ + WriteCloser: w, + write: make(chan struct{}), + writes: make(chan asyncWrite), + close: make(chan struct{}), + } + go ret.loop() + return ret +} + +func (w *asyncWriteCloser) loop() { + var n int64 + var err error + for { + select { + case <-w.write: + n = 0 + if err == nil { + n, err = w.writeBuffer.WriteTo(w.WriteCloser) + } + select { + case w.writes <- asyncWrite{int(n), err}: + case <-w.close: + } + case <-w.close: + w.WriteCloser.Close() + return + } + } +} + +func (w *asyncWriteCloser) Write(b []byte) (int, error) { + if n, err := w.writeBuffer.Write(b); err != nil { + return n, err + } + select { + case w.write <- struct{}{}: + case <-w.close: + } + select { + case write := <-w.writes: + return write.n, write.err + case <-w.close: + return 0, io.EOF + } +} + +func (w *asyncWriteCloser) Close() error { + w.closeOnce.Do(func() { + close(w.close) + }) + return nil +} + type rwc struct { io.ReadCloser io.WriteCloser } func (rwc *rwc) Close() (err error) { - if f, ok := rwc.ReadCloser.(*os.File); ok { - // https://groups.google.com/d/topic/golang-nuts/i4w58KJ5-J8/discussion - err = os.NewFile(f.Fd(), "").Close() - } else { - err = rwc.ReadCloser.Close() - } - werr := rwc.WriteCloser.Close() - if err == nil { - err = werr + err = rwc.WriteCloser.Close() + if rerr := rwc.ReadCloser.Close(); err == nil { + err = rerr } return } diff --git a/plugin/rpcplugin/io_test.go b/plugin/rpcplugin/io_test.go new file mode 100644 index 000000000..cb31b23b3 --- /dev/null +++ b/plugin/rpcplugin/io_test.go @@ -0,0 +1,73 @@ +package rpcplugin + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewAsyncReadCloser(t *testing.T) { + rf, w, err := os.Pipe() + require.NoError(t, err) + r := NewAsyncReadCloser(rf) + defer r.Close() + + go func() { + w.Write([]byte("foo")) + w.Close() + }() + + foo, err := ioutil.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, "foo", string(foo)) +} + +func TestNewAsyncReadCloser_CloseDuringRead(t *testing.T) { + rf, w, err := os.Pipe() + require.NoError(t, err) + defer w.Close() + + r := NewAsyncReadCloser(rf) + + go func() { + time.Sleep(time.Millisecond * 200) + r.Close() + }() + r.Read(make([]byte, 10)) +} + +func TestNewAsyncWriteCloser(t *testing.T) { + r, wf, err := os.Pipe() + require.NoError(t, err) + w := NewAsyncWriteCloser(wf) + defer w.Close() + + go func() { + foo, err := ioutil.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, "foo", string(foo)) + r.Close() + }() + + n, err := w.Write([]byte("foo")) + require.NoError(t, err) + assert.Equal(t, 3, n) +} + +func TestNewAsyncWriteCloser_CloseDuringWrite(t *testing.T) { + r, wf, err := os.Pipe() + require.NoError(t, err) + defer r.Close() + + w := NewAsyncWriteCloser(wf) + + go func() { + time.Sleep(time.Millisecond * 200) + w.Close() + }() + w.Write(make([]byte, 10)) +} diff --git a/plugin/rpcplugin/ipc.go b/plugin/rpcplugin/ipc.go index 3e6c89c4f..bbb3db06e 100644 --- a/plugin/rpcplugin/ipc.go +++ b/plugin/rpcplugin/ipc.go @@ -19,7 +19,7 @@ func NewIPC() (io.ReadWriteCloser, []*os.File, error) { childWriter.Close() return nil, nil, err } - return NewReadWriteCloser(parentReader, parentWriter), []*os.File{childReader, childWriter}, nil + return NewReadWriteCloser(NewAsyncReadCloser(parentReader), NewAsyncWriteCloser(parentWriter)), []*os.File{childReader, childWriter}, nil } // Returns the IPC instance inherited by the process from its parent. diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go index 6a00d0468..7e37e2851 100644 --- a/plugin/rpcplugin/supervisor.go +++ b/plugin/rpcplugin/supervisor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + "strings" "sync/atomic" "time" @@ -123,7 +124,11 @@ func SupervisorProvider(bundle *model.BundleInfo) (plugin.Supervisor, error) { } else if bundle.Manifest.Backend == nil || bundle.Manifest.Backend.Executable == "" { return nil, fmt.Errorf("no backend executable specified") } + executable := filepath.Clean(filepath.Join(".", bundle.Manifest.Backend.Executable)) + if strings.HasPrefix(executable, "..") { + return nil, fmt.Errorf("invalid backend executable") + } return &Supervisor{ - executable: filepath.Join(bundle.Path, bundle.Manifest.Backend.Executable), + executable: filepath.Join(bundle.Path, executable), }, nil } diff --git a/plugin/rpcplugin/supervisor_test.go b/plugin/rpcplugin/supervisor_test.go index 6940adcad..bad38b2d7 100644 --- a/plugin/rpcplugin/supervisor_test.go +++ b/plugin/rpcplugin/supervisor_test.go @@ -43,6 +43,19 @@ func TestSupervisor(t *testing.T) { require.NoError(t, supervisor.Stop()) } +func TestSupervisor_InvalidExecutablePath(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "/foo/../../backend.exe"}}`), 0600) + + bundle := model.BundleInfoForPath(dir) + supervisor, err := SupervisorProvider(bundle) + assert.Nil(t, supervisor) + assert.Error(t, err) +} + // If plugin development goes really wrong, let's make sure plugin activation won't block forever. func TestSupervisor_StartTimeout(t *testing.T) { dir, err := ioutil.TempDir("", "") -- cgit v1.2.3-1-g7c22