From f80d50adbddf55a043dfcab5b47d7c1e22749b7d Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 16 Aug 2017 17:23:38 -0500 Subject: PLT-7407: Back-end plugin mechanism (#7177) * begin backend plugin wip * flesh out rpcplugin. everything done except for minor supervisor stubs * done with basic plugin infrastructure * simplify tests * remove unused test lines --- plugin/api.go | 7 + plugin/bundle_info.go | 20 +++ plugin/bundle_info_test.go | 30 ++++ plugin/hooks.go | 10 ++ plugin/manifest.go | 70 +++++++++ plugin/manifest_test.go | 97 ++++++++++++ plugin/pluginenv/environment.go | 123 +++++++++++++++ plugin/pluginenv/environment_test.go | 291 +++++++++++++++++++++++++++++++++++ plugin/pluginenv/options.go | 42 +++++ plugin/pluginenv/options_test.go | 32 ++++ plugin/pluginenv/search_path.go | 32 ++++ plugin/pluginenv/search_path_test.go | 62 ++++++++ plugin/plugintest/api.go | 17 ++ plugin/plugintest/hooks.go | 21 +++ plugin/rpcplugin/api.go | 62 ++++++++ plugin/rpcplugin/api_test.go | 57 +++++++ plugin/rpcplugin/hooks.go | 77 +++++++++ plugin/rpcplugin/hooks_test.go | 58 +++++++ plugin/rpcplugin/io.go | 23 +++ plugin/rpcplugin/ipc.go | 28 ++++ plugin/rpcplugin/ipc_test.go | 61 ++++++++ plugin/rpcplugin/main.go | 46 ++++++ plugin/rpcplugin/main_test.go | 58 +++++++ plugin/rpcplugin/muxer.go | 253 ++++++++++++++++++++++++++++++ plugin/rpcplugin/muxer_test.go | 169 ++++++++++++++++++++ plugin/rpcplugin/process.go | 23 +++ plugin/rpcplugin/process_test.go | 64 ++++++++ plugin/rpcplugin/process_unix.go | 45 ++++++ plugin/rpcplugin/process_windows.go | 17 ++ plugin/rpcplugin/supervisor.go | 128 +++++++++++++++ plugin/rpcplugin/supervisor_test.go | 130 ++++++++++++++++ plugin/supervisor.go | 8 + 32 files changed, 2161 insertions(+) create mode 100644 plugin/api.go create mode 100644 plugin/bundle_info.go create mode 100644 plugin/bundle_info_test.go create mode 100644 plugin/hooks.go create mode 100644 plugin/manifest.go create mode 100644 plugin/manifest_test.go create mode 100644 plugin/pluginenv/environment.go create mode 100644 plugin/pluginenv/environment_test.go create mode 100644 plugin/pluginenv/options.go create mode 100644 plugin/pluginenv/options_test.go create mode 100644 plugin/pluginenv/search_path.go create mode 100644 plugin/pluginenv/search_path_test.go create mode 100644 plugin/plugintest/api.go create mode 100644 plugin/plugintest/hooks.go create mode 100644 plugin/rpcplugin/api.go create mode 100644 plugin/rpcplugin/api_test.go create mode 100644 plugin/rpcplugin/hooks.go create mode 100644 plugin/rpcplugin/hooks_test.go create mode 100644 plugin/rpcplugin/io.go create mode 100644 plugin/rpcplugin/ipc.go create mode 100644 plugin/rpcplugin/ipc_test.go create mode 100644 plugin/rpcplugin/main.go create mode 100644 plugin/rpcplugin/main_test.go create mode 100644 plugin/rpcplugin/muxer.go create mode 100644 plugin/rpcplugin/muxer_test.go create mode 100644 plugin/rpcplugin/process.go create mode 100644 plugin/rpcplugin/process_test.go create mode 100644 plugin/rpcplugin/process_unix.go create mode 100644 plugin/rpcplugin/process_windows.go create mode 100644 plugin/rpcplugin/supervisor.go create mode 100644 plugin/rpcplugin/supervisor_test.go create mode 100644 plugin/supervisor.go (limited to 'plugin') diff --git a/plugin/api.go b/plugin/api.go new file mode 100644 index 000000000..c62ae0f55 --- /dev/null +++ b/plugin/api.go @@ -0,0 +1,7 @@ +package plugin + +type API interface { + // LoadPluginConfiguration loads the plugin's configuration. dest should be a pointer to a + // struct that the configuration JSON can be unmarshalled to. + LoadPluginConfiguration(dest interface{}) error +} diff --git a/plugin/bundle_info.go b/plugin/bundle_info.go new file mode 100644 index 000000000..9dc47ceea --- /dev/null +++ b/plugin/bundle_info.go @@ -0,0 +1,20 @@ +package plugin + +type BundleInfo struct { + Path string + + Manifest *Manifest + ManifestPath string + ManifestError error +} + +// Returns bundle info for the given path. The return value is never nil. +func BundleInfoForPath(path string) *BundleInfo { + m, mpath, err := FindManifest(path) + return &BundleInfo{ + Path: path, + Manifest: m, + ManifestPath: mpath, + ManifestError: err, + } +} diff --git a/plugin/bundle_info_test.go b/plugin/bundle_info_test.go new file mode 100644 index 000000000..94a0c624f --- /dev/null +++ b/plugin/bundle_info_test.go @@ -0,0 +1,30 @@ +package plugin + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBundleInfoForPath(t *testing.T) { + dir, err := ioutil.TempDir("", "mm-plugin-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + path := filepath.Join(dir, "plugin.json") + f, err := os.Create(path) + require.NoError(t, err) + _, err = f.WriteString(`{"id": "foo"}`) + f.Close() + require.NoError(t, err) + + info := BundleInfoForPath(dir) + assert.Equal(t, info.Path, dir) + assert.NotNil(t, info.Manifest) + assert.Equal(t, info.ManifestPath, path) + assert.Nil(t, info.ManifestError) +} diff --git a/plugin/hooks.go b/plugin/hooks.go new file mode 100644 index 000000000..28a762a1a --- /dev/null +++ b/plugin/hooks.go @@ -0,0 +1,10 @@ +package plugin + +type Hooks interface { + // OnActivate is invoked when the plugin is activated. + OnActivate(API) error + + // OnDeactivate is invoked when the plugin is deactivated. This is the plugin's last chance to + // use the API, and the plugin will be terminated shortly after this invocation. + OnDeactivate() error +} diff --git a/plugin/manifest.go b/plugin/manifest.go new file mode 100644 index 000000000..15b7f0555 --- /dev/null +++ b/plugin/manifest.go @@ -0,0 +1,70 @@ +package plugin + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + "gopkg.in/yaml.v2" +) + +type Manifest struct { + Id string `json:"id" yaml:"id"` + Backend *ManifestBackend `json:"backend,omitempty" yaml:"backend,omitempty"` +} + +type ManifestBackend struct { + Executable string `json:"executable" yaml:"executable"` +} + +// FindManifest will find and parse the manifest in a given directory. +// +// In all cases other than a does-not-exist error, path is set to the path of the manifest file that was +// found. +// +// Manifests are JSON or YAML files named plugin.json, plugin.yaml, or plugin.yml. +func FindManifest(dir string) (manifest *Manifest, path string, err error) { + for _, name := range []string{"plugin.yml", "plugin.yaml"} { + path = filepath.Join(dir, name) + f, ferr := os.Open(path) + if ferr != nil { + if !os.IsNotExist(ferr) { + err = ferr + return + } + continue + } + b, ioerr := ioutil.ReadAll(f) + f.Close() + if ioerr != nil { + err = ioerr + return + } + var parsed Manifest + err = yaml.Unmarshal(b, &parsed) + if err != nil { + return + } + manifest = &parsed + return + } + + path = filepath.Join(dir, "plugin.json") + f, ferr := os.Open(path) + if ferr != nil { + if os.IsNotExist(ferr) { + path = "" + } + err = ferr + return + } + defer f.Close() + var parsed Manifest + err = json.NewDecoder(f).Decode(&parsed) + if err != nil { + return + } + manifest = &parsed + return +} diff --git a/plugin/manifest_test.go b/plugin/manifest_test.go new file mode 100644 index 000000000..5dae4fbaa --- /dev/null +++ b/plugin/manifest_test.go @@ -0,0 +1,97 @@ +package plugin + +import ( + "encoding/json" + "gopkg.in/yaml.v2" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFindManifest(t *testing.T) { + for _, tc := range []struct { + Filename string + Contents string + ExpectError bool + ExpectNotExist bool + }{ + {"foo", "bar", true, true}, + {"plugin.json", "bar", true, false}, + {"plugin.json", `{"id": "foo"}`, false, false}, + {"plugin.yaml", `id: foo`, false, false}, + {"plugin.yaml", "bar", true, false}, + {"plugin.yml", `id: foo`, false, false}, + {"plugin.yml", "bar", true, false}, + } { + dir, err := ioutil.TempDir("", "mm-plugin-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + path := filepath.Join(dir, tc.Filename) + f, err := os.Create(path) + require.NoError(t, err) + _, err = f.WriteString(tc.Contents) + f.Close() + require.NoError(t, err) + + m, mpath, err := FindManifest(dir) + assert.True(t, (err != nil) == tc.ExpectError, tc.Filename) + assert.True(t, (err != nil && os.IsNotExist(err)) == tc.ExpectNotExist, tc.Filename) + if !tc.ExpectNotExist { + assert.Equal(t, path, mpath, tc.Filename) + } else { + assert.Empty(t, mpath, tc.Filename) + } + if !tc.ExpectError { + require.NotNil(t, m, tc.Filename) + assert.NotEmpty(t, m.Id, tc.Filename) + } + } +} + +func TestManifestUnmarshal(t *testing.T) { + expected := Manifest{ + Id: "theid", + Backend: &ManifestBackend{ + Executable: "theexecutable", + }, + } + + var yamlResult Manifest + require.NoError(t, yaml.Unmarshal([]byte(` +id: theid +backend: + executable: theexecutable +`), &yamlResult)) + assert.Equal(t, expected, yamlResult) + + var jsonResult Manifest + require.NoError(t, json.Unmarshal([]byte(`{ + "id": "theid", + "backend": { + "executable": "theexecutable" + } + }`), &jsonResult)) + assert.Equal(t, expected, jsonResult) +} + +func TestFindManifest_FileErrors(t *testing.T) { + for _, tc := range []string{"plugin.yaml", "plugin.json"} { + dir, err := ioutil.TempDir("", "mm-plugin-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + path := filepath.Join(dir, tc) + require.NoError(t, os.Mkdir(path, 0700)) + + m, mpath, err := FindManifest(dir) + assert.Nil(t, m) + assert.Equal(t, path, mpath) + assert.Error(t, err, tc) + assert.False(t, os.IsNotExist(err), tc) + } +} diff --git a/plugin/pluginenv/environment.go b/plugin/pluginenv/environment.go new file mode 100644 index 000000000..36a8c6e76 --- /dev/null +++ b/plugin/pluginenv/environment.go @@ -0,0 +1,123 @@ +// Package pluginenv provides high level functionality for discovering and launching plugins. +package pluginenv + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/mattermost/platform/plugin" +) + +type APIProviderFunc func(*plugin.Manifest) (plugin.API, error) +type SupervisorProviderFunc func(*plugin.BundleInfo) (plugin.Supervisor, error) + +// Environment represents an environment that plugins are discovered and launched in. +type Environment struct { + searchPath string + apiProvider APIProviderFunc + supervisorProvider SupervisorProviderFunc + activePlugins map[string]plugin.Supervisor +} + +type Option func(*Environment) + +// Creates a new environment. At a minimum, the APIProvider and SearchPath options are required. +func New(options ...Option) (*Environment, error) { + env := &Environment{ + activePlugins: make(map[string]plugin.Supervisor), + } + for _, opt := range options { + opt(env) + } + if env.supervisorProvider == nil { + env.supervisorProvider = DefaultSupervisorProvider + } + if env.searchPath == "" { + return nil, fmt.Errorf("a search path must be provided") + } else if env.apiProvider == nil { + return nil, fmt.Errorf("an api provider must be provided") + } + return env, nil +} + +// Returns a list of all plugins found within the environment. +func (env *Environment) Plugins() ([]*plugin.BundleInfo, error) { + return ScanSearchPath(env.searchPath) +} + +// Returns the ids of the currently active plugins. +func (env *Environment) ActivePluginIds() (ids []string) { + for id := range env.activePlugins { + ids = append(ids, id) + } + return +} + +// Activates the plugin with the given id. +func (env *Environment) ActivatePlugin(id string) error { + if _, ok := env.activePlugins[id]; ok { + return fmt.Errorf("plugin already active: %v", id) + } + plugins, err := ScanSearchPath(env.searchPath) + if err != nil { + return err + } + var plugin *plugin.BundleInfo + for _, p := range plugins { + if p.Manifest != nil && p.Manifest.Id == id { + if plugin != nil { + return fmt.Errorf("multiple plugins found: %v", id) + } + plugin = p + } + } + if plugin == nil { + return fmt.Errorf("plugin not found: %v", id) + } + supervisor, err := env.supervisorProvider(plugin) + if err != nil { + return errors.Wrapf(err, "unable to create supervisor for plugin: %v", id) + } + api, err := env.apiProvider(plugin.Manifest) + if err != nil { + return errors.Wrapf(err, "unable to get api for plugin: %v", id) + } + if err := supervisor.Start(); err != nil { + return errors.Wrapf(err, "unable to start plugin: %v", id) + } + if err := supervisor.Hooks().OnActivate(api); err != nil { + supervisor.Stop() + return errors.Wrapf(err, "unable to activate plugin: %v", id) + } + env.activePlugins[id] = supervisor + return nil +} + +// Deactivates the plugin with the given id. +func (env *Environment) DeactivatePlugin(id string) error { + if supervisor, ok := env.activePlugins[id]; !ok { + return fmt.Errorf("plugin not active: %v", id) + } else { + delete(env.activePlugins, id) + err := supervisor.Hooks().OnDeactivate() + if serr := supervisor.Stop(); err == nil { + err = serr + } + return err + } +} + +// Deactivates all plugins and gracefully shuts down the environment. +func (env *Environment) Shutdown() (errs []error) { + for _, supervisor := range env.activePlugins { + if err := supervisor.Hooks().OnDeactivate(); err != nil { + errs = append(errs, err) + } + if err := supervisor.Stop(); err != nil { + errs = append(errs, err) + } + } + env.activePlugins = make(map[string]plugin.Supervisor) + return +} diff --git a/plugin/pluginenv/environment_test.go b/plugin/pluginenv/environment_test.go new file mode 100644 index 000000000..d933c8696 --- /dev/null +++ b/plugin/pluginenv/environment_test.go @@ -0,0 +1,291 @@ +package pluginenv + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/plugintest" +) + +type MockProvider struct { + mock.Mock +} + +func (m *MockProvider) API(manifest *plugin.Manifest) (plugin.API, error) { + ret := m.Called() + if ret.Get(0) == nil { + return nil, ret.Error(1) + } + return ret.Get(0).(plugin.API), ret.Error(1) +} + +func (m *MockProvider) Supervisor(bundle *plugin.BundleInfo) (plugin.Supervisor, error) { + ret := m.Called() + if ret.Get(0) == nil { + return nil, ret.Error(1) + } + return ret.Get(0).(plugin.Supervisor), ret.Error(1) +} + +type MockSupervisor struct { + mock.Mock +} + +func (m *MockSupervisor) Start() error { + return m.Called().Error(0) +} + +func (m *MockSupervisor) Stop() error { + return m.Called().Error(0) +} + +func (m *MockSupervisor) Hooks() plugin.Hooks { + return m.Called().Get(0).(plugin.Hooks) +} + +func initTmpDir(t *testing.T, files map[string]string) string { + success := false + dir, err := ioutil.TempDir("", "mm-plugin-test") + require.NoError(t, err) + defer func() { + if !success { + os.RemoveAll(dir) + } + }() + + for name, contents := range files { + path := filepath.Join(dir, name) + parent := filepath.Dir(path) + require.NoError(t, os.MkdirAll(parent, 0700)) + f, err := os.Create(path) + require.NoError(t, err) + _, err = f.WriteString(contents) + f.Close() + require.NoError(t, err) + } + + success = true + return dir +} + +func TestNew_MissingOptions(t *testing.T) { + dir := initTmpDir(t, map[string]string{ + "foo/plugin.json": `{"id": "foo"}`, + }) + defer os.RemoveAll(dir) + + var provider MockProvider + defer provider.AssertExpectations(t) + + env, err := New( + APIProvider(provider.API), + ) + assert.Nil(t, env) + assert.Error(t, err) + + env, err = New( + SearchPath(dir), + ) + assert.Nil(t, env) + assert.Error(t, err) +} + +func TestEnvironment(t *testing.T) { + dir := initTmpDir(t, map[string]string{ + ".foo/plugin.json": `{"id": "foo"}`, + "foo/bar": "asdf", + "foo/plugin.json": `{"id": "foo"}`, + "bar/zxc": "qwer", + "baz/plugin.yaml": "id: baz", + "bad/plugin.json": "asd", + "qwe": "asd", + }) + defer os.RemoveAll(dir) + + var provider MockProvider + defer provider.AssertExpectations(t) + + env, err := New( + SearchPath(dir), + APIProvider(provider.API), + SupervisorProvider(provider.Supervisor), + ) + require.NoError(t, err) + defer env.Shutdown() + + plugins, err := env.Plugins() + assert.NoError(t, err) + assert.Len(t, plugins, 3) + + assert.Error(t, env.ActivatePlugin("x")) + + var api struct{ plugin.API } + var supervisor MockSupervisor + defer supervisor.AssertExpectations(t) + var hooks plugintest.Hooks + defer hooks.AssertExpectations(t) + + provider.On("API").Return(&api, nil) + provider.On("Supervisor").Return(&supervisor, nil) + + supervisor.On("Start").Return(nil) + supervisor.On("Stop").Return(nil) + supervisor.On("Hooks").Return(&hooks) + + hooks.On("OnActivate", &api).Return(nil) + + assert.NoError(t, env.ActivatePlugin("foo")) + assert.Equal(t, env.ActivePluginIds(), []string{"foo"}) + assert.Error(t, env.ActivatePlugin("foo")) + + hooks.On("OnDeactivate").Return(nil) + assert.NoError(t, env.DeactivatePlugin("foo")) + assert.Error(t, env.DeactivatePlugin("foo")) + + assert.NoError(t, env.ActivatePlugin("foo")) + assert.Equal(t, env.ActivePluginIds(), []string{"foo"}) + assert.Empty(t, env.Shutdown()) +} + +func TestEnvironment_DuplicatePluginError(t *testing.T) { + dir := initTmpDir(t, map[string]string{ + "foo/plugin.json": `{"id": "foo"}`, + "foo2/plugin.json": `{"id": "foo"}`, + }) + defer os.RemoveAll(dir) + + var provider MockProvider + defer provider.AssertExpectations(t) + + env, err := New( + SearchPath(dir), + APIProvider(provider.API), + SupervisorProvider(provider.Supervisor), + ) + require.NoError(t, err) + defer env.Shutdown() + + assert.Error(t, env.ActivatePlugin("foo")) + assert.Empty(t, env.ActivePluginIds()) +} + +func TestEnvironment_BadSearchPathError(t *testing.T) { + var provider MockProvider + defer provider.AssertExpectations(t) + + env, err := New( + SearchPath("thissearchpathshouldnotexist!"), + APIProvider(provider.API), + SupervisorProvider(provider.Supervisor), + ) + require.NoError(t, err) + defer env.Shutdown() + + assert.Error(t, env.ActivatePlugin("foo")) + assert.Empty(t, env.ActivePluginIds()) +} + +func TestEnvironment_ActivatePluginErrors(t *testing.T) { + dir := initTmpDir(t, map[string]string{ + "foo/plugin.json": `{"id": "foo"}`, + }) + defer os.RemoveAll(dir) + + var provider MockProvider + + env, err := New( + SearchPath(dir), + APIProvider(provider.API), + SupervisorProvider(provider.Supervisor), + ) + require.NoError(t, err) + defer env.Shutdown() + + var api struct{ plugin.API } + var supervisor MockSupervisor + var hooks plugintest.Hooks + + for name, setup := range map[string]func(){ + "SupervisorProviderError": func() { + provider.On("Supervisor").Return(nil, fmt.Errorf("test error")) + }, + "APIProviderError": func() { + provider.On("API").Return(plugin.API(nil), fmt.Errorf("test error")) + provider.On("Supervisor").Return(&supervisor, nil) + }, + "SupervisorError": func() { + provider.On("API").Return(&api, nil) + provider.On("Supervisor").Return(&supervisor, nil) + + supervisor.On("Start").Return(fmt.Errorf("test error")) + }, + "HooksError": func() { + provider.On("API").Return(&api, nil) + provider.On("Supervisor").Return(&supervisor, nil) + + supervisor.On("Start").Return(nil) + supervisor.On("Stop").Return(nil) + supervisor.On("Hooks").Return(&hooks) + + hooks.On("OnActivate", &api).Return(fmt.Errorf("test error")) + }, + } { + t.Run(name, func(t *testing.T) { + supervisor.Mock = mock.Mock{} + hooks.Mock = mock.Mock{} + provider.Mock = mock.Mock{} + setup() + assert.Error(t, env.ActivatePlugin("foo")) + assert.Empty(t, env.ActivePluginIds()) + supervisor.AssertExpectations(t) + hooks.AssertExpectations(t) + provider.AssertExpectations(t) + }) + } +} + +func TestEnvironment_ShutdownError(t *testing.T) { + dir := initTmpDir(t, map[string]string{ + "foo/plugin.json": `{"id": "foo"}`, + }) + defer os.RemoveAll(dir) + + var provider MockProvider + defer provider.AssertExpectations(t) + + env, err := New( + SearchPath(dir), + APIProvider(provider.API), + SupervisorProvider(provider.Supervisor), + ) + require.NoError(t, err) + defer env.Shutdown() + + var api struct{ plugin.API } + var supervisor MockSupervisor + defer supervisor.AssertExpectations(t) + var hooks plugintest.Hooks + defer hooks.AssertExpectations(t) + + provider.On("API").Return(&api, nil) + provider.On("Supervisor").Return(&supervisor, nil) + + supervisor.On("Start").Return(nil) + supervisor.On("Stop").Return(fmt.Errorf("test error")) + supervisor.On("Hooks").Return(&hooks) + + hooks.On("OnActivate", &api).Return(nil) + hooks.On("OnDeactivate").Return(fmt.Errorf("test error")) + + assert.NoError(t, env.ActivatePlugin("foo")) + assert.Equal(t, env.ActivePluginIds(), []string{"foo"}) + assert.Len(t, env.Shutdown(), 2) +} diff --git a/plugin/pluginenv/options.go b/plugin/pluginenv/options.go new file mode 100644 index 000000000..3f83228fb --- /dev/null +++ b/plugin/pluginenv/options.go @@ -0,0 +1,42 @@ +package pluginenv + +import ( + "fmt" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" +) + +// APIProvider specifies a function that provides an API implementation to each plugin. +func APIProvider(provider APIProviderFunc) Option { + return func(env *Environment) { + env.apiProvider = provider + } +} + +// SupervisorProvider specifies a function that provides a Supervisor implementation to each plugin. +// If unspecified, DefaultSupervisorProvider is used. +func SupervisorProvider(provider SupervisorProviderFunc) Option { + return func(env *Environment) { + env.supervisorProvider = provider + } +} + +// SearchPath specifies a directory that contains the plugins to launch. +func SearchPath(path string) Option { + return func(env *Environment) { + env.searchPath = path + } +} + +// DefaultSupervisorProvider chooses a supervisor based on the plugin's manifest contents. E.g. if +// the manifest specifies a backend executable, it will be given an rpcplugin.Supervisor. +func DefaultSupervisorProvider(bundle *plugin.BundleInfo) (plugin.Supervisor, error) { + if bundle.Manifest == nil { + return nil, fmt.Errorf("a manifest is required") + } + if bundle.Manifest.Backend == nil { + return nil, fmt.Errorf("invalid manifest: at this time, only backend plugins are supported") + } + return rpcplugin.SupervisorProvider(bundle) +} diff --git a/plugin/pluginenv/options_test.go b/plugin/pluginenv/options_test.go new file mode 100644 index 000000000..4f8d411bd --- /dev/null +++ b/plugin/pluginenv/options_test.go @@ -0,0 +1,32 @@ +package pluginenv + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" +) + +func TestDefaultSupervisorProvider(t *testing.T) { + _, err := DefaultSupervisorProvider(&plugin.BundleInfo{}) + assert.Error(t, err) + + _, err = DefaultSupervisorProvider(&plugin.BundleInfo{ + Manifest: &plugin.Manifest{}, + }) + assert.Error(t, err) + + supervisor, err := DefaultSupervisorProvider(&plugin.BundleInfo{ + Manifest: &plugin.Manifest{ + Backend: &plugin.ManifestBackend{ + Executable: "foo", + }, + }, + }) + require.NoError(t, err) + _, ok := supervisor.(*rpcplugin.Supervisor) + assert.True(t, ok) +} diff --git a/plugin/pluginenv/search_path.go b/plugin/pluginenv/search_path.go new file mode 100644 index 000000000..daebdb0d3 --- /dev/null +++ b/plugin/pluginenv/search_path.go @@ -0,0 +1,32 @@ +package pluginenv + +import ( + "io/ioutil" + "path/filepath" + + "github.com/mattermost/platform/plugin" +) + +// Performs a full scan of the given path. +// +// This function will return info for all subdirectories that appear to be plugins (i.e. all +// subdirectories containing plugin manifest files, regardless of whether they could actually be +// parsed). +// +// Plugins are found non-recursively and paths beginning with a dot are always ignored. +func ScanSearchPath(path string) ([]*plugin.BundleInfo, error) { + files, err := ioutil.ReadDir(path) + if err != nil { + return nil, err + } + var ret []*plugin.BundleInfo + for _, file := range files { + if !file.IsDir() || file.Name()[0] == '.' { + continue + } + if info := plugin.BundleInfoForPath(filepath.Join(path, file.Name())); info.ManifestPath != "" { + ret = append(ret, info) + } + } + return ret, nil +} diff --git a/plugin/pluginenv/search_path_test.go b/plugin/pluginenv/search_path_test.go new file mode 100644 index 000000000..d9a18cf56 --- /dev/null +++ b/plugin/pluginenv/search_path_test.go @@ -0,0 +1,62 @@ +package pluginenv + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/platform/plugin" +) + +func TestScanSearchPath(t *testing.T) { + dir := initTmpDir(t, map[string]string{ + ".foo/plugin.json": `{"id": "foo"}`, + "foo/bar": "asdf", + "foo/plugin.json": `{"id": "foo"}`, + "bar/zxc": "qwer", + "baz/plugin.yaml": "id: baz", + "bad/plugin.json": "asd", + "qwe": "asd", + }) + defer os.RemoveAll(dir) + + plugins, err := ScanSearchPath(dir) + require.NoError(t, err) + assert.Len(t, plugins, 3) + assert.Contains(t, plugins, &plugin.BundleInfo{ + Path: filepath.Join(dir, "foo"), + ManifestPath: filepath.Join(dir, "foo", "plugin.json"), + Manifest: &plugin.Manifest{ + Id: "foo", + }, + }) + assert.Contains(t, plugins, &plugin.BundleInfo{ + Path: filepath.Join(dir, "baz"), + ManifestPath: filepath.Join(dir, "baz", "plugin.yaml"), + Manifest: &plugin.Manifest{ + Id: "baz", + }, + }) + foundError := false + for _, x := range plugins { + if x.ManifestError != nil { + assert.Equal(t, x.Path, filepath.Join(dir, "bad")) + assert.Equal(t, x.ManifestPath, filepath.Join(dir, "bad", "plugin.json")) + syntexError, ok := x.ManifestError.(*json.SyntaxError) + assert.True(t, ok) + assert.EqualValues(t, 1, syntexError.Offset) + foundError = true + } + } + assert.True(t, foundError) +} + +func TestScanSearchPath_Error(t *testing.T) { + plugins, err := ScanSearchPath("not a valid path!") + assert.Nil(t, plugins) + assert.Error(t, err) +} diff --git a/plugin/plugintest/api.go b/plugin/plugintest/api.go new file mode 100644 index 000000000..2f1db88cf --- /dev/null +++ b/plugin/plugintest/api.go @@ -0,0 +1,17 @@ +package plugintest + +import ( + "github.com/stretchr/testify/mock" + + "github.com/mattermost/platform/plugin" +) + +type API struct { + mock.Mock +} + +var _ plugin.API = (*API)(nil) + +func (m *API) LoadPluginConfiguration(dest interface{}) error { + return m.Called(dest).Error(0) +} diff --git a/plugin/plugintest/hooks.go b/plugin/plugintest/hooks.go new file mode 100644 index 000000000..057c705c9 --- /dev/null +++ b/plugin/plugintest/hooks.go @@ -0,0 +1,21 @@ +package plugintest + +import ( + "github.com/stretchr/testify/mock" + + "github.com/mattermost/platform/plugin" +) + +type Hooks struct { + mock.Mock +} + +var _ plugin.Hooks = (*Hooks)(nil) + +func (m *Hooks) OnActivate(api plugin.API) error { + return m.Called(api).Error(0) +} + +func (m *Hooks) OnDeactivate() error { + return m.Called().Error(0) +} diff --git a/plugin/rpcplugin/api.go b/plugin/rpcplugin/api.go new file mode 100644 index 000000000..a807d0837 --- /dev/null +++ b/plugin/rpcplugin/api.go @@ -0,0 +1,62 @@ +package rpcplugin + +import ( + "encoding/json" + "io" + "net/rpc" + + "github.com/mattermost/platform/plugin" +) + +type LocalAPI struct { + api plugin.API + muxer *Muxer +} + +func (h *LocalAPI) LoadPluginConfiguration(args struct{}, reply *[]byte) error { + var config interface{} + if err := h.api.LoadPluginConfiguration(&config); err != nil { + return err + } + b, err := json.Marshal(config) + if err != nil { + return err + } + *reply = b + return nil +} + +type RemoteAPI struct { + client *rpc.Client + muxer *Muxer +} + +func ServeAPI(api plugin.API, conn io.ReadWriteCloser, muxer *Muxer) { + server := rpc.NewServer() + server.Register(&LocalAPI{ + api: api, + muxer: muxer, + }) + server.ServeConn(conn) +} + +var _ plugin.API = (*RemoteAPI)(nil) + +func (h *RemoteAPI) LoadPluginConfiguration(dest interface{}) error { + var config []byte + if err := h.client.Call("LocalAPI.LoadPluginConfiguration", struct{}{}, &config); err != nil { + return err + } + return json.Unmarshal(config, dest) +} + +func (h *RemoteAPI) Close() error { + return h.client.Close() +} + +func ConnectAPI(conn io.ReadWriteCloser, muxer *Muxer) *RemoteAPI { + return &RemoteAPI{ + client: rpc.NewClient(conn), + muxer: muxer, + } +} diff --git a/plugin/rpcplugin/api_test.go b/plugin/rpcplugin/api_test.go new file mode 100644 index 000000000..e55433556 --- /dev/null +++ b/plugin/rpcplugin/api_test.go @@ -0,0 +1,57 @@ +package rpcplugin + +import ( + "encoding/json" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/plugintest" +) + +func testAPIRPC(api plugin.API, f func(plugin.API)) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + c1 := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer c1.Close() + + c2 := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer c2.Close() + + id, server := c1.Serve() + go ServeAPI(api, server, c1) + + remote := ConnectAPI(c2.Connect(id), c2) + defer remote.Close() + + f(remote) +} + +func TestAPI(t *testing.T) { + var api plugintest.API + defer api.AssertExpectations(t) + + type Config struct { + Foo string + Bar struct { + Baz string + } + } + + api.On("LoadPluginConfiguration", mock.MatchedBy(func(x interface{}) bool { return true })).Run(func(args mock.Arguments) { + dest := args.Get(0).(interface{}) + json.Unmarshal([]byte(`{"Foo": "foo", "Bar": {"Baz": "baz"}}`), dest) + }).Return(nil) + + testAPIRPC(&api, func(remote plugin.API) { + var config Config + assert.NoError(t, remote.LoadPluginConfiguration(&config)) + + assert.Equal(t, "foo", config.Foo) + assert.Equal(t, "baz", config.Bar.Baz) + }) +} diff --git a/plugin/rpcplugin/hooks.go b/plugin/rpcplugin/hooks.go new file mode 100644 index 000000000..008730402 --- /dev/null +++ b/plugin/rpcplugin/hooks.go @@ -0,0 +1,77 @@ +package rpcplugin + +import ( + "io" + "net/rpc" + + "github.com/mattermost/platform/plugin" +) + +type LocalHooks struct { + hooks plugin.Hooks + muxer *Muxer + remoteAPI *RemoteAPI +} + +func (h *LocalHooks) OnActivate(args int64, reply *struct{}) error { + stream := h.muxer.Connect(args) + if h.remoteAPI != nil { + h.remoteAPI.Close() + } + h.remoteAPI = ConnectAPI(stream, h.muxer) + return h.hooks.OnActivate(h.remoteAPI) +} + +func (h *LocalHooks) OnDeactivate(args, reply *struct{}) error { + err := h.hooks.OnDeactivate() + if h.remoteAPI != nil { + h.remoteAPI.Close() + h.remoteAPI = nil + } + return err +} + +type RemoteHooks struct { + client *rpc.Client + muxer *Muxer + apiCloser io.Closer +} + +func ServeHooks(hooks plugin.Hooks, conn io.ReadWriteCloser, muxer *Muxer) { + server := rpc.NewServer() + server.Register(&LocalHooks{ + hooks: hooks, + muxer: muxer, + }) + server.ServeConn(conn) +} + +var _ plugin.Hooks = (*RemoteHooks)(nil) + +func (h *RemoteHooks) OnActivate(api plugin.API) error { + id, stream := h.muxer.Serve() + if h.apiCloser != nil { + h.apiCloser.Close() + } + h.apiCloser = stream + go ServeAPI(api, stream, h.muxer) + return h.client.Call("LocalHooks.OnActivate", id, nil) +} + +func (h *RemoteHooks) OnDeactivate() error { + return h.client.Call("LocalHooks.OnDeactivate", struct{}{}, nil) +} + +func (h *RemoteHooks) Close() error { + if h.apiCloser != nil { + h.apiCloser.Close() + } + return h.client.Close() +} + +func ConnectHooks(conn io.ReadWriteCloser, muxer *Muxer) *RemoteHooks { + return &RemoteHooks{ + client: rpc.NewClient(conn), + muxer: muxer, + } +} diff --git a/plugin/rpcplugin/hooks_test.go b/plugin/rpcplugin/hooks_test.go new file mode 100644 index 000000000..fbbbbedeb --- /dev/null +++ b/plugin/rpcplugin/hooks_test.go @@ -0,0 +1,58 @@ +package rpcplugin + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/plugintest" +) + +func testHooksRPC(hooks plugin.Hooks, f func(plugin.Hooks)) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + c1 := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer c1.Close() + + c2 := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer c2.Close() + + id, server := c1.Serve() + go ServeHooks(hooks, server, c1) + + remote := ConnectHooks(c2.Connect(id), c2) + defer remote.Close() + + f(remote) +} + +func TestHooks(t *testing.T) { + var api plugintest.API + var hooks plugintest.Hooks + defer hooks.AssertExpectations(t) + + testHooksRPC(&hooks, func(remote plugin.Hooks) { + hooks.On("OnActivate", mock.AnythingOfType("*rpcplugin.RemoteAPI")).Return(nil) + assert.NoError(t, remote.OnActivate(&api)) + + hooks.On("OnDeactivate").Return(nil) + assert.NoError(t, remote.OnDeactivate()) + }) +} + +func BenchmarkOnDeactivate(b *testing.B) { + var hooks plugintest.Hooks + hooks.On("OnDeactivate").Return(nil) + + testHooksRPC(&hooks, func(remote plugin.Hooks) { + b.ResetTimer() + for n := 0; n < b.N; n++ { + remote.OnDeactivate() + } + b.StopTimer() + }) +} diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go new file mode 100644 index 000000000..f1b2f3c35 --- /dev/null +++ b/plugin/rpcplugin/io.go @@ -0,0 +1,23 @@ +package rpcplugin + +import ( + "io" +) + +type rwc struct { + io.ReadCloser + io.WriteCloser +} + +func (rwc *rwc) Close() error { + rerr := rwc.ReadCloser.Close() + werr := rwc.WriteCloser.Close() + if rerr != nil { + return rerr + } + return werr +} + +func NewReadWriteCloser(r io.ReadCloser, w io.WriteCloser) io.ReadWriteCloser { + return &rwc{r, w} +} diff --git a/plugin/rpcplugin/ipc.go b/plugin/rpcplugin/ipc.go new file mode 100644 index 000000000..3e6c89c4f --- /dev/null +++ b/plugin/rpcplugin/ipc.go @@ -0,0 +1,28 @@ +package rpcplugin + +import ( + "io" + "os" +) + +// Returns a new IPC for the parent process and a set of files to pass on to the child. +// +// The returned files must be closed after the child process is started. +func NewIPC() (io.ReadWriteCloser, []*os.File, error) { + parentReader, childWriter, err := os.Pipe() + if err != nil { + return nil, nil, err + } + childReader, parentWriter, err := os.Pipe() + if err != nil { + parentReader.Close() + childWriter.Close() + return nil, nil, err + } + return NewReadWriteCloser(parentReader, parentWriter), []*os.File{childReader, childWriter}, nil +} + +// Returns the IPC instance inherited by the process from its parent. +func InheritedIPC(fd0, fd1 uintptr) (io.ReadWriteCloser, error) { + return NewReadWriteCloser(os.NewFile(fd0, ""), os.NewFile(fd1, "")), nil +} diff --git a/plugin/rpcplugin/ipc_test.go b/plugin/rpcplugin/ipc_test.go new file mode 100644 index 000000000..bf4df3017 --- /dev/null +++ b/plugin/rpcplugin/ipc_test.go @@ -0,0 +1,61 @@ +package rpcplugin + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIPC(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + pingpong := filepath.Join(dir, "pingpong") + compileGo(t, ` + package main + + import ( + "log" + + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + func main() { + ipc, err := rpcplugin.InheritedProcessIPC() + if err != nil { + log.Fatal("unable to get inherited ipc") + } + defer ipc.Close() + _, err = ipc.Write([]byte("ping")) + if err != nil { + log.Fatal("unable to write to ipc") + } + b := make([]byte, 10) + n, err := ipc.Read(b) + if err != nil { + log.Fatal("unable to read from ipc") + } + if n != 4 || string(b[:4]) != "pong" { + log.Fatal("unexpected response") + } + } + `, pingpong) + + p, ipc, err := NewProcess(context.Background(), pingpong) + require.NoError(t, err) + defer ipc.Close() + b := make([]byte, 10) + n, err := ipc.Read(b) + require.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "ping", string(b[:4])) + _, err = ipc.Write([]byte("pong")) + require.NoError(t, err) + require.NoError(t, p.Wait()) +} diff --git a/plugin/rpcplugin/main.go b/plugin/rpcplugin/main.go new file mode 100644 index 000000000..36177954b --- /dev/null +++ b/plugin/rpcplugin/main.go @@ -0,0 +1,46 @@ +package rpcplugin + +import ( + "bufio" + "encoding/binary" + "fmt" + "log" + "os" + + "github.com/mattermost/platform/plugin" +) + +// Makes a set of hooks available via RPC. This function never returns. +func Main(hooks plugin.Hooks) { + ipc, err := InheritedProcessIPC() + if err != nil { + log.Fatal(err.Error()) + } + muxer := NewMuxer(ipc, true) + id, conn := muxer.Serve() + buf := make([]byte, 11) + buf[0] = 0 + n := binary.PutVarint(buf[1:], id) + if _, err := muxer.Write(buf[:1+n]); err != nil { + log.Fatal(err.Error()) + } + ServeHooks(hooks, conn, muxer) + os.Exit(0) +} + +// Returns the hooks being served by a call to Main. +func ConnectMain(muxer *Muxer) (*RemoteHooks, error) { + buf := make([]byte, 1) + if _, err := muxer.Read(buf); err != nil { + return nil, err + } else if buf[0] != 0 { + return nil, fmt.Errorf("unexpected control byte") + } + reader := bufio.NewReader(muxer) + id, err := binary.ReadVarint(reader) + if err != nil { + return nil, err + } + + return ConnectHooks(muxer.Connect(id), muxer), nil +} diff --git a/plugin/rpcplugin/main_test.go b/plugin/rpcplugin/main_test.go new file mode 100644 index 000000000..b54364bad --- /dev/null +++ b/plugin/rpcplugin/main_test.go @@ -0,0 +1,58 @@ +package rpcplugin + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/platform/plugin/plugintest" +) + +func TestMain(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + plugin := filepath.Join(dir, "plugin") + compileGo(t, ` + package main + + import ( + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + type MyPlugin struct {} + + func (p *MyPlugin) OnActivate(api plugin.API) error { + return nil + } + + func (p *MyPlugin) OnDeactivate() error { + return nil + } + + func main() { + rpcplugin.Main(&MyPlugin{}) + } + `, plugin) + + p, ipc, err := NewProcess(context.Background(), plugin) + require.NoError(t, err) + defer p.Wait() + + muxer := NewMuxer(ipc, false) + defer muxer.Close() + + var api plugintest.API + + hooks, err := ConnectMain(muxer) + require.NoError(t, err) + assert.NoError(t, hooks.OnActivate(&api)) + assert.NoError(t, hooks.OnDeactivate()) +} diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go new file mode 100644 index 000000000..a2bfbf8b6 --- /dev/null +++ b/plugin/rpcplugin/muxer.go @@ -0,0 +1,253 @@ +package rpcplugin + +import ( + "bufio" + "bytes" + "encoding/binary" + "fmt" + "io" + "sync" + "sync/atomic" +) + +// Muxer allows multiple bidirectional streams to be transmitted over a single connection. +// +// Muxer is safe for use by multiple goroutines. +// +// Streams opened on the muxer must be periodically drained in order to reclaim read buffer memory. +// In other words, readers must consume incoming data as it comes in. +type Muxer struct { + // writeMutex guards conn writes + writeMutex sync.Mutex + conn io.ReadWriteCloser + + // didCloseConn is a boolean (0 or 1) used from multiple goroutines via atomic operations + didCloseConn int32 + + // streamsMutex guards streams and nextId + streamsMutex sync.Mutex + nextId int64 + streams map[int64]*muxerStream + + stream0Reader *io.PipeReader + stream0Writer *io.PipeWriter + result chan error +} + +// Creates a new Muxer. +// +// conn must be safe for simultaneous reads by one goroutine and writes by another. +// +// For two muxers communicating with each other via a connection, parity must be true for exactly +// one of them. +func NewMuxer(conn io.ReadWriteCloser, parity bool) *Muxer { + s0r, s0w := io.Pipe() + muxer := &Muxer{ + conn: conn, + streams: make(map[int64]*muxerStream), + result: make(chan error, 1), + nextId: 1, + stream0Reader: s0r, + stream0Writer: s0w, + } + if parity { + muxer.nextId = 2 + } + go muxer.run() + return muxer +} + +// Opens a new stream with a unique id. +// +// Writes made to the stream before the other end calls Connect will be discarded. +func (m *Muxer) Serve() (int64, io.ReadWriteCloser) { + m.streamsMutex.Lock() + id := m.nextId + m.nextId += 2 + m.streamsMutex.Unlock() + return id, m.Connect(id) +} + +// Opens a remotely opened stream. +func (m *Muxer) Connect(id int64) io.ReadWriteCloser { + m.streamsMutex.Lock() + defer m.streamsMutex.Unlock() + mutex := &sync.Mutex{} + stream := &muxerStream{ + id: id, + muxer: m, + mutex: mutex, + readWake: sync.NewCond(mutex), + } + m.streams[id] = stream + return stream +} + +// Calling Read on the muxer directly performs a read on a dedicated, always-open channel. +func (m *Muxer) Read(p []byte) (int, error) { + return m.stream0Reader.Read(p) +} + +// Calling Write on the muxer directly performs a write on a dedicated, always-open channel. +func (m *Muxer) Write(p []byte) (int, error) { + return m.write(p, 0) +} + +// Closes the muxer. +func (m *Muxer) Close() error { + if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) { + m.conn.Close() + } + m.stream0Reader.Close() + m.stream0Writer.Close() + <-m.result + return nil +} + +func (m *Muxer) IsClosed() bool { + return atomic.LoadInt32(&m.didCloseConn) > 0 +} + +func (m *Muxer) write(p []byte, sid int64) (int, error) { + m.writeMutex.Lock() + defer m.writeMutex.Unlock() + if m.IsClosed() { + return 0, fmt.Errorf("muxer closed") + } + buf := make([]byte, 10) + n := binary.PutVarint(buf, sid) + if _, err := m.conn.Write(buf[:n]); err != nil { + m.shutdown(err) + return 0, err + } + n = binary.PutVarint(buf, int64(len(p))) + if _, err := m.conn.Write(buf[:n]); err != nil { + m.shutdown(err) + return 0, err + } + if _, err := m.conn.Write(p); err != nil { + m.shutdown(err) + return 0, err + } + return len(p), nil +} + +func (m *Muxer) rm(sid int64) { + m.streamsMutex.Lock() + defer m.streamsMutex.Unlock() + delete(m.streams, sid) +} + +func (m *Muxer) run() { + m.shutdown(m.loop()) +} + +func (m *Muxer) loop() error { + reader := bufio.NewReader(m.conn) + + for { + sid, err := binary.ReadVarint(reader) + if err != nil { + return err + } + len, err := binary.ReadVarint(reader) + if err != nil { + return err + } + + if sid == 0 { + if _, err := io.CopyN(m.stream0Writer, reader, len); err != nil { + return err + } + continue + } + + m.streamsMutex.Lock() + stream, ok := m.streams[sid] + m.streamsMutex.Unlock() + if !ok { + if _, err := reader.Discard(int(len)); err != nil { + return err + } + continue + } + + stream.mutex.Lock() + if stream.isClosed { + stream.mutex.Unlock() + if _, err := reader.Discard(int(len)); err != nil { + return err + } + continue + } + _, err = io.CopyN(&stream.readBuf, reader, len) + stream.mutex.Unlock() + if err != nil { + return err + } + stream.readWake.Signal() + } +} + +func (m *Muxer) shutdown(err error) { + if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) { + m.conn.Close() + } + go func() { + m.streamsMutex.Lock() + for _, stream := range m.streams { + stream.mutex.Lock() + stream.readWake.Signal() + stream.mutex.Unlock() + } + m.streams = make(map[int64]*muxerStream) + m.streamsMutex.Unlock() + }() + m.result <- err +} + +type muxerStream struct { + id int64 + muxer *Muxer + readBuf bytes.Buffer + mutex *sync.Mutex + readWake *sync.Cond + isClosed bool + closeErr error +} + +func (s *muxerStream) Read(p []byte) (int, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + for { + if s.muxer.IsClosed() { + return 0, fmt.Errorf("muxer closed") + } else if s.isClosed { + return 0, io.EOF + } else if s.readBuf.Len() > 0 { + n, err := s.readBuf.Read(p) + return n, err + } + s.readWake.Wait() + } +} + +func (s *muxerStream) Write(p []byte) (int, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.isClosed { + return 0, fmt.Errorf("stream closed") + } + return s.muxer.write(p, s.id) +} + +func (s *muxerStream) Close() error { + s.mutex.Lock() + defer s.mutex.Unlock() + if !s.isClosed { + s.isClosed = true + s.muxer.rm(s.id) + } + s.readWake.Signal() + return nil +} diff --git a/plugin/rpcplugin/muxer_test.go b/plugin/rpcplugin/muxer_test.go new file mode 100644 index 000000000..7bb63d4f8 --- /dev/null +++ b/plugin/rpcplugin/muxer_test.go @@ -0,0 +1,169 @@ +package rpcplugin + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMuxer(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + id1, alice1 := alice.Serve() + defer func() { assert.NoError(t, alice1.Close()) }() + + id2, bob2 := bob.Serve() + defer func() { assert.NoError(t, bob2.Close()) }() + + done1 := make(chan bool) + done2 := make(chan bool) + + go func() { + bob1 := bob.Connect(id1) + defer func() { assert.NoError(t, bob1.Close()) }() + + n, err := bob1.Write([]byte("ping1.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + n, err = bob1.Write([]byte("ping1.1")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + go func() { + alice2 := alice.Connect(id2) + defer func() { assert.NoError(t, alice2.Close()) }() + + n, err := alice2.Write([]byte("ping2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + buf := make([]byte, 20) + n, err = alice2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("pong2.0"), buf[:n]) + + done2 <- true + }() + + go func() { + buf := make([]byte, 7) + n, err := io.ReadFull(alice1, buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.0"), buf[:n]) + + n, err = alice1.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.1"), buf[:n]) + + done1 <- true + }() + + go func() { + buf := make([]byte, 20) + n, err := bob2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping2.0"), buf[:n]) + + n, err = bob2.Write([]byte("pong2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + <-done1 + <-done2 +} + +// Closing a muxer during a read should unblock, but return an error. +func TestMuxer_CloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +// Closing a stream during a read should unblock and return io.EOF since this is the way to +// gracefully close a connection. +func TestMuxer_StreamCloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go s.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.Equal(t, io.EOF, err) +} + +// Closing a muxer during a write should unblock, but return an error. +func TestMuxer_CloseDuringWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + // Don't connect bob to let writes will block forever. + defer r2.Close() + defer w1.Close() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Write(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +func TestMuxer_ReadWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + go alice.Write([]byte("hello")) + buf := make([]byte, 20) + n, err := bob.Read(buf) + assert.Equal(t, 5, n) + assert.Nil(t, err) + assert.Equal(t, []byte("hello"), buf[:n]) +} diff --git a/plugin/rpcplugin/process.go b/plugin/rpcplugin/process.go new file mode 100644 index 000000000..4b3362d68 --- /dev/null +++ b/plugin/rpcplugin/process.go @@ -0,0 +1,23 @@ +package rpcplugin + +import ( + "context" + "io" +) + +type Process interface { + // Waits for the process to exit and returns an error if a problem occurred or the process exited + // with a non-zero status. + Wait() error +} + +// NewProcess launches an RPC executable in a new process and returns an IPC that can be used to +// communicate with it. +func NewProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) { + return newProcess(ctx, path) +} + +// When called on a process launched with NewProcess, returns the inherited IPC. +func InheritedProcessIPC() (io.ReadWriteCloser, error) { + return inheritedProcessIPC() +} diff --git a/plugin/rpcplugin/process_test.go b/plugin/rpcplugin/process_test.go new file mode 100644 index 000000000..b7984ad0a --- /dev/null +++ b/plugin/rpcplugin/process_test.go @@ -0,0 +1,64 @@ +package rpcplugin + +import ( + "context" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func compileGo(t *testing.T, sourceCode, outputPath string) { + dir, err := ioutil.TempDir(".", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "main.go"), []byte(sourceCode), 0600)) + cmd := exec.Command("go", "build", "-o", outputPath, "main.go") + cmd.Dir = dir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + require.NoError(t, cmd.Run()) +} + +func TestProcess(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + ping := filepath.Join(dir, "ping") + compileGo(t, ` + package main + + import ( + "log" + + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + func main() { + ipc, err := rpcplugin.InheritedProcessIPC() + if err != nil { + log.Fatal("unable to get inherited ipc") + } + defer ipc.Close() + _, err = ipc.Write([]byte("ping")) + if err != nil { + log.Fatal("unable to write to ipc") + } + } + `, ping) + + p, ipc, err := NewProcess(context.Background(), ping) + require.NoError(t, err) + defer ipc.Close() + b := make([]byte, 10) + n, err := ipc.Read(b) + require.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, "ping", string(b[:4])) + require.NoError(t, p.Wait()) +} diff --git a/plugin/rpcplugin/process_unix.go b/plugin/rpcplugin/process_unix.go new file mode 100644 index 000000000..f196e34f8 --- /dev/null +++ b/plugin/rpcplugin/process_unix.go @@ -0,0 +1,45 @@ +// +build !windows + +package rpcplugin + +import ( + "context" + "io" + "os" + "os/exec" +) + +type process struct { + command *exec.Cmd +} + +func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) { + ipc, childFiles, err := NewIPC() + if err != nil { + return nil, nil, err + } + defer childFiles[0].Close() + defer childFiles[1].Close() + + cmd := exec.CommandContext(ctx, path) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.ExtraFiles = childFiles + err = cmd.Start() + if err != nil { + ipc.Close() + return nil, nil, err + } + + return &process{ + command: cmd, + }, ipc, nil +} + +func (p *process) Wait() error { + return p.command.Wait() +} + +func inheritedProcessIPC() (io.ReadWriteCloser, error) { + return InheritedIPC(3, 4) +} diff --git a/plugin/rpcplugin/process_windows.go b/plugin/rpcplugin/process_windows.go new file mode 100644 index 000000000..7be03cacd --- /dev/null +++ b/plugin/rpcplugin/process_windows.go @@ -0,0 +1,17 @@ +package rpcplugin + +import ( + "context" + "fmt" + "io" +) + +func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) { + // TODO + return nil, nil, fmt.Errorf("not yet supported") +} + +func inheritedProcessIPC() (*IPC, error) { + // TODO + return nil, fmt.Errorf("not yet supported") +} diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go new file mode 100644 index 000000000..9316d7186 --- /dev/null +++ b/plugin/rpcplugin/supervisor.go @@ -0,0 +1,128 @@ +package rpcplugin + +import ( + "context" + "fmt" + "path/filepath" + "sync/atomic" + "time" + + "github.com/mattermost/platform/plugin" +) + +// Supervisor implements a plugin.Supervisor that launches the plugin in a separate process and +// communicates via RPC. +// +// If the plugin unexpectedly exists, the supervisor will relaunch it after a short delay. +type Supervisor struct { + executable string + hooks atomic.Value + done chan bool + cancel context.CancelFunc +} + +var _ plugin.Supervisor = (*Supervisor)(nil) + +// Starts the plugin. This method will block until the plugin is successfully launched for the first +// time and will return an error if the plugin cannot be launched at all. +func (s *Supervisor) Start() error { + ctx, cancel := context.WithCancel(context.Background()) + s.done = make(chan bool, 1) + start := make(chan error, 1) + go s.run(ctx, start) + + select { + case <-time.After(time.Second * 3): + cancel() + <-s.done + return fmt.Errorf("timed out waiting for plugin") + case err := <-start: + s.cancel = cancel + return err + } +} + +// Stops the plugin. +func (s *Supervisor) Stop() error { + s.cancel() + <-s.done + return nil +} + +// Returns the hooks used to communicate with the plugin. The hooks may change if the plugin is +// restarted, so the return value should not be cached. +func (s *Supervisor) Hooks() plugin.Hooks { + return s.hooks.Load().(plugin.Hooks) +} + +func (s *Supervisor) run(ctx context.Context, start chan<- error) { + defer func() { + s.done <- true + }() + done := ctx.Done() + for { + s.runPlugin(ctx, start) + select { + case <-done: + return + default: + start = nil + time.Sleep(time.Second) + } + } +} + +func (s *Supervisor) runPlugin(ctx context.Context, start chan<- error) error { + p, ipc, err := NewProcess(ctx, s.executable) + if err != nil { + if start != nil { + start <- err + } + return err + } + + muxer := NewMuxer(ipc, false) + closeMuxer := make(chan bool, 1) + muxerClosed := make(chan error, 1) + go func() { + select { + case <-ctx.Done(): + break + case <-closeMuxer: + break + } + muxerClosed <- muxer.Close() + }() + + hooks, err := ConnectMain(muxer) + if err != nil { + if start != nil { + start <- err + } + closeMuxer <- true + <-muxerClosed + p.Wait() + return err + } + + s.hooks.Store(hooks) + if start != nil { + start <- nil + } + p.Wait() + closeMuxer <- true + <-muxerClosed + + return nil +} + +func SupervisorProvider(bundle *plugin.BundleInfo) (plugin.Supervisor, error) { + if bundle.Manifest == nil { + return nil, fmt.Errorf("no manifest available") + } else if bundle.Manifest.Backend == nil || bundle.Manifest.Backend.Executable == "" { + return nil, fmt.Errorf("no backend executable specified") + } + return &Supervisor{ + executable: filepath.Join(bundle.Path, bundle.Manifest.Backend.Executable), + }, nil +} diff --git a/plugin/rpcplugin/supervisor_test.go b/plugin/rpcplugin/supervisor_test.go new file mode 100644 index 000000000..1d046bf82 --- /dev/null +++ b/plugin/rpcplugin/supervisor_test.go @@ -0,0 +1,130 @@ +package rpcplugin + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/platform/plugin" +) + +func TestSupervisor(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + backend := filepath.Join(dir, "backend") + compileGo(t, ` + package main + + import ( + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + type MyPlugin struct {} + + func (p *MyPlugin) OnActivate(api plugin.API) error { + return nil + } + + func (p *MyPlugin) OnDeactivate() error { + return nil + } + + func main() { + rpcplugin.Main(&MyPlugin{}) + } + `, backend) + + ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600) + + bundle := plugin.BundleInfoForPath(dir) + supervisor, err := SupervisorProvider(bundle) + require.NoError(t, err) + require.NoError(t, supervisor.Start()) + require.NoError(t, supervisor.Hooks().OnActivate(nil)) + require.NoError(t, supervisor.Stop()) +} + +// If plugin development goes really wrong, let's make sure plugin activation won't block forever. +func TestSupervisor_StartTimeout(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + backend := filepath.Join(dir, "backend") + compileGo(t, ` + package main + + func main() { + for { + } + } + `, backend) + + ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600) + + bundle := plugin.BundleInfoForPath(dir) + supervisor, err := SupervisorProvider(bundle) + require.NoError(t, err) + require.Error(t, supervisor.Start()) +} + +// Crashed plugins should be relaunched. +func TestSupervisor_PluginCrash(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + backend := filepath.Join(dir, "backend") + compileGo(t, ` + package main + + import ( + "os" + + "github.com/mattermost/platform/plugin" + "github.com/mattermost/platform/plugin/rpcplugin" + ) + + type MyPlugin struct {} + + func (p *MyPlugin) OnActivate(api plugin.API) error { + os.Exit(1) + return nil + } + + func (p *MyPlugin) OnDeactivate() error { + return nil + } + + func main() { + rpcplugin.Main(&MyPlugin{}) + } + `, backend) + + ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600) + + bundle := plugin.BundleInfoForPath(dir) + supervisor, err := SupervisorProvider(bundle) + require.NoError(t, err) + require.NoError(t, supervisor.Start()) + require.Error(t, supervisor.Hooks().OnActivate(nil)) + + recovered := false + for i := 0; i < 30; i++ { + if supervisor.Hooks().OnDeactivate() == nil { + recovered = true + break + } + time.Sleep(time.Millisecond * 100) + } + assert.True(t, recovered) + require.NoError(t, supervisor.Stop()) +} diff --git a/plugin/supervisor.go b/plugin/supervisor.go new file mode 100644 index 000000000..5ddf5f169 --- /dev/null +++ b/plugin/supervisor.go @@ -0,0 +1,8 @@ +package plugin + +// Supervisor provides the interface for an object that controls the execution of a plugin. +type Supervisor interface { + Start() error + Stop() error + Hooks() Hooks +} -- cgit v1.2.3-1-g7c22