From 847c181ec9b73e51daf39efc5c597eff2e7cdb31 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Wed, 23 May 2018 14:26:35 -0400 Subject: MM-8622: Improved plugin error reporting (#8737) * allow `Wait()`ing on the supervisor In the event the plugin supervisor shuts down a plugin for crashing too many times, the new `Wait()` interface allows the `ActivatePlugin` to accept a callback function to trigger when `supervisor.Wait()` returns. If the supervisor shuts down normally, this callback is invoked with a nil error, otherwise any error reported by the supervisor is passed along. * improve plugin activation/deactivation logic Avoid triggering activation of previously failed-to-start plugins just becase something in the configuration changed. Now, intelligently compare the global enable bit as well as the each individual plugin's enabled bit. * expose store to manipulate PluginStatuses * expose API to fetch plugin statuses * keep track of whether or not plugin sandboxing is supported * transition plugin statuses * restore error on plugin activation if already active * don't initialize test plugins until successfully loaded * emit websocket events when plugin statuses change * skip pruning if already initialized * MM-8622: maintain plugin statuses in memory Switch away from persisting plugin statuses to the database, and maintain in memory instead. This will be followed by a cluster interface to query the in-memory status of plugin statuses from all cluster nodes. At the same time, rename `cluster_discovery_id` on the `PluginStatus` model object to `cluster_id`. * MM-8622: aggregate plugin statuses across cluster * fetch cluster plugin statuses when emitting websocket notification * address unit test fixes after rebasing * relax (poor) racey unit test re: supervisor.Wait() * make store-mocks --- plugin/pluginenv/environment.go | 12 +++++++-- plugin/pluginenv/environment_test.go | 24 ++++++++++------- plugin/rpcplugin/rpcplugintest/supervisor.go | 39 ++++++++++++++++++++++++++++ plugin/rpcplugin/supervisor.go | 15 ++++++++--- plugin/supervisor.go | 1 + 5 files changed, 76 insertions(+), 15 deletions(-) (limited to 'plugin') diff --git a/plugin/pluginenv/environment.go b/plugin/pluginenv/environment.go index 947eda86d..f704aa5bb 100644 --- a/plugin/pluginenv/environment.go +++ b/plugin/pluginenv/environment.go @@ -108,7 +108,7 @@ func (env *Environment) IsPluginActive(pluginId string) bool { } // Activates the plugin with the given id. -func (env *Environment) ActivatePlugin(id string) error { +func (env *Environment) ActivatePlugin(id string, onError func(error)) error { env.mutex.Lock() defer env.mutex.Unlock() @@ -117,7 +117,7 @@ func (env *Environment) ActivatePlugin(id string) error { } if _, ok := env.activePlugins[id]; ok { - return nil + return fmt.Errorf("plugin already active: %v", id) } plugins, err := ScanSearchPath(env.searchPath) if err != nil { @@ -156,6 +156,14 @@ func (env *Environment) ActivatePlugin(id string) error { if err := supervisor.Start(api); err != nil { return errors.Wrapf(err, "unable to start plugin: %v", id) } + if onError != nil { + go func() { + err := supervisor.Wait() + if err != nil { + onError(err) + } + }() + } activePlugin.Supervisor = supervisor } diff --git a/plugin/pluginenv/environment_test.go b/plugin/pluginenv/environment_test.go index 91d639f69..8c1397799 100644 --- a/plugin/pluginenv/environment_test.go +++ b/plugin/pluginenv/environment_test.go @@ -56,6 +56,10 @@ func (m *MockSupervisor) Hooks() plugin.Hooks { return m.Called().Get(0).(plugin.Hooks) } +func (m *MockSupervisor) Wait() error { + return m.Called().Get(0).(error) +} + func initTmpDir(t *testing.T, files map[string]string) string { success := false dir, err := ioutil.TempDir("", "mm-plugin-test") @@ -130,7 +134,7 @@ func TestEnvironment(t *testing.T) { activePlugins := env.ActivePlugins() assert.Len(t, activePlugins, 0) - assert.Error(t, env.ActivatePlugin("x")) + assert.Error(t, env.ActivatePlugin("x", nil)) var api struct{ plugin.API } var supervisor MockSupervisor @@ -145,11 +149,11 @@ func TestEnvironment(t *testing.T) { supervisor.On("Stop").Return(nil) supervisor.On("Hooks").Return(&hooks) - assert.NoError(t, env.ActivatePlugin("foo")) + assert.NoError(t, env.ActivatePlugin("foo", nil)) assert.Equal(t, env.ActivePluginIds(), []string{"foo"}) activePlugins = env.ActivePlugins() assert.Len(t, activePlugins, 1) - assert.NoError(t, env.ActivatePlugin("foo")) + assert.Error(t, env.ActivatePlugin("foo", nil)) assert.True(t, env.IsPluginActive("foo")) hooks.On("OnDeactivate").Return(nil) @@ -157,7 +161,7 @@ func TestEnvironment(t *testing.T) { assert.Error(t, env.DeactivatePlugin("foo")) assert.False(t, env.IsPluginActive("foo")) - assert.NoError(t, env.ActivatePlugin("foo")) + assert.NoError(t, env.ActivatePlugin("foo", nil)) assert.Equal(t, env.ActivePluginIds(), []string{"foo"}) assert.Equal(t, env.SearchPath(), dir) @@ -184,7 +188,7 @@ func TestEnvironment_DuplicatePluginError(t *testing.T) { require.NoError(t, err) defer env.Shutdown() - assert.Error(t, env.ActivatePlugin("foo")) + assert.Error(t, env.ActivatePlugin("foo", nil)) assert.Empty(t, env.ActivePluginIds()) } @@ -200,7 +204,7 @@ func TestEnvironment_BadSearchPathError(t *testing.T) { require.NoError(t, err) defer env.Shutdown() - assert.Error(t, env.ActivatePlugin("foo")) + assert.Error(t, env.ActivatePlugin("foo", nil)) assert.Empty(t, env.ActivePluginIds()) } @@ -244,7 +248,7 @@ func TestEnvironment_ActivatePluginErrors(t *testing.T) { hooks.Mock = mock.Mock{} provider.Mock = mock.Mock{} setup() - assert.Error(t, env.ActivatePlugin("foo")) + assert.Error(t, env.ActivatePlugin("foo", nil)) assert.Empty(t, env.ActivePluginIds()) supervisor.AssertExpectations(t) hooks.AssertExpectations(t) @@ -285,7 +289,7 @@ func TestEnvironment_ShutdownError(t *testing.T) { hooks.On("OnDeactivate").Return(fmt.Errorf("test error")) - assert.NoError(t, env.ActivatePlugin("foo")) + assert.NoError(t, env.ActivatePlugin("foo", nil)) assert.Equal(t, env.ActivePluginIds(), []string{"foo"}) assert.Len(t, env.Shutdown(), 2) } @@ -332,7 +336,7 @@ func TestEnvironment_ConcurrentHookInvocations(t *testing.T) { } }) - assert.NoError(t, env.ActivatePlugin("foo")) + assert.NoError(t, env.ActivatePlugin("foo", nil)) rec := httptest.NewRecorder() @@ -391,7 +395,7 @@ func TestEnvironment_HooksForPlugins(t *testing.T) { Text: "bar", }, nil) - assert.NoError(t, env.ActivatePlugin("foo")) + assert.NoError(t, env.ActivatePlugin("foo", nil)) assert.Equal(t, env.ActivePluginIds(), []string{"foo"}) resp, appErr, err := env.HooksForPlugin("foo").ExecuteCommand(&model.CommandArgs{ diff --git a/plugin/rpcplugin/rpcplugintest/supervisor.go b/plugin/rpcplugin/rpcplugintest/supervisor.go index 2ae065621..d225f96fc 100644 --- a/plugin/rpcplugin/rpcplugintest/supervisor.go +++ b/plugin/rpcplugin/rpcplugintest/supervisor.go @@ -174,6 +174,14 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) { bundle := model.BundleInfoForPath(dir) supervisor, err := sp(bundle) require.NoError(t, err) + + var supervisorWaitErr error + supervisorWaitDone := make(chan bool, 1) + go func() { + supervisorWaitErr = supervisor.Wait() + close(supervisorWaitDone) + }() + require.NoError(t, supervisor.Start(&api)) failed := false @@ -189,7 +197,21 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) { time.Sleep(time.Millisecond * 100) } assert.True(t, recovered) + + select { + case <-supervisorWaitDone: + require.Fail(t, "supervisor.Wait() unexpectedly returned") + case <-time.After(500 * time.Millisecond): + } + require.NoError(t, supervisor.Stop()) + + select { + case <-supervisorWaitDone: + require.Nil(t, supervisorWaitErr) + case <-time.After(5000 * time.Millisecond): + require.Fail(t, "supervisor.Wait() failed to return") + } } // Crashed plugins should be relaunched at most three times. @@ -239,6 +261,14 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun bundle := model.BundleInfoForPath(dir) supervisor, err := sp(bundle) require.NoError(t, err) + + var supervisorWaitErr error + supervisorWaitDone := make(chan bool, 1) + go func() { + supervisorWaitErr = supervisor.Wait() + close(supervisorWaitDone) + }() + require.NoError(t, supervisor.Start(&api)) for attempt := 1; attempt <= 4; attempt++ { @@ -264,10 +294,19 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun } if attempt < 4 { + require.Nil(t, supervisorWaitErr) require.True(t, recovered, "failed to recover after attempt %d", attempt) } else { require.False(t, recovered, "unexpectedly recovered after attempt %d", attempt) } } + + select { + case <-supervisorWaitDone: + require.NotNil(t, supervisorWaitErr) + case <-time.After(500 * time.Millisecond): + require.Fail(t, "supervisor.Wait() failed to return after plugin crashed") + } + require.NoError(t, supervisor.Stop()) } diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go index 6e26d5682..246747c89 100644 --- a/plugin/rpcplugin/supervisor.go +++ b/plugin/rpcplugin/supervisor.go @@ -32,6 +32,7 @@ type Supervisor struct { cancel context.CancelFunc newProcess func(context.Context) (Process, io.ReadWriteCloser, error) pluginId string + pluginErr error } var _ plugin.Supervisor = (*Supervisor)(nil) @@ -55,6 +56,13 @@ func (s *Supervisor) Start(api plugin.API) error { } } +// Waits for the supervisor to stop (on demand or of its own accord), returning any error that +// triggered the supervisor to stop. +func (s *Supervisor) Wait() error { + <-s.done + return s.pluginErr +} + // Stops the plugin. func (s *Supervisor) Stop() error { s.cancel() @@ -70,7 +78,7 @@ func (s *Supervisor) Hooks() plugin.Hooks { func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API) { defer func() { - s.done <- true + close(s.done) }() done := ctx.Done() for i := 0; i <= MaxProcessRestarts; i++ { @@ -81,10 +89,11 @@ func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API default: start = nil if i < MaxProcessRestarts { - mlog.Debug("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId)) + mlog.Error("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId)) time.Sleep(time.Duration((1 + i*i)) * time.Second) } else { - mlog.Debug("Plugin terminated unexpectedly too many times", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts)) + s.pluginErr = fmt.Errorf("plugin terminated unexpectedly too many times") + mlog.Error("Plugin shutdown", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts), mlog.Err(s.pluginErr)) } } } diff --git a/plugin/supervisor.go b/plugin/supervisor.go index 6cb7445f7..f20df7040 100644 --- a/plugin/supervisor.go +++ b/plugin/supervisor.go @@ -7,6 +7,7 @@ package plugin // type is only relevant to the server, and isn't used by the plugins themselves. type Supervisor interface { Start(API) error + Wait() error Stop() error Hooks() Hooks } -- cgit v1.2.3-1-g7c22