summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris <ccbrown112@gmail.com>2017-08-16 17:23:38 -0500
committerGitHub <noreply@github.com>2017-08-16 17:23:38 -0500
commitf80d50adbddf55a043dfcab5b47d7c1e22749b7d (patch)
tree5deb606debb6322716c9cdcc6c58be4f68b74223
parent4f85ed985d478ddf6692fa4f7d8d98d2a412d18c (diff)
downloadchat-f80d50adbddf55a043dfcab5b47d7c1e22749b7d.tar.gz
chat-f80d50adbddf55a043dfcab5b47d7c1e22749b7d.tar.bz2
chat-f80d50adbddf55a043dfcab5b47d7c1e22749b7d.zip
PLT-7407: Back-end plugin mechanism (#7177)
* begin backend plugin wip * flesh out rpcplugin. everything done except for minor supervisor stubs * done with basic plugin infrastructure * simplify tests * remove unused test lines
-rw-r--r--.gitignore3
-rw-r--r--plugin/api.go7
-rw-r--r--plugin/bundle_info.go20
-rw-r--r--plugin/bundle_info_test.go30
-rw-r--r--plugin/hooks.go10
-rw-r--r--plugin/manifest.go70
-rw-r--r--plugin/manifest_test.go97
-rw-r--r--plugin/pluginenv/environment.go123
-rw-r--r--plugin/pluginenv/environment_test.go291
-rw-r--r--plugin/pluginenv/options.go42
-rw-r--r--plugin/pluginenv/options_test.go32
-rw-r--r--plugin/pluginenv/search_path.go32
-rw-r--r--plugin/pluginenv/search_path_test.go62
-rw-r--r--plugin/plugintest/api.go17
-rw-r--r--plugin/plugintest/hooks.go21
-rw-r--r--plugin/rpcplugin/api.go62
-rw-r--r--plugin/rpcplugin/api_test.go57
-rw-r--r--plugin/rpcplugin/hooks.go77
-rw-r--r--plugin/rpcplugin/hooks_test.go58
-rw-r--r--plugin/rpcplugin/io.go23
-rw-r--r--plugin/rpcplugin/ipc.go28
-rw-r--r--plugin/rpcplugin/ipc_test.go61
-rw-r--r--plugin/rpcplugin/main.go46
-rw-r--r--plugin/rpcplugin/main_test.go58
-rw-r--r--plugin/rpcplugin/muxer.go253
-rw-r--r--plugin/rpcplugin/muxer_test.go169
-rw-r--r--plugin/rpcplugin/process.go23
-rw-r--r--plugin/rpcplugin/process_test.go64
-rw-r--r--plugin/rpcplugin/process_unix.go45
-rw-r--r--plugin/rpcplugin/process_windows.go17
-rw-r--r--plugin/rpcplugin/supervisor.go128
-rw-r--r--plugin/rpcplugin/supervisor_test.go130
-rw-r--r--plugin/supervisor.go8
33 files changed, 2163 insertions, 1 deletions
diff --git a/.gitignore b/.gitignore
index b9a6a4bcd..e3646b174 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@ web/static/js/libs*.js
config/active.dat
config/config.json
+/plugins
# Enteprise imports file
imports/imports.go
@@ -90,4 +91,4 @@ webapp/coverage
tags
.idea
-debug \ No newline at end of file
+debug
diff --git a/plugin/api.go b/plugin/api.go
new file mode 100644
index 000000000..c62ae0f55
--- /dev/null
+++ b/plugin/api.go
@@ -0,0 +1,7 @@
+package plugin
+
+type API interface {
+ // LoadPluginConfiguration loads the plugin's configuration. dest should be a pointer to a
+ // struct that the configuration JSON can be unmarshalled to.
+ LoadPluginConfiguration(dest interface{}) error
+}
diff --git a/plugin/bundle_info.go b/plugin/bundle_info.go
new file mode 100644
index 000000000..9dc47ceea
--- /dev/null
+++ b/plugin/bundle_info.go
@@ -0,0 +1,20 @@
+package plugin
+
+type BundleInfo struct {
+ Path string
+
+ Manifest *Manifest
+ ManifestPath string
+ ManifestError error
+}
+
+// Returns bundle info for the given path. The return value is never nil.
+func BundleInfoForPath(path string) *BundleInfo {
+ m, mpath, err := FindManifest(path)
+ return &BundleInfo{
+ Path: path,
+ Manifest: m,
+ ManifestPath: mpath,
+ ManifestError: err,
+ }
+}
diff --git a/plugin/bundle_info_test.go b/plugin/bundle_info_test.go
new file mode 100644
index 000000000..94a0c624f
--- /dev/null
+++ b/plugin/bundle_info_test.go
@@ -0,0 +1,30 @@
+package plugin
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestBundleInfoForPath(t *testing.T) {
+ dir, err := ioutil.TempDir("", "mm-plugin-test")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ path := filepath.Join(dir, "plugin.json")
+ f, err := os.Create(path)
+ require.NoError(t, err)
+ _, err = f.WriteString(`{"id": "foo"}`)
+ f.Close()
+ require.NoError(t, err)
+
+ info := BundleInfoForPath(dir)
+ assert.Equal(t, info.Path, dir)
+ assert.NotNil(t, info.Manifest)
+ assert.Equal(t, info.ManifestPath, path)
+ assert.Nil(t, info.ManifestError)
+}
diff --git a/plugin/hooks.go b/plugin/hooks.go
new file mode 100644
index 000000000..28a762a1a
--- /dev/null
+++ b/plugin/hooks.go
@@ -0,0 +1,10 @@
+package plugin
+
+type Hooks interface {
+ // OnActivate is invoked when the plugin is activated.
+ OnActivate(API) error
+
+ // OnDeactivate is invoked when the plugin is deactivated. This is the plugin's last chance to
+ // use the API, and the plugin will be terminated shortly after this invocation.
+ OnDeactivate() error
+}
diff --git a/plugin/manifest.go b/plugin/manifest.go
new file mode 100644
index 000000000..15b7f0555
--- /dev/null
+++ b/plugin/manifest.go
@@ -0,0 +1,70 @@
+package plugin
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ "gopkg.in/yaml.v2"
+)
+
+type Manifest struct {
+ Id string `json:"id" yaml:"id"`
+ Backend *ManifestBackend `json:"backend,omitempty" yaml:"backend,omitempty"`
+}
+
+type ManifestBackend struct {
+ Executable string `json:"executable" yaml:"executable"`
+}
+
+// FindManifest will find and parse the manifest in a given directory.
+//
+// In all cases other than a does-not-exist error, path is set to the path of the manifest file that was
+// found.
+//
+// Manifests are JSON or YAML files named plugin.json, plugin.yaml, or plugin.yml.
+func FindManifest(dir string) (manifest *Manifest, path string, err error) {
+ for _, name := range []string{"plugin.yml", "plugin.yaml"} {
+ path = filepath.Join(dir, name)
+ f, ferr := os.Open(path)
+ if ferr != nil {
+ if !os.IsNotExist(ferr) {
+ err = ferr
+ return
+ }
+ continue
+ }
+ b, ioerr := ioutil.ReadAll(f)
+ f.Close()
+ if ioerr != nil {
+ err = ioerr
+ return
+ }
+ var parsed Manifest
+ err = yaml.Unmarshal(b, &parsed)
+ if err != nil {
+ return
+ }
+ manifest = &parsed
+ return
+ }
+
+ path = filepath.Join(dir, "plugin.json")
+ f, ferr := os.Open(path)
+ if ferr != nil {
+ if os.IsNotExist(ferr) {
+ path = ""
+ }
+ err = ferr
+ return
+ }
+ defer f.Close()
+ var parsed Manifest
+ err = json.NewDecoder(f).Decode(&parsed)
+ if err != nil {
+ return
+ }
+ manifest = &parsed
+ return
+}
diff --git a/plugin/manifest_test.go b/plugin/manifest_test.go
new file mode 100644
index 000000000..5dae4fbaa
--- /dev/null
+++ b/plugin/manifest_test.go
@@ -0,0 +1,97 @@
+package plugin
+
+import (
+ "encoding/json"
+ "gopkg.in/yaml.v2"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestFindManifest(t *testing.T) {
+ for _, tc := range []struct {
+ Filename string
+ Contents string
+ ExpectError bool
+ ExpectNotExist bool
+ }{
+ {"foo", "bar", true, true},
+ {"plugin.json", "bar", true, false},
+ {"plugin.json", `{"id": "foo"}`, false, false},
+ {"plugin.yaml", `id: foo`, false, false},
+ {"plugin.yaml", "bar", true, false},
+ {"plugin.yml", `id: foo`, false, false},
+ {"plugin.yml", "bar", true, false},
+ } {
+ dir, err := ioutil.TempDir("", "mm-plugin-test")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ path := filepath.Join(dir, tc.Filename)
+ f, err := os.Create(path)
+ require.NoError(t, err)
+ _, err = f.WriteString(tc.Contents)
+ f.Close()
+ require.NoError(t, err)
+
+ m, mpath, err := FindManifest(dir)
+ assert.True(t, (err != nil) == tc.ExpectError, tc.Filename)
+ assert.True(t, (err != nil && os.IsNotExist(err)) == tc.ExpectNotExist, tc.Filename)
+ if !tc.ExpectNotExist {
+ assert.Equal(t, path, mpath, tc.Filename)
+ } else {
+ assert.Empty(t, mpath, tc.Filename)
+ }
+ if !tc.ExpectError {
+ require.NotNil(t, m, tc.Filename)
+ assert.NotEmpty(t, m.Id, tc.Filename)
+ }
+ }
+}
+
+func TestManifestUnmarshal(t *testing.T) {
+ expected := Manifest{
+ Id: "theid",
+ Backend: &ManifestBackend{
+ Executable: "theexecutable",
+ },
+ }
+
+ var yamlResult Manifest
+ require.NoError(t, yaml.Unmarshal([]byte(`
+id: theid
+backend:
+ executable: theexecutable
+`), &yamlResult))
+ assert.Equal(t, expected, yamlResult)
+
+ var jsonResult Manifest
+ require.NoError(t, json.Unmarshal([]byte(`{
+ "id": "theid",
+ "backend": {
+ "executable": "theexecutable"
+ }
+ }`), &jsonResult))
+ assert.Equal(t, expected, jsonResult)
+}
+
+func TestFindManifest_FileErrors(t *testing.T) {
+ for _, tc := range []string{"plugin.yaml", "plugin.json"} {
+ dir, err := ioutil.TempDir("", "mm-plugin-test")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ path := filepath.Join(dir, tc)
+ require.NoError(t, os.Mkdir(path, 0700))
+
+ m, mpath, err := FindManifest(dir)
+ assert.Nil(t, m)
+ assert.Equal(t, path, mpath)
+ assert.Error(t, err, tc)
+ assert.False(t, os.IsNotExist(err), tc)
+ }
+}
diff --git a/plugin/pluginenv/environment.go b/plugin/pluginenv/environment.go
new file mode 100644
index 000000000..36a8c6e76
--- /dev/null
+++ b/plugin/pluginenv/environment.go
@@ -0,0 +1,123 @@
+// Package pluginenv provides high level functionality for discovering and launching plugins.
+package pluginenv
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+type APIProviderFunc func(*plugin.Manifest) (plugin.API, error)
+type SupervisorProviderFunc func(*plugin.BundleInfo) (plugin.Supervisor, error)
+
+// Environment represents an environment that plugins are discovered and launched in.
+type Environment struct {
+ searchPath string
+ apiProvider APIProviderFunc
+ supervisorProvider SupervisorProviderFunc
+ activePlugins map[string]plugin.Supervisor
+}
+
+type Option func(*Environment)
+
+// Creates a new environment. At a minimum, the APIProvider and SearchPath options are required.
+func New(options ...Option) (*Environment, error) {
+ env := &Environment{
+ activePlugins: make(map[string]plugin.Supervisor),
+ }
+ for _, opt := range options {
+ opt(env)
+ }
+ if env.supervisorProvider == nil {
+ env.supervisorProvider = DefaultSupervisorProvider
+ }
+ if env.searchPath == "" {
+ return nil, fmt.Errorf("a search path must be provided")
+ } else if env.apiProvider == nil {
+ return nil, fmt.Errorf("an api provider must be provided")
+ }
+ return env, nil
+}
+
+// Returns a list of all plugins found within the environment.
+func (env *Environment) Plugins() ([]*plugin.BundleInfo, error) {
+ return ScanSearchPath(env.searchPath)
+}
+
+// Returns the ids of the currently active plugins.
+func (env *Environment) ActivePluginIds() (ids []string) {
+ for id := range env.activePlugins {
+ ids = append(ids, id)
+ }
+ return
+}
+
+// Activates the plugin with the given id.
+func (env *Environment) ActivatePlugin(id string) error {
+ if _, ok := env.activePlugins[id]; ok {
+ return fmt.Errorf("plugin already active: %v", id)
+ }
+ plugins, err := ScanSearchPath(env.searchPath)
+ if err != nil {
+ return err
+ }
+ var plugin *plugin.BundleInfo
+ for _, p := range plugins {
+ if p.Manifest != nil && p.Manifest.Id == id {
+ if plugin != nil {
+ return fmt.Errorf("multiple plugins found: %v", id)
+ }
+ plugin = p
+ }
+ }
+ if plugin == nil {
+ return fmt.Errorf("plugin not found: %v", id)
+ }
+ supervisor, err := env.supervisorProvider(plugin)
+ if err != nil {
+ return errors.Wrapf(err, "unable to create supervisor for plugin: %v", id)
+ }
+ api, err := env.apiProvider(plugin.Manifest)
+ if err != nil {
+ return errors.Wrapf(err, "unable to get api for plugin: %v", id)
+ }
+ if err := supervisor.Start(); err != nil {
+ return errors.Wrapf(err, "unable to start plugin: %v", id)
+ }
+ if err := supervisor.Hooks().OnActivate(api); err != nil {
+ supervisor.Stop()
+ return errors.Wrapf(err, "unable to activate plugin: %v", id)
+ }
+ env.activePlugins[id] = supervisor
+ return nil
+}
+
+// Deactivates the plugin with the given id.
+func (env *Environment) DeactivatePlugin(id string) error {
+ if supervisor, ok := env.activePlugins[id]; !ok {
+ return fmt.Errorf("plugin not active: %v", id)
+ } else {
+ delete(env.activePlugins, id)
+ err := supervisor.Hooks().OnDeactivate()
+ if serr := supervisor.Stop(); err == nil {
+ err = serr
+ }
+ return err
+ }
+}
+
+// Deactivates all plugins and gracefully shuts down the environment.
+func (env *Environment) Shutdown() (errs []error) {
+ for _, supervisor := range env.activePlugins {
+ if err := supervisor.Hooks().OnDeactivate(); err != nil {
+ errs = append(errs, err)
+ }
+ if err := supervisor.Stop(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ env.activePlugins = make(map[string]plugin.Supervisor)
+ return
+}
diff --git a/plugin/pluginenv/environment_test.go b/plugin/pluginenv/environment_test.go
new file mode 100644
index 000000000..d933c8696
--- /dev/null
+++ b/plugin/pluginenv/environment_test.go
@@ -0,0 +1,291 @@
+package pluginenv
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/plugintest"
+)
+
+type MockProvider struct {
+ mock.Mock
+}
+
+func (m *MockProvider) API(manifest *plugin.Manifest) (plugin.API, error) {
+ ret := m.Called()
+ if ret.Get(0) == nil {
+ return nil, ret.Error(1)
+ }
+ return ret.Get(0).(plugin.API), ret.Error(1)
+}
+
+func (m *MockProvider) Supervisor(bundle *plugin.BundleInfo) (plugin.Supervisor, error) {
+ ret := m.Called()
+ if ret.Get(0) == nil {
+ return nil, ret.Error(1)
+ }
+ return ret.Get(0).(plugin.Supervisor), ret.Error(1)
+}
+
+type MockSupervisor struct {
+ mock.Mock
+}
+
+func (m *MockSupervisor) Start() error {
+ return m.Called().Error(0)
+}
+
+func (m *MockSupervisor) Stop() error {
+ return m.Called().Error(0)
+}
+
+func (m *MockSupervisor) Hooks() plugin.Hooks {
+ return m.Called().Get(0).(plugin.Hooks)
+}
+
+func initTmpDir(t *testing.T, files map[string]string) string {
+ success := false
+ dir, err := ioutil.TempDir("", "mm-plugin-test")
+ require.NoError(t, err)
+ defer func() {
+ if !success {
+ os.RemoveAll(dir)
+ }
+ }()
+
+ for name, contents := range files {
+ path := filepath.Join(dir, name)
+ parent := filepath.Dir(path)
+ require.NoError(t, os.MkdirAll(parent, 0700))
+ f, err := os.Create(path)
+ require.NoError(t, err)
+ _, err = f.WriteString(contents)
+ f.Close()
+ require.NoError(t, err)
+ }
+
+ success = true
+ return dir
+}
+
+func TestNew_MissingOptions(t *testing.T) {
+ dir := initTmpDir(t, map[string]string{
+ "foo/plugin.json": `{"id": "foo"}`,
+ })
+ defer os.RemoveAll(dir)
+
+ var provider MockProvider
+ defer provider.AssertExpectations(t)
+
+ env, err := New(
+ APIProvider(provider.API),
+ )
+ assert.Nil(t, env)
+ assert.Error(t, err)
+
+ env, err = New(
+ SearchPath(dir),
+ )
+ assert.Nil(t, env)
+ assert.Error(t, err)
+}
+
+func TestEnvironment(t *testing.T) {
+ dir := initTmpDir(t, map[string]string{
+ ".foo/plugin.json": `{"id": "foo"}`,
+ "foo/bar": "asdf",
+ "foo/plugin.json": `{"id": "foo"}`,
+ "bar/zxc": "qwer",
+ "baz/plugin.yaml": "id: baz",
+ "bad/plugin.json": "asd",
+ "qwe": "asd",
+ })
+ defer os.RemoveAll(dir)
+
+ var provider MockProvider
+ defer provider.AssertExpectations(t)
+
+ env, err := New(
+ SearchPath(dir),
+ APIProvider(provider.API),
+ SupervisorProvider(provider.Supervisor),
+ )
+ require.NoError(t, err)
+ defer env.Shutdown()
+
+ plugins, err := env.Plugins()
+ assert.NoError(t, err)
+ assert.Len(t, plugins, 3)
+
+ assert.Error(t, env.ActivatePlugin("x"))
+
+ var api struct{ plugin.API }
+ var supervisor MockSupervisor
+ defer supervisor.AssertExpectations(t)
+ var hooks plugintest.Hooks
+ defer hooks.AssertExpectations(t)
+
+ provider.On("API").Return(&api, nil)
+ provider.On("Supervisor").Return(&supervisor, nil)
+
+ supervisor.On("Start").Return(nil)
+ supervisor.On("Stop").Return(nil)
+ supervisor.On("Hooks").Return(&hooks)
+
+ hooks.On("OnActivate", &api).Return(nil)
+
+ assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
+ assert.Error(t, env.ActivatePlugin("foo"))
+
+ hooks.On("OnDeactivate").Return(nil)
+ assert.NoError(t, env.DeactivatePlugin("foo"))
+ assert.Error(t, env.DeactivatePlugin("foo"))
+
+ assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
+ assert.Empty(t, env.Shutdown())
+}
+
+func TestEnvironment_DuplicatePluginError(t *testing.T) {
+ dir := initTmpDir(t, map[string]string{
+ "foo/plugin.json": `{"id": "foo"}`,
+ "foo2/plugin.json": `{"id": "foo"}`,
+ })
+ defer os.RemoveAll(dir)
+
+ var provider MockProvider
+ defer provider.AssertExpectations(t)
+
+ env, err := New(
+ SearchPath(dir),
+ APIProvider(provider.API),
+ SupervisorProvider(provider.Supervisor),
+ )
+ require.NoError(t, err)
+ defer env.Shutdown()
+
+ assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Empty(t, env.ActivePluginIds())
+}
+
+func TestEnvironment_BadSearchPathError(t *testing.T) {
+ var provider MockProvider
+ defer provider.AssertExpectations(t)
+
+ env, err := New(
+ SearchPath("thissearchpathshouldnotexist!"),
+ APIProvider(provider.API),
+ SupervisorProvider(provider.Supervisor),
+ )
+ require.NoError(t, err)
+ defer env.Shutdown()
+
+ assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Empty(t, env.ActivePluginIds())
+}
+
+func TestEnvironment_ActivatePluginErrors(t *testing.T) {
+ dir := initTmpDir(t, map[string]string{
+ "foo/plugin.json": `{"id": "foo"}`,
+ })
+ defer os.RemoveAll(dir)
+
+ var provider MockProvider
+
+ env, err := New(
+ SearchPath(dir),
+ APIProvider(provider.API),
+ SupervisorProvider(provider.Supervisor),
+ )
+ require.NoError(t, err)
+ defer env.Shutdown()
+
+ var api struct{ plugin.API }
+ var supervisor MockSupervisor
+ var hooks plugintest.Hooks
+
+ for name, setup := range map[string]func(){
+ "SupervisorProviderError": func() {
+ provider.On("Supervisor").Return(nil, fmt.Errorf("test error"))
+ },
+ "APIProviderError": func() {
+ provider.On("API").Return(plugin.API(nil), fmt.Errorf("test error"))
+ provider.On("Supervisor").Return(&supervisor, nil)
+ },
+ "SupervisorError": func() {
+ provider.On("API").Return(&api, nil)
+ provider.On("Supervisor").Return(&supervisor, nil)
+
+ supervisor.On("Start").Return(fmt.Errorf("test error"))
+ },
+ "HooksError": func() {
+ provider.On("API").Return(&api, nil)
+ provider.On("Supervisor").Return(&supervisor, nil)
+
+ supervisor.On("Start").Return(nil)
+ supervisor.On("Stop").Return(nil)
+ supervisor.On("Hooks").Return(&hooks)
+
+ hooks.On("OnActivate", &api).Return(fmt.Errorf("test error"))
+ },
+ } {
+ t.Run(name, func(t *testing.T) {
+ supervisor.Mock = mock.Mock{}
+ hooks.Mock = mock.Mock{}
+ provider.Mock = mock.Mock{}
+ setup()
+ assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Empty(t, env.ActivePluginIds())
+ supervisor.AssertExpectations(t)
+ hooks.AssertExpectations(t)
+ provider.AssertExpectations(t)
+ })
+ }
+}
+
+func TestEnvironment_ShutdownError(t *testing.T) {
+ dir := initTmpDir(t, map[string]string{
+ "foo/plugin.json": `{"id": "foo"}`,
+ })
+ defer os.RemoveAll(dir)
+
+ var provider MockProvider
+ defer provider.AssertExpectations(t)
+
+ env, err := New(
+ SearchPath(dir),
+ APIProvider(provider.API),
+ SupervisorProvider(provider.Supervisor),
+ )
+ require.NoError(t, err)
+ defer env.Shutdown()
+
+ var api struct{ plugin.API }
+ var supervisor MockSupervisor
+ defer supervisor.AssertExpectations(t)
+ var hooks plugintest.Hooks
+ defer hooks.AssertExpectations(t)
+
+ provider.On("API").Return(&api, nil)
+ provider.On("Supervisor").Return(&supervisor, nil)
+
+ supervisor.On("Start").Return(nil)
+ supervisor.On("Stop").Return(fmt.Errorf("test error"))
+ supervisor.On("Hooks").Return(&hooks)
+
+ hooks.On("OnActivate", &api).Return(nil)
+ hooks.On("OnDeactivate").Return(fmt.Errorf("test error"))
+
+ assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
+ assert.Len(t, env.Shutdown(), 2)
+}
diff --git a/plugin/pluginenv/options.go b/plugin/pluginenv/options.go
new file mode 100644
index 000000000..3f83228fb
--- /dev/null
+++ b/plugin/pluginenv/options.go
@@ -0,0 +1,42 @@
+package pluginenv
+
+import (
+ "fmt"
+
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/rpcplugin"
+)
+
+// APIProvider specifies a function that provides an API implementation to each plugin.
+func APIProvider(provider APIProviderFunc) Option {
+ return func(env *Environment) {
+ env.apiProvider = provider
+ }
+}
+
+// SupervisorProvider specifies a function that provides a Supervisor implementation to each plugin.
+// If unspecified, DefaultSupervisorProvider is used.
+func SupervisorProvider(provider SupervisorProviderFunc) Option {
+ return func(env *Environment) {
+ env.supervisorProvider = provider
+ }
+}
+
+// SearchPath specifies a directory that contains the plugins to launch.
+func SearchPath(path string) Option {
+ return func(env *Environment) {
+ env.searchPath = path
+ }
+}
+
+// DefaultSupervisorProvider chooses a supervisor based on the plugin's manifest contents. E.g. if
+// the manifest specifies a backend executable, it will be given an rpcplugin.Supervisor.
+func DefaultSupervisorProvider(bundle *plugin.BundleInfo) (plugin.Supervisor, error) {
+ if bundle.Manifest == nil {
+ return nil, fmt.Errorf("a manifest is required")
+ }
+ if bundle.Manifest.Backend == nil {
+ return nil, fmt.Errorf("invalid manifest: at this time, only backend plugins are supported")
+ }
+ return rpcplugin.SupervisorProvider(bundle)
+}
diff --git a/plugin/pluginenv/options_test.go b/plugin/pluginenv/options_test.go
new file mode 100644
index 000000000..4f8d411bd
--- /dev/null
+++ b/plugin/pluginenv/options_test.go
@@ -0,0 +1,32 @@
+package pluginenv
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/rpcplugin"
+)
+
+func TestDefaultSupervisorProvider(t *testing.T) {
+ _, err := DefaultSupervisorProvider(&plugin.BundleInfo{})
+ assert.Error(t, err)
+
+ _, err = DefaultSupervisorProvider(&plugin.BundleInfo{
+ Manifest: &plugin.Manifest{},
+ })
+ assert.Error(t, err)
+
+ supervisor, err := DefaultSupervisorProvider(&plugin.BundleInfo{
+ Manifest: &plugin.Manifest{
+ Backend: &plugin.ManifestBackend{
+ Executable: "foo",
+ },
+ },
+ })
+ require.NoError(t, err)
+ _, ok := supervisor.(*rpcplugin.Supervisor)
+ assert.True(t, ok)
+}
diff --git a/plugin/pluginenv/search_path.go b/plugin/pluginenv/search_path.go
new file mode 100644
index 000000000..daebdb0d3
--- /dev/null
+++ b/plugin/pluginenv/search_path.go
@@ -0,0 +1,32 @@
+package pluginenv
+
+import (
+ "io/ioutil"
+ "path/filepath"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+// Performs a full scan of the given path.
+//
+// This function will return info for all subdirectories that appear to be plugins (i.e. all
+// subdirectories containing plugin manifest files, regardless of whether they could actually be
+// parsed).
+//
+// Plugins are found non-recursively and paths beginning with a dot are always ignored.
+func ScanSearchPath(path string) ([]*plugin.BundleInfo, error) {
+ files, err := ioutil.ReadDir(path)
+ if err != nil {
+ return nil, err
+ }
+ var ret []*plugin.BundleInfo
+ for _, file := range files {
+ if !file.IsDir() || file.Name()[0] == '.' {
+ continue
+ }
+ if info := plugin.BundleInfoForPath(filepath.Join(path, file.Name())); info.ManifestPath != "" {
+ ret = append(ret, info)
+ }
+ }
+ return ret, nil
+}
diff --git a/plugin/pluginenv/search_path_test.go b/plugin/pluginenv/search_path_test.go
new file mode 100644
index 000000000..d9a18cf56
--- /dev/null
+++ b/plugin/pluginenv/search_path_test.go
@@ -0,0 +1,62 @@
+package pluginenv
+
+import (
+ "encoding/json"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+func TestScanSearchPath(t *testing.T) {
+ dir := initTmpDir(t, map[string]string{
+ ".foo/plugin.json": `{"id": "foo"}`,
+ "foo/bar": "asdf",
+ "foo/plugin.json": `{"id": "foo"}`,
+ "bar/zxc": "qwer",
+ "baz/plugin.yaml": "id: baz",
+ "bad/plugin.json": "asd",
+ "qwe": "asd",
+ })
+ defer os.RemoveAll(dir)
+
+ plugins, err := ScanSearchPath(dir)
+ require.NoError(t, err)
+ assert.Len(t, plugins, 3)
+ assert.Contains(t, plugins, &plugin.BundleInfo{
+ Path: filepath.Join(dir, "foo"),
+ ManifestPath: filepath.Join(dir, "foo", "plugin.json"),
+ Manifest: &plugin.Manifest{
+ Id: "foo",
+ },
+ })
+ assert.Contains(t, plugins, &plugin.BundleInfo{
+ Path: filepath.Join(dir, "baz"),
+ ManifestPath: filepath.Join(dir, "baz", "plugin.yaml"),
+ Manifest: &plugin.Manifest{
+ Id: "baz",
+ },
+ })
+ foundError := false
+ for _, x := range plugins {
+ if x.ManifestError != nil {
+ assert.Equal(t, x.Path, filepath.Join(dir, "bad"))
+ assert.Equal(t, x.ManifestPath, filepath.Join(dir, "bad", "plugin.json"))
+ syntexError, ok := x.ManifestError.(*json.SyntaxError)
+ assert.True(t, ok)
+ assert.EqualValues(t, 1, syntexError.Offset)
+ foundError = true
+ }
+ }
+ assert.True(t, foundError)
+}
+
+func TestScanSearchPath_Error(t *testing.T) {
+ plugins, err := ScanSearchPath("not a valid path!")
+ assert.Nil(t, plugins)
+ assert.Error(t, err)
+}
diff --git a/plugin/plugintest/api.go b/plugin/plugintest/api.go
new file mode 100644
index 000000000..2f1db88cf
--- /dev/null
+++ b/plugin/plugintest/api.go
@@ -0,0 +1,17 @@
+package plugintest
+
+import (
+ "github.com/stretchr/testify/mock"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+type API struct {
+ mock.Mock
+}
+
+var _ plugin.API = (*API)(nil)
+
+func (m *API) LoadPluginConfiguration(dest interface{}) error {
+ return m.Called(dest).Error(0)
+}
diff --git a/plugin/plugintest/hooks.go b/plugin/plugintest/hooks.go
new file mode 100644
index 000000000..057c705c9
--- /dev/null
+++ b/plugin/plugintest/hooks.go
@@ -0,0 +1,21 @@
+package plugintest
+
+import (
+ "github.com/stretchr/testify/mock"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+type Hooks struct {
+ mock.Mock
+}
+
+var _ plugin.Hooks = (*Hooks)(nil)
+
+func (m *Hooks) OnActivate(api plugin.API) error {
+ return m.Called(api).Error(0)
+}
+
+func (m *Hooks) OnDeactivate() error {
+ return m.Called().Error(0)
+}
diff --git a/plugin/rpcplugin/api.go b/plugin/rpcplugin/api.go
new file mode 100644
index 000000000..a807d0837
--- /dev/null
+++ b/plugin/rpcplugin/api.go
@@ -0,0 +1,62 @@
+package rpcplugin
+
+import (
+ "encoding/json"
+ "io"
+ "net/rpc"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+type LocalAPI struct {
+ api plugin.API
+ muxer *Muxer
+}
+
+func (h *LocalAPI) LoadPluginConfiguration(args struct{}, reply *[]byte) error {
+ var config interface{}
+ if err := h.api.LoadPluginConfiguration(&config); err != nil {
+ return err
+ }
+ b, err := json.Marshal(config)
+ if err != nil {
+ return err
+ }
+ *reply = b
+ return nil
+}
+
+type RemoteAPI struct {
+ client *rpc.Client
+ muxer *Muxer
+}
+
+func ServeAPI(api plugin.API, conn io.ReadWriteCloser, muxer *Muxer) {
+ server := rpc.NewServer()
+ server.Register(&LocalAPI{
+ api: api,
+ muxer: muxer,
+ })
+ server.ServeConn(conn)
+}
+
+var _ plugin.API = (*RemoteAPI)(nil)
+
+func (h *RemoteAPI) LoadPluginConfiguration(dest interface{}) error {
+ var config []byte
+ if err := h.client.Call("LocalAPI.LoadPluginConfiguration", struct{}{}, &config); err != nil {
+ return err
+ }
+ return json.Unmarshal(config, dest)
+}
+
+func (h *RemoteAPI) Close() error {
+ return h.client.Close()
+}
+
+func ConnectAPI(conn io.ReadWriteCloser, muxer *Muxer) *RemoteAPI {
+ return &RemoteAPI{
+ client: rpc.NewClient(conn),
+ muxer: muxer,
+ }
+}
diff --git a/plugin/rpcplugin/api_test.go b/plugin/rpcplugin/api_test.go
new file mode 100644
index 000000000..e55433556
--- /dev/null
+++ b/plugin/rpcplugin/api_test.go
@@ -0,0 +1,57 @@
+package rpcplugin
+
+import (
+ "encoding/json"
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/plugintest"
+)
+
+func testAPIRPC(api plugin.API, f func(plugin.API)) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ c1 := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer c1.Close()
+
+ c2 := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer c2.Close()
+
+ id, server := c1.Serve()
+ go ServeAPI(api, server, c1)
+
+ remote := ConnectAPI(c2.Connect(id), c2)
+ defer remote.Close()
+
+ f(remote)
+}
+
+func TestAPI(t *testing.T) {
+ var api plugintest.API
+ defer api.AssertExpectations(t)
+
+ type Config struct {
+ Foo string
+ Bar struct {
+ Baz string
+ }
+ }
+
+ api.On("LoadPluginConfiguration", mock.MatchedBy(func(x interface{}) bool { return true })).Run(func(args mock.Arguments) {
+ dest := args.Get(0).(interface{})
+ json.Unmarshal([]byte(`{"Foo": "foo", "Bar": {"Baz": "baz"}}`), dest)
+ }).Return(nil)
+
+ testAPIRPC(&api, func(remote plugin.API) {
+ var config Config
+ assert.NoError(t, remote.LoadPluginConfiguration(&config))
+
+ assert.Equal(t, "foo", config.Foo)
+ assert.Equal(t, "baz", config.Bar.Baz)
+ })
+}
diff --git a/plugin/rpcplugin/hooks.go b/plugin/rpcplugin/hooks.go
new file mode 100644
index 000000000..008730402
--- /dev/null
+++ b/plugin/rpcplugin/hooks.go
@@ -0,0 +1,77 @@
+package rpcplugin
+
+import (
+ "io"
+ "net/rpc"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+type LocalHooks struct {
+ hooks plugin.Hooks
+ muxer *Muxer
+ remoteAPI *RemoteAPI
+}
+
+func (h *LocalHooks) OnActivate(args int64, reply *struct{}) error {
+ stream := h.muxer.Connect(args)
+ if h.remoteAPI != nil {
+ h.remoteAPI.Close()
+ }
+ h.remoteAPI = ConnectAPI(stream, h.muxer)
+ return h.hooks.OnActivate(h.remoteAPI)
+}
+
+func (h *LocalHooks) OnDeactivate(args, reply *struct{}) error {
+ err := h.hooks.OnDeactivate()
+ if h.remoteAPI != nil {
+ h.remoteAPI.Close()
+ h.remoteAPI = nil
+ }
+ return err
+}
+
+type RemoteHooks struct {
+ client *rpc.Client
+ muxer *Muxer
+ apiCloser io.Closer
+}
+
+func ServeHooks(hooks plugin.Hooks, conn io.ReadWriteCloser, muxer *Muxer) {
+ server := rpc.NewServer()
+ server.Register(&LocalHooks{
+ hooks: hooks,
+ muxer: muxer,
+ })
+ server.ServeConn(conn)
+}
+
+var _ plugin.Hooks = (*RemoteHooks)(nil)
+
+func (h *RemoteHooks) OnActivate(api plugin.API) error {
+ id, stream := h.muxer.Serve()
+ if h.apiCloser != nil {
+ h.apiCloser.Close()
+ }
+ h.apiCloser = stream
+ go ServeAPI(api, stream, h.muxer)
+ return h.client.Call("LocalHooks.OnActivate", id, nil)
+}
+
+func (h *RemoteHooks) OnDeactivate() error {
+ return h.client.Call("LocalHooks.OnDeactivate", struct{}{}, nil)
+}
+
+func (h *RemoteHooks) Close() error {
+ if h.apiCloser != nil {
+ h.apiCloser.Close()
+ }
+ return h.client.Close()
+}
+
+func ConnectHooks(conn io.ReadWriteCloser, muxer *Muxer) *RemoteHooks {
+ return &RemoteHooks{
+ client: rpc.NewClient(conn),
+ muxer: muxer,
+ }
+}
diff --git a/plugin/rpcplugin/hooks_test.go b/plugin/rpcplugin/hooks_test.go
new file mode 100644
index 000000000..fbbbbedeb
--- /dev/null
+++ b/plugin/rpcplugin/hooks_test.go
@@ -0,0 +1,58 @@
+package rpcplugin
+
+import (
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/plugintest"
+)
+
+func testHooksRPC(hooks plugin.Hooks, f func(plugin.Hooks)) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ c1 := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer c1.Close()
+
+ c2 := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer c2.Close()
+
+ id, server := c1.Serve()
+ go ServeHooks(hooks, server, c1)
+
+ remote := ConnectHooks(c2.Connect(id), c2)
+ defer remote.Close()
+
+ f(remote)
+}
+
+func TestHooks(t *testing.T) {
+ var api plugintest.API
+ var hooks plugintest.Hooks
+ defer hooks.AssertExpectations(t)
+
+ testHooksRPC(&hooks, func(remote plugin.Hooks) {
+ hooks.On("OnActivate", mock.AnythingOfType("*rpcplugin.RemoteAPI")).Return(nil)
+ assert.NoError(t, remote.OnActivate(&api))
+
+ hooks.On("OnDeactivate").Return(nil)
+ assert.NoError(t, remote.OnDeactivate())
+ })
+}
+
+func BenchmarkOnDeactivate(b *testing.B) {
+ var hooks plugintest.Hooks
+ hooks.On("OnDeactivate").Return(nil)
+
+ testHooksRPC(&hooks, func(remote plugin.Hooks) {
+ b.ResetTimer()
+ for n := 0; n < b.N; n++ {
+ remote.OnDeactivate()
+ }
+ b.StopTimer()
+ })
+}
diff --git a/plugin/rpcplugin/io.go b/plugin/rpcplugin/io.go
new file mode 100644
index 000000000..f1b2f3c35
--- /dev/null
+++ b/plugin/rpcplugin/io.go
@@ -0,0 +1,23 @@
+package rpcplugin
+
+import (
+ "io"
+)
+
+type rwc struct {
+ io.ReadCloser
+ io.WriteCloser
+}
+
+func (rwc *rwc) Close() error {
+ rerr := rwc.ReadCloser.Close()
+ werr := rwc.WriteCloser.Close()
+ if rerr != nil {
+ return rerr
+ }
+ return werr
+}
+
+func NewReadWriteCloser(r io.ReadCloser, w io.WriteCloser) io.ReadWriteCloser {
+ return &rwc{r, w}
+}
diff --git a/plugin/rpcplugin/ipc.go b/plugin/rpcplugin/ipc.go
new file mode 100644
index 000000000..3e6c89c4f
--- /dev/null
+++ b/plugin/rpcplugin/ipc.go
@@ -0,0 +1,28 @@
+package rpcplugin
+
+import (
+ "io"
+ "os"
+)
+
+// Returns a new IPC for the parent process and a set of files to pass on to the child.
+//
+// The returned files must be closed after the child process is started.
+func NewIPC() (io.ReadWriteCloser, []*os.File, error) {
+ parentReader, childWriter, err := os.Pipe()
+ if err != nil {
+ return nil, nil, err
+ }
+ childReader, parentWriter, err := os.Pipe()
+ if err != nil {
+ parentReader.Close()
+ childWriter.Close()
+ return nil, nil, err
+ }
+ return NewReadWriteCloser(parentReader, parentWriter), []*os.File{childReader, childWriter}, nil
+}
+
+// Returns the IPC instance inherited by the process from its parent.
+func InheritedIPC(fd0, fd1 uintptr) (io.ReadWriteCloser, error) {
+ return NewReadWriteCloser(os.NewFile(fd0, ""), os.NewFile(fd1, "")), nil
+}
diff --git a/plugin/rpcplugin/ipc_test.go b/plugin/rpcplugin/ipc_test.go
new file mode 100644
index 000000000..bf4df3017
--- /dev/null
+++ b/plugin/rpcplugin/ipc_test.go
@@ -0,0 +1,61 @@
+package rpcplugin
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestIPC(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ pingpong := filepath.Join(dir, "pingpong")
+ compileGo(t, `
+ package main
+
+ import (
+ "log"
+
+ "github.com/mattermost/platform/plugin/rpcplugin"
+ )
+
+ func main() {
+ ipc, err := rpcplugin.InheritedProcessIPC()
+ if err != nil {
+ log.Fatal("unable to get inherited ipc")
+ }
+ defer ipc.Close()
+ _, err = ipc.Write([]byte("ping"))
+ if err != nil {
+ log.Fatal("unable to write to ipc")
+ }
+ b := make([]byte, 10)
+ n, err := ipc.Read(b)
+ if err != nil {
+ log.Fatal("unable to read from ipc")
+ }
+ if n != 4 || string(b[:4]) != "pong" {
+ log.Fatal("unexpected response")
+ }
+ }
+ `, pingpong)
+
+ p, ipc, err := NewProcess(context.Background(), pingpong)
+ require.NoError(t, err)
+ defer ipc.Close()
+ b := make([]byte, 10)
+ n, err := ipc.Read(b)
+ require.NoError(t, err)
+ assert.Equal(t, 4, n)
+ assert.Equal(t, "ping", string(b[:4]))
+ _, err = ipc.Write([]byte("pong"))
+ require.NoError(t, err)
+ require.NoError(t, p.Wait())
+}
diff --git a/plugin/rpcplugin/main.go b/plugin/rpcplugin/main.go
new file mode 100644
index 000000000..36177954b
--- /dev/null
+++ b/plugin/rpcplugin/main.go
@@ -0,0 +1,46 @@
+package rpcplugin
+
+import (
+ "bufio"
+ "encoding/binary"
+ "fmt"
+ "log"
+ "os"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+// Makes a set of hooks available via RPC. This function never returns.
+func Main(hooks plugin.Hooks) {
+ ipc, err := InheritedProcessIPC()
+ if err != nil {
+ log.Fatal(err.Error())
+ }
+ muxer := NewMuxer(ipc, true)
+ id, conn := muxer.Serve()
+ buf := make([]byte, 11)
+ buf[0] = 0
+ n := binary.PutVarint(buf[1:], id)
+ if _, err := muxer.Write(buf[:1+n]); err != nil {
+ log.Fatal(err.Error())
+ }
+ ServeHooks(hooks, conn, muxer)
+ os.Exit(0)
+}
+
+// Returns the hooks being served by a call to Main.
+func ConnectMain(muxer *Muxer) (*RemoteHooks, error) {
+ buf := make([]byte, 1)
+ if _, err := muxer.Read(buf); err != nil {
+ return nil, err
+ } else if buf[0] != 0 {
+ return nil, fmt.Errorf("unexpected control byte")
+ }
+ reader := bufio.NewReader(muxer)
+ id, err := binary.ReadVarint(reader)
+ if err != nil {
+ return nil, err
+ }
+
+ return ConnectHooks(muxer.Connect(id), muxer), nil
+}
diff --git a/plugin/rpcplugin/main_test.go b/plugin/rpcplugin/main_test.go
new file mode 100644
index 000000000..b54364bad
--- /dev/null
+++ b/plugin/rpcplugin/main_test.go
@@ -0,0 +1,58 @@
+package rpcplugin
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/mattermost/platform/plugin/plugintest"
+)
+
+func TestMain(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ plugin := filepath.Join(dir, "plugin")
+ compileGo(t, `
+ package main
+
+ import (
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/rpcplugin"
+ )
+
+ type MyPlugin struct {}
+
+ func (p *MyPlugin) OnActivate(api plugin.API) error {
+ return nil
+ }
+
+ func (p *MyPlugin) OnDeactivate() error {
+ return nil
+ }
+
+ func main() {
+ rpcplugin.Main(&MyPlugin{})
+ }
+ `, plugin)
+
+ p, ipc, err := NewProcess(context.Background(), plugin)
+ require.NoError(t, err)
+ defer p.Wait()
+
+ muxer := NewMuxer(ipc, false)
+ defer muxer.Close()
+
+ var api plugintest.API
+
+ hooks, err := ConnectMain(muxer)
+ require.NoError(t, err)
+ assert.NoError(t, hooks.OnActivate(&api))
+ assert.NoError(t, hooks.OnDeactivate())
+}
diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go
new file mode 100644
index 000000000..a2bfbf8b6
--- /dev/null
+++ b/plugin/rpcplugin/muxer.go
@@ -0,0 +1,253 @@
+package rpcplugin
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "sync"
+ "sync/atomic"
+)
+
+// Muxer allows multiple bidirectional streams to be transmitted over a single connection.
+//
+// Muxer is safe for use by multiple goroutines.
+//
+// Streams opened on the muxer must be periodically drained in order to reclaim read buffer memory.
+// In other words, readers must consume incoming data as it comes in.
+type Muxer struct {
+ // writeMutex guards conn writes
+ writeMutex sync.Mutex
+ conn io.ReadWriteCloser
+
+ // didCloseConn is a boolean (0 or 1) used from multiple goroutines via atomic operations
+ didCloseConn int32
+
+ // streamsMutex guards streams and nextId
+ streamsMutex sync.Mutex
+ nextId int64
+ streams map[int64]*muxerStream
+
+ stream0Reader *io.PipeReader
+ stream0Writer *io.PipeWriter
+ result chan error
+}
+
+// Creates a new Muxer.
+//
+// conn must be safe for simultaneous reads by one goroutine and writes by another.
+//
+// For two muxers communicating with each other via a connection, parity must be true for exactly
+// one of them.
+func NewMuxer(conn io.ReadWriteCloser, parity bool) *Muxer {
+ s0r, s0w := io.Pipe()
+ muxer := &Muxer{
+ conn: conn,
+ streams: make(map[int64]*muxerStream),
+ result: make(chan error, 1),
+ nextId: 1,
+ stream0Reader: s0r,
+ stream0Writer: s0w,
+ }
+ if parity {
+ muxer.nextId = 2
+ }
+ go muxer.run()
+ return muxer
+}
+
+// Opens a new stream with a unique id.
+//
+// Writes made to the stream before the other end calls Connect will be discarded.
+func (m *Muxer) Serve() (int64, io.ReadWriteCloser) {
+ m.streamsMutex.Lock()
+ id := m.nextId
+ m.nextId += 2
+ m.streamsMutex.Unlock()
+ return id, m.Connect(id)
+}
+
+// Opens a remotely opened stream.
+func (m *Muxer) Connect(id int64) io.ReadWriteCloser {
+ m.streamsMutex.Lock()
+ defer m.streamsMutex.Unlock()
+ mutex := &sync.Mutex{}
+ stream := &muxerStream{
+ id: id,
+ muxer: m,
+ mutex: mutex,
+ readWake: sync.NewCond(mutex),
+ }
+ m.streams[id] = stream
+ return stream
+}
+
+// Calling Read on the muxer directly performs a read on a dedicated, always-open channel.
+func (m *Muxer) Read(p []byte) (int, error) {
+ return m.stream0Reader.Read(p)
+}
+
+// Calling Write on the muxer directly performs a write on a dedicated, always-open channel.
+func (m *Muxer) Write(p []byte) (int, error) {
+ return m.write(p, 0)
+}
+
+// Closes the muxer.
+func (m *Muxer) Close() error {
+ if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) {
+ m.conn.Close()
+ }
+ m.stream0Reader.Close()
+ m.stream0Writer.Close()
+ <-m.result
+ return nil
+}
+
+func (m *Muxer) IsClosed() bool {
+ return atomic.LoadInt32(&m.didCloseConn) > 0
+}
+
+func (m *Muxer) write(p []byte, sid int64) (int, error) {
+ m.writeMutex.Lock()
+ defer m.writeMutex.Unlock()
+ if m.IsClosed() {
+ return 0, fmt.Errorf("muxer closed")
+ }
+ buf := make([]byte, 10)
+ n := binary.PutVarint(buf, sid)
+ if _, err := m.conn.Write(buf[:n]); err != nil {
+ m.shutdown(err)
+ return 0, err
+ }
+ n = binary.PutVarint(buf, int64(len(p)))
+ if _, err := m.conn.Write(buf[:n]); err != nil {
+ m.shutdown(err)
+ return 0, err
+ }
+ if _, err := m.conn.Write(p); err != nil {
+ m.shutdown(err)
+ return 0, err
+ }
+ return len(p), nil
+}
+
+func (m *Muxer) rm(sid int64) {
+ m.streamsMutex.Lock()
+ defer m.streamsMutex.Unlock()
+ delete(m.streams, sid)
+}
+
+func (m *Muxer) run() {
+ m.shutdown(m.loop())
+}
+
+func (m *Muxer) loop() error {
+ reader := bufio.NewReader(m.conn)
+
+ for {
+ sid, err := binary.ReadVarint(reader)
+ if err != nil {
+ return err
+ }
+ len, err := binary.ReadVarint(reader)
+ if err != nil {
+ return err
+ }
+
+ if sid == 0 {
+ if _, err := io.CopyN(m.stream0Writer, reader, len); err != nil {
+ return err
+ }
+ continue
+ }
+
+ m.streamsMutex.Lock()
+ stream, ok := m.streams[sid]
+ m.streamsMutex.Unlock()
+ if !ok {
+ if _, err := reader.Discard(int(len)); err != nil {
+ return err
+ }
+ continue
+ }
+
+ stream.mutex.Lock()
+ if stream.isClosed {
+ stream.mutex.Unlock()
+ if _, err := reader.Discard(int(len)); err != nil {
+ return err
+ }
+ continue
+ }
+ _, err = io.CopyN(&stream.readBuf, reader, len)
+ stream.mutex.Unlock()
+ if err != nil {
+ return err
+ }
+ stream.readWake.Signal()
+ }
+}
+
+func (m *Muxer) shutdown(err error) {
+ if atomic.CompareAndSwapInt32(&m.didCloseConn, 0, 1) {
+ m.conn.Close()
+ }
+ go func() {
+ m.streamsMutex.Lock()
+ for _, stream := range m.streams {
+ stream.mutex.Lock()
+ stream.readWake.Signal()
+ stream.mutex.Unlock()
+ }
+ m.streams = make(map[int64]*muxerStream)
+ m.streamsMutex.Unlock()
+ }()
+ m.result <- err
+}
+
+type muxerStream struct {
+ id int64
+ muxer *Muxer
+ readBuf bytes.Buffer
+ mutex *sync.Mutex
+ readWake *sync.Cond
+ isClosed bool
+ closeErr error
+}
+
+func (s *muxerStream) Read(p []byte) (int, error) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ for {
+ if s.muxer.IsClosed() {
+ return 0, fmt.Errorf("muxer closed")
+ } else if s.isClosed {
+ return 0, io.EOF
+ } else if s.readBuf.Len() > 0 {
+ n, err := s.readBuf.Read(p)
+ return n, err
+ }
+ s.readWake.Wait()
+ }
+}
+
+func (s *muxerStream) Write(p []byte) (int, error) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if s.isClosed {
+ return 0, fmt.Errorf("stream closed")
+ }
+ return s.muxer.write(p, s.id)
+}
+
+func (s *muxerStream) Close() error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if !s.isClosed {
+ s.isClosed = true
+ s.muxer.rm(s.id)
+ }
+ s.readWake.Signal()
+ return nil
+}
diff --git a/plugin/rpcplugin/muxer_test.go b/plugin/rpcplugin/muxer_test.go
new file mode 100644
index 000000000..7bb63d4f8
--- /dev/null
+++ b/plugin/rpcplugin/muxer_test.go
@@ -0,0 +1,169 @@
+package rpcplugin
+
+import (
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestMuxer(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer func() { assert.NoError(t, alice.Close()) }()
+
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+
+ id1, alice1 := alice.Serve()
+ defer func() { assert.NoError(t, alice1.Close()) }()
+
+ id2, bob2 := bob.Serve()
+ defer func() { assert.NoError(t, bob2.Close()) }()
+
+ done1 := make(chan bool)
+ done2 := make(chan bool)
+
+ go func() {
+ bob1 := bob.Connect(id1)
+ defer func() { assert.NoError(t, bob1.Close()) }()
+
+ n, err := bob1.Write([]byte("ping1.0"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+
+ n, err = bob1.Write([]byte("ping1.1"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ }()
+
+ go func() {
+ alice2 := alice.Connect(id2)
+ defer func() { assert.NoError(t, alice2.Close()) }()
+
+ n, err := alice2.Write([]byte("ping2.0"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+
+ buf := make([]byte, 20)
+ n, err = alice2.Read(buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("pong2.0"), buf[:n])
+
+ done2 <- true
+ }()
+
+ go func() {
+ buf := make([]byte, 7)
+ n, err := io.ReadFull(alice1, buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("ping1.0"), buf[:n])
+
+ n, err = alice1.Read(buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("ping1.1"), buf[:n])
+
+ done1 <- true
+ }()
+
+ go func() {
+ buf := make([]byte, 20)
+ n, err := bob2.Read(buf)
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ assert.Equal(t, []byte("ping2.0"), buf[:n])
+
+ n, err = bob2.Write([]byte("pong2.0"))
+ require.NoError(t, err)
+ assert.Equal(t, n, 7)
+ }()
+
+ <-done1
+ <-done2
+}
+
+// Closing a muxer during a read should unblock, but return an error.
+func TestMuxer_CloseDuringRead(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+
+ _, s := alice.Serve()
+
+ go alice.Close()
+ buf := make([]byte, 20)
+ n, err := s.Read(buf)
+ assert.Equal(t, 0, n)
+ assert.NotNil(t, err)
+ assert.NotEqual(t, io.EOF, err)
+}
+
+// Closing a stream during a read should unblock and return io.EOF since this is the way to
+// gracefully close a connection.
+func TestMuxer_StreamCloseDuringRead(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer func() { assert.NoError(t, alice.Close()) }()
+
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+
+ _, s := alice.Serve()
+
+ go s.Close()
+ buf := make([]byte, 20)
+ n, err := s.Read(buf)
+ assert.Equal(t, 0, n)
+ assert.Equal(t, io.EOF, err)
+}
+
+// Closing a muxer during a write should unblock, but return an error.
+func TestMuxer_CloseDuringWrite(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+
+ // Don't connect bob to let writes will block forever.
+ defer r2.Close()
+ defer w1.Close()
+
+ _, s := alice.Serve()
+
+ go alice.Close()
+ buf := make([]byte, 20)
+ n, err := s.Write(buf)
+ assert.Equal(t, 0, n)
+ assert.NotNil(t, err)
+ assert.NotEqual(t, io.EOF, err)
+}
+
+func TestMuxer_ReadWrite(t *testing.T) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ alice := NewMuxer(NewReadWriteCloser(r1, w2), false)
+ defer func() { assert.NoError(t, alice.Close()) }()
+
+ bob := NewMuxer(NewReadWriteCloser(r2, w1), true)
+ defer func() { assert.NoError(t, bob.Close()) }()
+
+ go alice.Write([]byte("hello"))
+ buf := make([]byte, 20)
+ n, err := bob.Read(buf)
+ assert.Equal(t, 5, n)
+ assert.Nil(t, err)
+ assert.Equal(t, []byte("hello"), buf[:n])
+}
diff --git a/plugin/rpcplugin/process.go b/plugin/rpcplugin/process.go
new file mode 100644
index 000000000..4b3362d68
--- /dev/null
+++ b/plugin/rpcplugin/process.go
@@ -0,0 +1,23 @@
+package rpcplugin
+
+import (
+ "context"
+ "io"
+)
+
+type Process interface {
+ // Waits for the process to exit and returns an error if a problem occurred or the process exited
+ // with a non-zero status.
+ Wait() error
+}
+
+// NewProcess launches an RPC executable in a new process and returns an IPC that can be used to
+// communicate with it.
+func NewProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) {
+ return newProcess(ctx, path)
+}
+
+// When called on a process launched with NewProcess, returns the inherited IPC.
+func InheritedProcessIPC() (io.ReadWriteCloser, error) {
+ return inheritedProcessIPC()
+}
diff --git a/plugin/rpcplugin/process_test.go b/plugin/rpcplugin/process_test.go
new file mode 100644
index 000000000..b7984ad0a
--- /dev/null
+++ b/plugin/rpcplugin/process_test.go
@@ -0,0 +1,64 @@
+package rpcplugin
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func compileGo(t *testing.T, sourceCode, outputPath string) {
+ dir, err := ioutil.TempDir(".", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "main.go"), []byte(sourceCode), 0600))
+ cmd := exec.Command("go", "build", "-o", outputPath, "main.go")
+ cmd.Dir = dir
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ require.NoError(t, cmd.Run())
+}
+
+func TestProcess(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ ping := filepath.Join(dir, "ping")
+ compileGo(t, `
+ package main
+
+ import (
+ "log"
+
+ "github.com/mattermost/platform/plugin/rpcplugin"
+ )
+
+ func main() {
+ ipc, err := rpcplugin.InheritedProcessIPC()
+ if err != nil {
+ log.Fatal("unable to get inherited ipc")
+ }
+ defer ipc.Close()
+ _, err = ipc.Write([]byte("ping"))
+ if err != nil {
+ log.Fatal("unable to write to ipc")
+ }
+ }
+ `, ping)
+
+ p, ipc, err := NewProcess(context.Background(), ping)
+ require.NoError(t, err)
+ defer ipc.Close()
+ b := make([]byte, 10)
+ n, err := ipc.Read(b)
+ require.NoError(t, err)
+ assert.Equal(t, 4, n)
+ assert.Equal(t, "ping", string(b[:4]))
+ require.NoError(t, p.Wait())
+}
diff --git a/plugin/rpcplugin/process_unix.go b/plugin/rpcplugin/process_unix.go
new file mode 100644
index 000000000..f196e34f8
--- /dev/null
+++ b/plugin/rpcplugin/process_unix.go
@@ -0,0 +1,45 @@
+// +build !windows
+
+package rpcplugin
+
+import (
+ "context"
+ "io"
+ "os"
+ "os/exec"
+)
+
+type process struct {
+ command *exec.Cmd
+}
+
+func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) {
+ ipc, childFiles, err := NewIPC()
+ if err != nil {
+ return nil, nil, err
+ }
+ defer childFiles[0].Close()
+ defer childFiles[1].Close()
+
+ cmd := exec.CommandContext(ctx, path)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.ExtraFiles = childFiles
+ err = cmd.Start()
+ if err != nil {
+ ipc.Close()
+ return nil, nil, err
+ }
+
+ return &process{
+ command: cmd,
+ }, ipc, nil
+}
+
+func (p *process) Wait() error {
+ return p.command.Wait()
+}
+
+func inheritedProcessIPC() (io.ReadWriteCloser, error) {
+ return InheritedIPC(3, 4)
+}
diff --git a/plugin/rpcplugin/process_windows.go b/plugin/rpcplugin/process_windows.go
new file mode 100644
index 000000000..7be03cacd
--- /dev/null
+++ b/plugin/rpcplugin/process_windows.go
@@ -0,0 +1,17 @@
+package rpcplugin
+
+import (
+ "context"
+ "fmt"
+ "io"
+)
+
+func newProcess(ctx context.Context, path string) (Process, io.ReadWriteCloser, error) {
+ // TODO
+ return nil, nil, fmt.Errorf("not yet supported")
+}
+
+func inheritedProcessIPC() (*IPC, error) {
+ // TODO
+ return nil, fmt.Errorf("not yet supported")
+}
diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go
new file mode 100644
index 000000000..9316d7186
--- /dev/null
+++ b/plugin/rpcplugin/supervisor.go
@@ -0,0 +1,128 @@
+package rpcplugin
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "sync/atomic"
+ "time"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+// Supervisor implements a plugin.Supervisor that launches the plugin in a separate process and
+// communicates via RPC.
+//
+// If the plugin unexpectedly exists, the supervisor will relaunch it after a short delay.
+type Supervisor struct {
+ executable string
+ hooks atomic.Value
+ done chan bool
+ cancel context.CancelFunc
+}
+
+var _ plugin.Supervisor = (*Supervisor)(nil)
+
+// Starts the plugin. This method will block until the plugin is successfully launched for the first
+// time and will return an error if the plugin cannot be launched at all.
+func (s *Supervisor) Start() error {
+ ctx, cancel := context.WithCancel(context.Background())
+ s.done = make(chan bool, 1)
+ start := make(chan error, 1)
+ go s.run(ctx, start)
+
+ select {
+ case <-time.After(time.Second * 3):
+ cancel()
+ <-s.done
+ return fmt.Errorf("timed out waiting for plugin")
+ case err := <-start:
+ s.cancel = cancel
+ return err
+ }
+}
+
+// Stops the plugin.
+func (s *Supervisor) Stop() error {
+ s.cancel()
+ <-s.done
+ return nil
+}
+
+// Returns the hooks used to communicate with the plugin. The hooks may change if the plugin is
+// restarted, so the return value should not be cached.
+func (s *Supervisor) Hooks() plugin.Hooks {
+ return s.hooks.Load().(plugin.Hooks)
+}
+
+func (s *Supervisor) run(ctx context.Context, start chan<- error) {
+ defer func() {
+ s.done <- true
+ }()
+ done := ctx.Done()
+ for {
+ s.runPlugin(ctx, start)
+ select {
+ case <-done:
+ return
+ default:
+ start = nil
+ time.Sleep(time.Second)
+ }
+ }
+}
+
+func (s *Supervisor) runPlugin(ctx context.Context, start chan<- error) error {
+ p, ipc, err := NewProcess(ctx, s.executable)
+ if err != nil {
+ if start != nil {
+ start <- err
+ }
+ return err
+ }
+
+ muxer := NewMuxer(ipc, false)
+ closeMuxer := make(chan bool, 1)
+ muxerClosed := make(chan error, 1)
+ go func() {
+ select {
+ case <-ctx.Done():
+ break
+ case <-closeMuxer:
+ break
+ }
+ muxerClosed <- muxer.Close()
+ }()
+
+ hooks, err := ConnectMain(muxer)
+ if err != nil {
+ if start != nil {
+ start <- err
+ }
+ closeMuxer <- true
+ <-muxerClosed
+ p.Wait()
+ return err
+ }
+
+ s.hooks.Store(hooks)
+ if start != nil {
+ start <- nil
+ }
+ p.Wait()
+ closeMuxer <- true
+ <-muxerClosed
+
+ return nil
+}
+
+func SupervisorProvider(bundle *plugin.BundleInfo) (plugin.Supervisor, error) {
+ if bundle.Manifest == nil {
+ return nil, fmt.Errorf("no manifest available")
+ } else if bundle.Manifest.Backend == nil || bundle.Manifest.Backend.Executable == "" {
+ return nil, fmt.Errorf("no backend executable specified")
+ }
+ return &Supervisor{
+ executable: filepath.Join(bundle.Path, bundle.Manifest.Backend.Executable),
+ }, nil
+}
diff --git a/plugin/rpcplugin/supervisor_test.go b/plugin/rpcplugin/supervisor_test.go
new file mode 100644
index 000000000..1d046bf82
--- /dev/null
+++ b/plugin/rpcplugin/supervisor_test.go
@@ -0,0 +1,130 @@
+package rpcplugin
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/mattermost/platform/plugin"
+)
+
+func TestSupervisor(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ backend := filepath.Join(dir, "backend")
+ compileGo(t, `
+ package main
+
+ import (
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/rpcplugin"
+ )
+
+ type MyPlugin struct {}
+
+ func (p *MyPlugin) OnActivate(api plugin.API) error {
+ return nil
+ }
+
+ func (p *MyPlugin) OnDeactivate() error {
+ return nil
+ }
+
+ func main() {
+ rpcplugin.Main(&MyPlugin{})
+ }
+ `, backend)
+
+ ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600)
+
+ bundle := plugin.BundleInfoForPath(dir)
+ supervisor, err := SupervisorProvider(bundle)
+ require.NoError(t, err)
+ require.NoError(t, supervisor.Start())
+ require.NoError(t, supervisor.Hooks().OnActivate(nil))
+ require.NoError(t, supervisor.Stop())
+}
+
+// If plugin development goes really wrong, let's make sure plugin activation won't block forever.
+func TestSupervisor_StartTimeout(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ backend := filepath.Join(dir, "backend")
+ compileGo(t, `
+ package main
+
+ func main() {
+ for {
+ }
+ }
+ `, backend)
+
+ ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600)
+
+ bundle := plugin.BundleInfoForPath(dir)
+ supervisor, err := SupervisorProvider(bundle)
+ require.NoError(t, err)
+ require.Error(t, supervisor.Start())
+}
+
+// Crashed plugins should be relaunched.
+func TestSupervisor_PluginCrash(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ backend := filepath.Join(dir, "backend")
+ compileGo(t, `
+ package main
+
+ import (
+ "os"
+
+ "github.com/mattermost/platform/plugin"
+ "github.com/mattermost/platform/plugin/rpcplugin"
+ )
+
+ type MyPlugin struct {}
+
+ func (p *MyPlugin) OnActivate(api plugin.API) error {
+ os.Exit(1)
+ return nil
+ }
+
+ func (p *MyPlugin) OnDeactivate() error {
+ return nil
+ }
+
+ func main() {
+ rpcplugin.Main(&MyPlugin{})
+ }
+ `, backend)
+
+ ioutil.WriteFile(filepath.Join(dir, "plugin.json"), []byte(`{"id": "foo", "backend": {"executable": "backend"}}`), 0600)
+
+ bundle := plugin.BundleInfoForPath(dir)
+ supervisor, err := SupervisorProvider(bundle)
+ require.NoError(t, err)
+ require.NoError(t, supervisor.Start())
+ require.Error(t, supervisor.Hooks().OnActivate(nil))
+
+ recovered := false
+ for i := 0; i < 30; i++ {
+ if supervisor.Hooks().OnDeactivate() == nil {
+ recovered = true
+ break
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ assert.True(t, recovered)
+ require.NoError(t, supervisor.Stop())
+}
diff --git a/plugin/supervisor.go b/plugin/supervisor.go
new file mode 100644
index 000000000..5ddf5f169
--- /dev/null
+++ b/plugin/supervisor.go
@@ -0,0 +1,8 @@
+package plugin
+
+// Supervisor provides the interface for an object that controls the execution of a plugin.
+type Supervisor interface {
+ Start() error
+ Stop() error
+ Hooks() Hooks
+}