summaryrefslogtreecommitdiffstats
path: root/plugin
diff options
context:
space:
mode:
authorJesse Hallam <jesse.hallam@gmail.com>2018-05-23 14:26:35 -0400
committerGitHub <noreply@github.com>2018-05-23 14:26:35 -0400
commit847c181ec9b73e51daf39efc5c597eff2e7cdb31 (patch)
tree54bcb4a3ac43f9acb22ff037582541c7428bd990 /plugin
parent5c21bdc1783e5cd17169436e7ccfacdd1b637907 (diff)
downloadchat-847c181ec9b73e51daf39efc5c597eff2e7cdb31.tar.gz
chat-847c181ec9b73e51daf39efc5c597eff2e7cdb31.tar.bz2
chat-847c181ec9b73e51daf39efc5c597eff2e7cdb31.zip
MM-8622: Improved plugin error reporting (#8737)
* allow `Wait()`ing on the supervisor In the event the plugin supervisor shuts down a plugin for crashing too many times, the new `Wait()` interface allows the `ActivatePlugin` to accept a callback function to trigger when `supervisor.Wait()` returns. If the supervisor shuts down normally, this callback is invoked with a nil error, otherwise any error reported by the supervisor is passed along. * improve plugin activation/deactivation logic Avoid triggering activation of previously failed-to-start plugins just becase something in the configuration changed. Now, intelligently compare the global enable bit as well as the each individual plugin's enabled bit. * expose store to manipulate PluginStatuses * expose API to fetch plugin statuses * keep track of whether or not plugin sandboxing is supported * transition plugin statuses * restore error on plugin activation if already active * don't initialize test plugins until successfully loaded * emit websocket events when plugin statuses change * skip pruning if already initialized * MM-8622: maintain plugin statuses in memory Switch away from persisting plugin statuses to the database, and maintain in memory instead. This will be followed by a cluster interface to query the in-memory status of plugin statuses from all cluster nodes. At the same time, rename `cluster_discovery_id` on the `PluginStatus` model object to `cluster_id`. * MM-8622: aggregate plugin statuses across cluster * fetch cluster plugin statuses when emitting websocket notification * address unit test fixes after rebasing * relax (poor) racey unit test re: supervisor.Wait() * make store-mocks
Diffstat (limited to 'plugin')
-rw-r--r--plugin/pluginenv/environment.go12
-rw-r--r--plugin/pluginenv/environment_test.go24
-rw-r--r--plugin/rpcplugin/rpcplugintest/supervisor.go39
-rw-r--r--plugin/rpcplugin/supervisor.go15
-rw-r--r--plugin/supervisor.go1
5 files changed, 76 insertions, 15 deletions
diff --git a/plugin/pluginenv/environment.go b/plugin/pluginenv/environment.go
index 947eda86d..f704aa5bb 100644
--- a/plugin/pluginenv/environment.go
+++ b/plugin/pluginenv/environment.go
@@ -108,7 +108,7 @@ func (env *Environment) IsPluginActive(pluginId string) bool {
}
// Activates the plugin with the given id.
-func (env *Environment) ActivatePlugin(id string) error {
+func (env *Environment) ActivatePlugin(id string, onError func(error)) error {
env.mutex.Lock()
defer env.mutex.Unlock()
@@ -117,7 +117,7 @@ func (env *Environment) ActivatePlugin(id string) error {
}
if _, ok := env.activePlugins[id]; ok {
- return nil
+ return fmt.Errorf("plugin already active: %v", id)
}
plugins, err := ScanSearchPath(env.searchPath)
if err != nil {
@@ -156,6 +156,14 @@ func (env *Environment) ActivatePlugin(id string) error {
if err := supervisor.Start(api); err != nil {
return errors.Wrapf(err, "unable to start plugin: %v", id)
}
+ if onError != nil {
+ go func() {
+ err := supervisor.Wait()
+ if err != nil {
+ onError(err)
+ }
+ }()
+ }
activePlugin.Supervisor = supervisor
}
diff --git a/plugin/pluginenv/environment_test.go b/plugin/pluginenv/environment_test.go
index 91d639f69..8c1397799 100644
--- a/plugin/pluginenv/environment_test.go
+++ b/plugin/pluginenv/environment_test.go
@@ -56,6 +56,10 @@ func (m *MockSupervisor) Hooks() plugin.Hooks {
return m.Called().Get(0).(plugin.Hooks)
}
+func (m *MockSupervisor) Wait() error {
+ return m.Called().Get(0).(error)
+}
+
func initTmpDir(t *testing.T, files map[string]string) string {
success := false
dir, err := ioutil.TempDir("", "mm-plugin-test")
@@ -130,7 +134,7 @@ func TestEnvironment(t *testing.T) {
activePlugins := env.ActivePlugins()
assert.Len(t, activePlugins, 0)
- assert.Error(t, env.ActivatePlugin("x"))
+ assert.Error(t, env.ActivatePlugin("x", nil))
var api struct{ plugin.API }
var supervisor MockSupervisor
@@ -145,11 +149,11 @@ func TestEnvironment(t *testing.T) {
supervisor.On("Stop").Return(nil)
supervisor.On("Hooks").Return(&hooks)
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
activePlugins = env.ActivePlugins()
assert.Len(t, activePlugins, 1)
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.True(t, env.IsPluginActive("foo"))
hooks.On("OnDeactivate").Return(nil)
@@ -157,7 +161,7 @@ func TestEnvironment(t *testing.T) {
assert.Error(t, env.DeactivatePlugin("foo"))
assert.False(t, env.IsPluginActive("foo"))
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
assert.Equal(t, env.SearchPath(), dir)
@@ -184,7 +188,7 @@ func TestEnvironment_DuplicatePluginError(t *testing.T) {
require.NoError(t, err)
defer env.Shutdown()
- assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
}
@@ -200,7 +204,7 @@ func TestEnvironment_BadSearchPathError(t *testing.T) {
require.NoError(t, err)
defer env.Shutdown()
- assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
}
@@ -244,7 +248,7 @@ func TestEnvironment_ActivatePluginErrors(t *testing.T) {
hooks.Mock = mock.Mock{}
provider.Mock = mock.Mock{}
setup()
- assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
supervisor.AssertExpectations(t)
hooks.AssertExpectations(t)
@@ -285,7 +289,7 @@ func TestEnvironment_ShutdownError(t *testing.T) {
hooks.On("OnDeactivate").Return(fmt.Errorf("test error"))
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
assert.Len(t, env.Shutdown(), 2)
}
@@ -332,7 +336,7 @@ func TestEnvironment_ConcurrentHookInvocations(t *testing.T) {
}
})
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
rec := httptest.NewRecorder()
@@ -391,7 +395,7 @@ func TestEnvironment_HooksForPlugins(t *testing.T) {
Text: "bar",
}, nil)
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
resp, appErr, err := env.HooksForPlugin("foo").ExecuteCommand(&model.CommandArgs{
diff --git a/plugin/rpcplugin/rpcplugintest/supervisor.go b/plugin/rpcplugin/rpcplugintest/supervisor.go
index 2ae065621..d225f96fc 100644
--- a/plugin/rpcplugin/rpcplugintest/supervisor.go
+++ b/plugin/rpcplugin/rpcplugintest/supervisor.go
@@ -174,6 +174,14 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) {
bundle := model.BundleInfoForPath(dir)
supervisor, err := sp(bundle)
require.NoError(t, err)
+
+ var supervisorWaitErr error
+ supervisorWaitDone := make(chan bool, 1)
+ go func() {
+ supervisorWaitErr = supervisor.Wait()
+ close(supervisorWaitDone)
+ }()
+
require.NoError(t, supervisor.Start(&api))
failed := false
@@ -189,7 +197,21 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) {
time.Sleep(time.Millisecond * 100)
}
assert.True(t, recovered)
+
+ select {
+ case <-supervisorWaitDone:
+ require.Fail(t, "supervisor.Wait() unexpectedly returned")
+ case <-time.After(500 * time.Millisecond):
+ }
+
require.NoError(t, supervisor.Stop())
+
+ select {
+ case <-supervisorWaitDone:
+ require.Nil(t, supervisorWaitErr)
+ case <-time.After(5000 * time.Millisecond):
+ require.Fail(t, "supervisor.Wait() failed to return")
+ }
}
// Crashed plugins should be relaunched at most three times.
@@ -239,6 +261,14 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun
bundle := model.BundleInfoForPath(dir)
supervisor, err := sp(bundle)
require.NoError(t, err)
+
+ var supervisorWaitErr error
+ supervisorWaitDone := make(chan bool, 1)
+ go func() {
+ supervisorWaitErr = supervisor.Wait()
+ close(supervisorWaitDone)
+ }()
+
require.NoError(t, supervisor.Start(&api))
for attempt := 1; attempt <= 4; attempt++ {
@@ -264,10 +294,19 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun
}
if attempt < 4 {
+ require.Nil(t, supervisorWaitErr)
require.True(t, recovered, "failed to recover after attempt %d", attempt)
} else {
require.False(t, recovered, "unexpectedly recovered after attempt %d", attempt)
}
}
+
+ select {
+ case <-supervisorWaitDone:
+ require.NotNil(t, supervisorWaitErr)
+ case <-time.After(500 * time.Millisecond):
+ require.Fail(t, "supervisor.Wait() failed to return after plugin crashed")
+ }
+
require.NoError(t, supervisor.Stop())
}
diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go
index 6e26d5682..246747c89 100644
--- a/plugin/rpcplugin/supervisor.go
+++ b/plugin/rpcplugin/supervisor.go
@@ -32,6 +32,7 @@ type Supervisor struct {
cancel context.CancelFunc
newProcess func(context.Context) (Process, io.ReadWriteCloser, error)
pluginId string
+ pluginErr error
}
var _ plugin.Supervisor = (*Supervisor)(nil)
@@ -55,6 +56,13 @@ func (s *Supervisor) Start(api plugin.API) error {
}
}
+// Waits for the supervisor to stop (on demand or of its own accord), returning any error that
+// triggered the supervisor to stop.
+func (s *Supervisor) Wait() error {
+ <-s.done
+ return s.pluginErr
+}
+
// Stops the plugin.
func (s *Supervisor) Stop() error {
s.cancel()
@@ -70,7 +78,7 @@ func (s *Supervisor) Hooks() plugin.Hooks {
func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API) {
defer func() {
- s.done <- true
+ close(s.done)
}()
done := ctx.Done()
for i := 0; i <= MaxProcessRestarts; i++ {
@@ -81,10 +89,11 @@ func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API
default:
start = nil
if i < MaxProcessRestarts {
- mlog.Debug("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId))
+ mlog.Error("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId))
time.Sleep(time.Duration((1 + i*i)) * time.Second)
} else {
- mlog.Debug("Plugin terminated unexpectedly too many times", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts))
+ s.pluginErr = fmt.Errorf("plugin terminated unexpectedly too many times")
+ mlog.Error("Plugin shutdown", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts), mlog.Err(s.pluginErr))
}
}
}
diff --git a/plugin/supervisor.go b/plugin/supervisor.go
index 6cb7445f7..f20df7040 100644
--- a/plugin/supervisor.go
+++ b/plugin/supervisor.go
@@ -7,6 +7,7 @@ package plugin
// type is only relevant to the server, and isn't used by the plugins themselves.
type Supervisor interface {
Start(API) error
+ Wait() error
Stop() error
Hooks() Hooks
}