summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse Hallam <jesse.hallam@gmail.com>2018-05-23 14:26:35 -0400
committerGitHub <noreply@github.com>2018-05-23 14:26:35 -0400
commit847c181ec9b73e51daf39efc5c597eff2e7cdb31 (patch)
tree54bcb4a3ac43f9acb22ff037582541c7428bd990
parent5c21bdc1783e5cd17169436e7ccfacdd1b637907 (diff)
downloadchat-847c181ec9b73e51daf39efc5c597eff2e7cdb31.tar.gz
chat-847c181ec9b73e51daf39efc5c597eff2e7cdb31.tar.bz2
chat-847c181ec9b73e51daf39efc5c597eff2e7cdb31.zip
MM-8622: Improved plugin error reporting (#8737)
* allow `Wait()`ing on the supervisor In the event the plugin supervisor shuts down a plugin for crashing too many times, the new `Wait()` interface allows the `ActivatePlugin` to accept a callback function to trigger when `supervisor.Wait()` returns. If the supervisor shuts down normally, this callback is invoked with a nil error, otherwise any error reported by the supervisor is passed along. * improve plugin activation/deactivation logic Avoid triggering activation of previously failed-to-start plugins just becase something in the configuration changed. Now, intelligently compare the global enable bit as well as the each individual plugin's enabled bit. * expose store to manipulate PluginStatuses * expose API to fetch plugin statuses * keep track of whether or not plugin sandboxing is supported * transition plugin statuses * restore error on plugin activation if already active * don't initialize test plugins until successfully loaded * emit websocket events when plugin statuses change * skip pruning if already initialized * MM-8622: maintain plugin statuses in memory Switch away from persisting plugin statuses to the database, and maintain in memory instead. This will be followed by a cluster interface to query the in-memory status of plugin statuses from all cluster nodes. At the same time, rename `cluster_discovery_id` on the `PluginStatus` model object to `cluster_id`. * MM-8622: aggregate plugin statuses across cluster * fetch cluster plugin statuses when emitting websocket notification * address unit test fixes after rebasing * relax (poor) racey unit test re: supervisor.Wait() * make store-mocks
-rw-r--r--api4/plugin.go23
-rw-r--r--app/app.go6
-rw-r--r--app/apptestlib.go30
-rw-r--r--app/cluster_discovery.go8
-rw-r--r--app/plugin.go240
-rw-r--r--app/plugin_test.go59
-rw-r--r--einterfaces/cluster.go1
-rw-r--r--i18n/en.json16
-rw-r--r--model/client4.go12
-rw-r--r--model/cluster_discovery.go2
-rw-r--r--model/plugin_status.go44
-rw-r--r--model/websocket_message.go77
-rw-r--r--plugin/pluginenv/environment.go12
-rw-r--r--plugin/pluginenv/environment_test.go24
-rw-r--r--plugin/rpcplugin/rpcplugintest/supervisor.go39
-rw-r--r--plugin/rpcplugin/supervisor.go15
-rw-r--r--plugin/supervisor.go1
17 files changed, 516 insertions, 93 deletions
diff --git a/api4/plugin.go b/api4/plugin.go
index 37fbf12cd..ab026ab5f 100644
--- a/api4/plugin.go
+++ b/api4/plugin.go
@@ -23,6 +23,7 @@ func (api *API) InitPlugin() {
api.BaseRoutes.Plugins.Handle("", api.ApiSessionRequired(getPlugins)).Methods("GET")
api.BaseRoutes.Plugin.Handle("", api.ApiSessionRequired(removePlugin)).Methods("DELETE")
+ api.BaseRoutes.Plugins.Handle("/statuses", api.ApiSessionRequired(getPluginStatuses)).Methods("GET")
api.BaseRoutes.Plugin.Handle("/activate", api.ApiSessionRequired(activatePlugin)).Methods("POST")
api.BaseRoutes.Plugin.Handle("/deactivate", api.ApiSessionRequired(deactivatePlugin)).Methods("POST")
@@ -97,6 +98,26 @@ func getPlugins(c *Context, w http.ResponseWriter, r *http.Request) {
w.Write([]byte(response.ToJson()))
}
+func getPluginStatuses(c *Context, w http.ResponseWriter, r *http.Request) {
+ if !*c.App.Config().PluginSettings.Enable {
+ c.Err = model.NewAppError("getPluginStatuses", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
+ return
+ }
+
+ if !c.App.SessionHasPermissionTo(c.Session, model.PERMISSION_MANAGE_SYSTEM) {
+ c.SetPermissionError(model.PERMISSION_MANAGE_SYSTEM)
+ return
+ }
+
+ response, err := c.App.GetClusterPluginStatuses()
+ if err != nil {
+ c.Err = err
+ return
+ }
+
+ w.Write([]byte(response.ToJson()))
+}
+
func removePlugin(c *Context, w http.ResponseWriter, r *http.Request) {
c.RequirePluginId()
if c.Err != nil {
@@ -104,7 +125,7 @@ func removePlugin(c *Context, w http.ResponseWriter, r *http.Request) {
}
if !*c.App.Config().PluginSettings.Enable {
- c.Err = model.NewAppError("getPlugins", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
+ c.Err = model.NewAppError("removePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
return
}
diff --git a/app/app.go b/app/app.go
index 2cdf333c1..6de75855c 100644
--- a/app/app.go
+++ b/app/app.go
@@ -38,8 +38,10 @@ type App struct {
Log *mlog.Logger
- PluginEnv *pluginenv.Environment
- PluginConfigListenerId string
+ PluginEnv *pluginenv.Environment
+ PluginConfigListenerId string
+ IsPluginSandboxSupported bool
+ pluginStatuses map[string]*model.PluginStatus
EmailBatching *EmailBatchingJob
diff --git a/app/apptestlib.go b/app/apptestlib.go
index b245ddabf..7fc78c9c9 100644
--- a/app/apptestlib.go
+++ b/app/apptestlib.go
@@ -336,6 +336,10 @@ func (s *mockPluginSupervisor) Start(api plugin.API) error {
return s.hooks.OnActivate(api)
}
+func (s *mockPluginSupervisor) Wait() error {
+ return nil
+}
+
func (s *mockPluginSupervisor) Stop() error {
return nil
}
@@ -353,17 +357,6 @@ func (me *TestHelper) InstallPlugin(manifest *model.Manifest, hooks plugin.Hooks
me.tempWorkspace = dir
}
- pluginDir := filepath.Join(me.tempWorkspace, "plugins")
- webappDir := filepath.Join(me.tempWorkspace, "webapp")
- me.App.InitPlugins(pluginDir, webappDir, func(bundle *model.BundleInfo) (plugin.Supervisor, error) {
- if hooks, ok := me.pluginHooks[bundle.Manifest.Id]; ok {
- return &mockPluginSupervisor{hooks}, nil
- }
- return pluginenv.DefaultSupervisorProvider(bundle)
- })
-
- me.pluginHooks[manifest.Id] = hooks
-
manifestCopy := *manifest
if manifestCopy.Backend == nil {
manifestCopy.Backend = &model.ManifestBackend{}
@@ -373,6 +366,9 @@ func (me *TestHelper) InstallPlugin(manifest *model.Manifest, hooks plugin.Hooks
panic(err)
}
+ pluginDir := filepath.Join(me.tempWorkspace, "plugins")
+ webappDir := filepath.Join(me.tempWorkspace, "webapp")
+
if err := os.MkdirAll(filepath.Join(pluginDir, manifest.Id), 0700); err != nil {
panic(err)
}
@@ -380,6 +376,15 @@ func (me *TestHelper) InstallPlugin(manifest *model.Manifest, hooks plugin.Hooks
if err := ioutil.WriteFile(filepath.Join(pluginDir, manifest.Id, "plugin.json"), manifestBytes, 0600); err != nil {
panic(err)
}
+
+ me.App.InitPlugins(pluginDir, webappDir, func(bundle *model.BundleInfo) (plugin.Supervisor, error) {
+ if hooks, ok := me.pluginHooks[bundle.Manifest.Id]; ok {
+ return &mockPluginSupervisor{hooks}, nil
+ }
+ return pluginenv.DefaultSupervisorProvider(bundle)
+ })
+
+ me.pluginHooks[manifest.Id] = hooks
}
func (me *TestHelper) ResetRoleMigration() {
@@ -415,6 +420,9 @@ func (me *FakeClusterInterface) GetClusterStats() ([]*model.ClusterStats, *model
func (me *FakeClusterInterface) GetLogs(page, perPage int) ([]string, *model.AppError) {
return []string{}, nil
}
+func (me *FakeClusterInterface) GetPluginStatuses() (model.PluginStatuses, *model.AppError) {
+ return nil, nil
+}
func (me *FakeClusterInterface) ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError {
return nil
}
diff --git a/app/cluster_discovery.go b/app/cluster_discovery.go
index f7443680c..250744279 100644
--- a/app/cluster_discovery.go
+++ b/app/cluster_discovery.go
@@ -85,3 +85,11 @@ func (a *App) IsLeader() bool {
return true
}
}
+
+func (a *App) GetClusterId() string {
+ if a.Cluster == nil {
+ return ""
+ }
+
+ return a.Cluster.GetClusterId()
+}
diff --git a/app/plugin.go b/app/plugin.go
index 0d3415f4c..f6cb6bdda 100644
--- a/app/plugin.go
+++ b/app/plugin.go
@@ -37,6 +37,31 @@ var prepackagedPlugins map[string]func(string) ([]byte, error) = map[string]func
"zoom": zoom.Asset,
}
+func (a *App) notifyPluginStatusesChanged() error {
+ pluginStatuses, err := a.GetClusterPluginStatuses()
+ if err != nil {
+ return err
+ }
+
+ // Notify any system admins.
+ message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_STATUSES_CHANGED, "", "", "", nil)
+ message.Add("plugin_statuses", pluginStatuses)
+ message.Broadcast.ContainsSensitiveData = true
+ a.Publish(message)
+
+ return nil
+}
+
+func (a *App) setPluginStatusState(id string, state int) error {
+ if _, ok := a.pluginStatuses[id]; !ok {
+ return nil
+ }
+
+ a.pluginStatuses[id].State = state
+
+ return a.notifyPluginStatusesChanged()
+}
+
func (a *App) initBuiltInPlugins() {
plugins := map[string]builtinplugin.Plugin{
"ldapextras": &ldapextras.Plugin{},
@@ -77,30 +102,100 @@ func (a *App) setPluginsActive(activate bool) {
continue
}
- id := plugin.Manifest.Id
+ enabled := false
+ if state, ok := a.Config().PluginSettings.PluginStates[plugin.Manifest.Id]; ok {
+ enabled = state.Enable
+ }
+
+ a.pluginStatuses[plugin.Manifest.Id] = &model.PluginStatus{
+ ClusterId: a.GetClusterId(),
+ PluginId: plugin.Manifest.Id,
+ PluginPath: filepath.Dir(plugin.ManifestPath),
+ IsSandboxed: a.IsPluginSandboxSupported,
+ Name: plugin.Manifest.Name,
+ Description: plugin.Manifest.Description,
+ Version: plugin.Manifest.Version,
+ }
+
+ if activate && enabled {
+ a.setPluginActive(plugin, activate)
+ } else if !activate {
+ a.setPluginActive(plugin, activate)
+ }
+ }
+
+ if err := a.notifyPluginStatusesChanged(); err != nil {
+ mlog.Error("failed to notify plugin status changed", mlog.Err(err))
+ }
+}
+
+func (a *App) setPluginActiveById(id string, activate bool) {
+ plugins, err := a.PluginEnv.Plugins()
+ if err != nil {
+ mlog.Error(fmt.Sprintf("Cannot setPluginActiveById(%t)", activate), mlog.String("plugin_id", id), mlog.Err(err))
+ return
+ }
- pluginState := &model.PluginState{Enable: false}
- if state, ok := a.Config().PluginSettings.PluginStates[id]; ok {
- pluginState = state
+ for _, plugin := range plugins {
+ if plugin.Manifest != nil && plugin.Manifest.Id == id {
+ a.setPluginActive(plugin, activate)
}
+ }
+}
+
+func (a *App) setPluginActive(plugin *model.BundleInfo, activate bool) {
+ if plugin.Manifest == nil {
+ return
+ }
- active := a.PluginEnv.IsPluginActive(id)
+ id := plugin.Manifest.Id
- if activate && pluginState.Enable && !active {
+ active := a.PluginEnv.IsPluginActive(id)
+
+ if activate {
+ if !active {
if err := a.activatePlugin(plugin.Manifest); err != nil {
mlog.Error("Plugin failed to activate", mlog.String("plugin_id", plugin.Manifest.Id), mlog.String("err", err.DetailedError))
}
+ }
- } else if (!activate || !pluginState.Enable) && active {
+ } else if !activate {
+ if active {
if err := a.deactivatePlugin(plugin.Manifest); err != nil {
mlog.Error("Plugin failed to deactivate", mlog.String("plugin_id", plugin.Manifest.Id), mlog.String("err", err.DetailedError))
}
+ } else {
+ if err := a.setPluginStatusState(plugin.Manifest.Id, model.PluginStateNotRunning); err != nil {
+ mlog.Error("Plugin status state failed to update", mlog.String("plugin_id", plugin.Manifest.Id), mlog.String("err", err.Error()))
+ }
}
}
}
func (a *App) activatePlugin(manifest *model.Manifest) *model.AppError {
- if err := a.PluginEnv.ActivatePlugin(manifest.Id); err != nil {
+ mlog.Debug("Activating plugin", mlog.String("plugin_id", manifest.Id))
+
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateStarting); err != nil {
+ return model.NewAppError("activatePlugin", "app.plugin.set_plugin_status_state.app_error", nil, err.Error(), http.StatusInternalServerError)
+ }
+
+ onError := func(err error) {
+ mlog.Debug("Plugin failed to stay running", mlog.String("plugin_id", manifest.Id), mlog.Err(err))
+
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateFailedToStayRunning); err != nil {
+ mlog.Error("Failed to record plugin status", mlog.String("plugin_id", manifest.Id), mlog.Err(err))
+ }
+ }
+
+ if err := a.PluginEnv.ActivatePlugin(manifest.Id, onError); err != nil {
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateFailedToStart); err != nil {
+ return model.NewAppError("activatePlugin", "app.plugin.activate.app_error", nil, err.Error(), http.StatusInternalServerError)
+ }
+
+ return model.NewAppError("activatePlugin", "app.plugin.activate.app_error", nil, err.Error(), http.StatusBadRequest)
+ }
+
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateRunning); err != nil {
return model.NewAppError("activatePlugin", "app.plugin.activate.app_error", nil, err.Error(), http.StatusBadRequest)
}
@@ -115,6 +210,12 @@ func (a *App) activatePlugin(manifest *model.Manifest) *model.AppError {
}
func (a *App) deactivatePlugin(manifest *model.Manifest) *model.AppError {
+ mlog.Debug("Deactivating plugin", mlog.String("plugin_id", manifest.Id))
+
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateStopping); err != nil {
+ return model.NewAppError("EnablePlugin", "app.plugin.deactivate.app_error", nil, err.Error(), http.StatusInternalServerError)
+ }
+
if err := a.PluginEnv.DeactivatePlugin(manifest.Id); err != nil {
return model.NewAppError("deactivatePlugin", "app.plugin.deactivate.app_error", nil, err.Error(), http.StatusBadRequest)
}
@@ -127,6 +228,10 @@ func (a *App) deactivatePlugin(manifest *model.Manifest) *model.AppError {
a.Publish(message)
}
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateNotRunning); err != nil {
+ return model.NewAppError("deactivatePlugin", "app.plugin.deactivate.app_error", nil, err.Error(), http.StatusBadRequest)
+ }
+
mlog.Info("Deactivated plugin", mlog.String("plugin_id", manifest.Id))
return nil
}
@@ -166,7 +271,8 @@ func (a *App) installPlugin(pluginFile io.Reader, allowPrepackaged bool) (*model
return nil, model.NewAppError("installPlugin", "app.plugin.manifest.app_error", nil, err.Error(), http.StatusBadRequest)
}
- if _, ok := prepackagedPlugins[manifest.Id]; ok && !allowPrepackaged {
+ _, isPrepackaged := prepackagedPlugins[manifest.Id]
+ if isPrepackaged && !allowPrepackaged {
return nil, model.NewAppError("installPlugin", "app.plugin.prepackaged.app_error", nil, "", http.StatusBadRequest)
}
@@ -185,16 +291,33 @@ func (a *App) installPlugin(pluginFile io.Reader, allowPrepackaged bool) (*model
}
}
- err = utils.CopyDir(tmpPluginDir, filepath.Join(a.PluginEnv.SearchPath(), manifest.Id))
+ pluginPath := filepath.Join(a.PluginEnv.SearchPath(), manifest.Id)
+ err = utils.CopyDir(tmpPluginDir, pluginPath)
if err != nil {
return nil, model.NewAppError("installPlugin", "app.plugin.mvdir.app_error", nil, err.Error(), http.StatusInternalServerError)
}
- // Should add manifest validation and error handling here
+ a.pluginStatuses[manifest.Id] = &model.PluginStatus{
+ ClusterId: a.GetClusterId(),
+ PluginId: manifest.Id,
+ PluginPath: pluginPath,
+ State: model.PluginStateNotRunning,
+ IsSandboxed: a.IsPluginSandboxSupported,
+ IsPrepackaged: isPrepackaged,
+ Name: manifest.Name,
+ Description: manifest.Description,
+ Version: manifest.Version,
+ }
+
+ if err := a.notifyPluginStatusesChanged(); err != nil {
+ mlog.Error("failed to notify plugin status changed", mlog.Err(err))
+ }
return manifest, nil
}
+// GetPlugins returned the plugins installed on this server, including the manifests needed to
+// enable plugins with web functionality.
func (a *App) GetPlugins() (*model.PluginsResponse, *model.AppError) {
if a.PluginEnv == nil || !*a.Config().PluginSettings.Enable {
return nil, model.NewAppError("GetPlugins", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
@@ -240,6 +363,39 @@ func (a *App) GetActivePluginManifests() ([]*model.Manifest, *model.AppError) {
return manifests, nil
}
+// GetPluginStatuses returns the status for plugins installed on this server.
+func (a *App) GetPluginStatuses() (model.PluginStatuses, *model.AppError) {
+ if !*a.Config().PluginSettings.Enable {
+ return nil, model.NewAppError("GetPluginStatuses", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
+ }
+
+ pluginStatuses := make([]*model.PluginStatus, 0, len(a.pluginStatuses))
+ for _, pluginStatus := range a.pluginStatuses {
+ pluginStatuses = append(pluginStatuses, pluginStatus)
+ }
+
+ return pluginStatuses, nil
+}
+
+// GetClusterPluginStatuses returns the status for plugins installed anywhere in the cluster.
+func (a *App) GetClusterPluginStatuses() (model.PluginStatuses, *model.AppError) {
+ pluginStatuses, err := a.GetPluginStatuses()
+ if err != nil {
+ return nil, err
+ }
+
+ if a.Cluster != nil && *a.Config().ClusterSettings.Enable {
+ clusterPluginStatuses, err := a.Cluster.GetPluginStatuses()
+ if err != nil {
+ return nil, model.NewAppError("GetClusterPluginStatuses", "app.plugin.get_cluster_plugin_statuses.app_error", nil, err.Error(), http.StatusInternalServerError)
+ }
+
+ pluginStatuses = append(pluginStatuses, clusterPluginStatuses...)
+ }
+
+ return pluginStatuses, nil
+}
+
func (a *App) RemovePlugin(id string) *model.AppError {
return a.removePlugin(id, false)
}
@@ -284,10 +440,16 @@ func (a *App) removePlugin(id string, allowPrepackaged bool) *model.AppError {
return model.NewAppError("removePlugin", "app.plugin.remove.app_error", nil, err.Error(), http.StatusInternalServerError)
}
+ delete(a.pluginStatuses, manifest.Id)
+ if err := a.notifyPluginStatusesChanged(); err != nil {
+ mlog.Error("failed to notify plugin status changed", mlog.Err(err))
+ }
+
return nil
}
-// EnablePlugin will set the config for an installed plugin to enabled, triggering activation if inactive.
+// EnablePlugin will set the config for an installed plugin to enabled, triggering asynchronous
+// activation if inactive anywhere in the cluster.
func (a *App) EnablePlugin(id string) *model.AppError {
if a.PluginEnv == nil || !*a.Config().PluginSettings.Enable {
return model.NewAppError("EnablePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
@@ -310,8 +472,8 @@ func (a *App) EnablePlugin(id string) *model.AppError {
return model.NewAppError("EnablePlugin", "app.plugin.not_installed.app_error", nil, "", http.StatusBadRequest)
}
- if err := a.activatePlugin(manifest); err != nil {
- return err
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateStarting); err != nil {
+ return model.NewAppError("EnablePlugin", "app.plugin.set_plugin_status_state.app_error", nil, err.Error(), http.StatusInternalServerError)
}
a.UpdateConfig(func(cfg *model.Config) {
@@ -351,6 +513,10 @@ func (a *App) DisablePlugin(id string) *model.AppError {
return model.NewAppError("DisablePlugin", "app.plugin.not_installed.app_error", nil, "", http.StatusBadRequest)
}
+ if err := a.setPluginStatusState(manifest.Id, model.PluginStateStopping); err != nil {
+ return model.NewAppError("EnablePlugin", "app.plugin.set_plugin_status_state.app_error", nil, err.Error(), http.StatusInternalServerError)
+ }
+
a.UpdateConfig(func(cfg *model.Config) {
cfg.PluginSettings.PluginStates[id] = &model.PluginState{Enable: false}
})
@@ -363,16 +529,18 @@ func (a *App) DisablePlugin(id string) *model.AppError {
}
func (a *App) InitPlugins(pluginPath, webappPath string, supervisorOverride pluginenv.SupervisorProviderFunc) {
- if !*a.Config().PluginSettings.Enable {
+ if a.PluginEnv != nil {
return
}
- if a.PluginEnv != nil {
+ if !*a.Config().PluginSettings.Enable {
return
}
mlog.Info("Starting up plugins")
+ a.pluginStatuses = make(map[string]*model.PluginStatus)
+
if err := os.Mkdir(pluginPath, 0744); err != nil && !os.IsExist(err) {
mlog.Error("Failed to start up plugins", mlog.Err(err))
return
@@ -398,13 +566,19 @@ func (a *App) InitPlugins(pluginPath, webappPath string, supervisorOverride plug
}),
}
- if supervisorOverride != nil {
- options = append(options, pluginenv.SupervisorProvider(supervisorOverride))
- } else if err := sandbox.CheckSupport(); err != nil {
+ if err := sandbox.CheckSupport(); err != nil {
+ a.IsPluginSandboxSupported = false
mlog.Warn("plugin sandboxing is not supported. plugins will run with the same access level as the server. See documentation to learn more: https://developers.mattermost.com/extend/plugins/security/", mlog.Err(err))
- options = append(options, pluginenv.SupervisorProvider(rpcplugin.SupervisorProvider))
} else {
+ a.IsPluginSandboxSupported = true
+ }
+
+ if supervisorOverride != nil {
+ options = append(options, pluginenv.SupervisorProvider(supervisorOverride))
+ } else if a.IsPluginSandboxSupported {
options = append(options, pluginenv.SupervisorProvider(sandbox.SupervisorProvider))
+ } else {
+ options = append(options, pluginenv.SupervisorProvider(rpcplugin.SupervisorProvider))
}
if env, err := pluginenv.New(options...); err != nil {
@@ -431,12 +605,34 @@ func (a *App) InitPlugins(pluginPath, webappPath string, supervisorOverride plug
}
a.RemoveConfigListener(a.PluginConfigListenerId)
- a.PluginConfigListenerId = a.AddConfigListener(func(_, cfg *model.Config) {
+ a.PluginConfigListenerId = a.AddConfigListener(func(oldCfg *model.Config, cfg *model.Config) {
if a.PluginEnv == nil {
return
}
- a.setPluginsActive(*cfg.PluginSettings.Enable)
+ if *oldCfg.PluginSettings.Enable != *cfg.PluginSettings.Enable {
+ a.setPluginsActive(*cfg.PluginSettings.Enable)
+ } else {
+ plugins := map[string]bool{}
+ for id := range oldCfg.PluginSettings.PluginStates {
+ plugins[id] = true
+ }
+ for id := range cfg.PluginSettings.PluginStates {
+ plugins[id] = true
+ }
+
+ for id := range plugins {
+ oldPluginState := oldCfg.PluginSettings.PluginStates[id]
+ pluginState := cfg.PluginSettings.PluginStates[id]
+
+ wasEnabled := oldPluginState != nil && oldPluginState.Enable
+ isEnabled := pluginState != nil && pluginState.Enable
+
+ if wasEnabled != isEnabled {
+ a.setPluginActiveById(id, isEnabled)
+ }
+ }
+ }
for _, err := range a.PluginEnv.Hooks().OnConfigurationChange() {
mlog.Error(err.Error())
diff --git a/app/plugin_test.go b/app/plugin_test.go
index 9ad5dc1fa..db5954d4d 100644
--- a/app/plugin_test.go
+++ b/app/plugin_test.go
@@ -7,8 +7,8 @@ import (
"errors"
"net/http"
"net/http/httptest"
- "strings"
"testing"
+ "time"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
@@ -158,6 +158,20 @@ func TestPluginCommands(t *testing.T) {
require.Nil(t, th.App.EnablePlugin("foo"))
+ // Ideally, we would wait for the websocket activation event instead of just sleeping.
+ time.Sleep(500 * time.Millisecond)
+
+ pluginStatuses, err := th.App.GetPluginStatuses()
+ require.Nil(t, err)
+ found := false
+ for _, pluginStatus := range pluginStatuses {
+ if pluginStatus.PluginId == "foo" {
+ require.Equal(t, model.PluginStateRunning, pluginStatus.State)
+ found = true
+ }
+ }
+ require.True(t, found, "failed to find plugin foo in plugin statuses")
+
resp, err := th.App.ExecuteCommand(&model.CommandArgs{
Command: "/foo2",
TeamId: th.BasicTeam.Id,
@@ -216,7 +230,46 @@ func TestPluginBadActivation(t *testing.T) {
t.Run("EnablePlugin bad activation", func(t *testing.T) {
err := th.App.EnablePlugin("foo")
- assert.NotNil(t, err)
- assert.True(t, strings.Contains(err.DetailedError, "won't activate for some reason"))
+ assert.Nil(t, err)
+
+ // Ideally, we would wait for the websocket activation event instead of just
+ // sleeping.
+ time.Sleep(500 * time.Millisecond)
+
+ pluginStatuses, err := th.App.GetPluginStatuses()
+ require.Nil(t, err)
+ found := false
+ for _, pluginStatus := range pluginStatuses {
+ if pluginStatus.PluginId == "foo" {
+ require.Equal(t, model.PluginStateFailedToStart, pluginStatus.State)
+ found = true
+ }
+ }
+ require.True(t, found, "failed to find plugin foo in plugin statuses")
+ })
+}
+
+func TestGetPluginStatusesDisabled(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ th.App.UpdateConfig(func(cfg *model.Config) {
+ *cfg.PluginSettings.Enable = false
})
+
+ _, err := th.App.GetPluginStatuses()
+ require.EqualError(t, err, "GetPluginStatuses: Plugins have been disabled. Please check your logs for details., ")
+}
+
+func TestGetPluginStatuses(t *testing.T) {
+ th := Setup().InitBasic()
+ defer th.TearDown()
+
+ th.App.UpdateConfig(func(cfg *model.Config) {
+ *cfg.PluginSettings.Enable = true
+ })
+
+ pluginStatuses, err := th.App.GetPluginStatuses()
+ require.Nil(t, err)
+ require.NotNil(t, pluginStatuses)
}
diff --git a/einterfaces/cluster.go b/einterfaces/cluster.go
index b5ef4772a..dd9c57f11 100644
--- a/einterfaces/cluster.go
+++ b/einterfaces/cluster.go
@@ -21,5 +21,6 @@ type ClusterInterface interface {
NotifyMsg(buf []byte)
GetClusterStats() ([]*model.ClusterStats, *model.AppError)
GetLogs(page, perPage int) ([]string, *model.AppError)
+ GetPluginStatuses() (model.PluginStatuses, *model.AppError)
ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError
}
diff --git a/i18n/en.json b/i18n/en.json
index 24e49278c..9f008a64b 100644
--- a/i18n/en.json
+++ b/i18n/en.json
@@ -3855,6 +3855,10 @@
"translation": "Unable to deactivate plugin"
},
{
+ "id": "app.plugin.delete_plugin_status_state.app_error",
+ "translation": "Unable to delete plugin status state."
+ },
+ {
"id": "app.plugin.disabled.app_error",
"translation": "Plugins have been disabled. Please check your logs for details."
},
@@ -3899,10 +3903,18 @@
"translation": "Plugin is not installed"
},
{
+ "id": "app.plugin.prepackaged.app_error",
+ "translation": "Cannot install prepackaged plugin"
+ },
+ {
"id": "app.plugin.remove.app_error",
"translation": "Unable to delete plugin"
},
{
+ "id": "app.plugin.set_plugin_status_state.app_error",
+ "translation": "Unable to set plugin status state."
+ },
+ {
"id": "app.plugin.upload_disabled.app_error",
"translation": "Plugins and/or plugin uploads have been disabled."
},
@@ -4799,6 +4811,10 @@
"translation": "Unable to build multipart request"
},
{
+ "id": "model.cluster.is_valid.id.app_error",
+ "translation": "Invalid Id"
+ },
+ {
"id": "model.command.is_valid.create_at.app_error",
"translation": "Create at must be a valid time"
},
diff --git a/model/client4.go b/model/client4.go
index d245fe6c0..97dd30790 100644
--- a/model/client4.go
+++ b/model/client4.go
@@ -3534,6 +3534,18 @@ func (c *Client4) GetPlugins() (*PluginsResponse, *Response) {
}
}
+// GetPluginStatuses will return the plugins installed on any server in the cluster, for reporting
+// to the administrator via the system console.
+// WARNING: PLUGINS ARE STILL EXPERIMENTAL. THIS FUNCTION IS SUBJECT TO CHANGE.
+func (c *Client4) GetPluginStatuses() (PluginStatuses, *Response) {
+ if r, err := c.DoApiGet(c.GetPluginsRoute(), "/statuses"); err != nil {
+ return nil, BuildErrorResponse(r, err)
+ } else {
+ defer closeBody(r)
+ return PluginStatusesFromJson(r.Body), BuildResponse(r)
+ }
+}
+
// RemovePlugin will deactivate and delete a plugin.
// WARNING: PLUGINS ARE STILL EXPERIMENTAL. THIS FUNCTION IS SUBJECT TO CHANGE.
func (c *Client4) RemovePlugin(id string) (bool, *Response) {
diff --git a/model/cluster_discovery.go b/model/cluster_discovery.go
index 89e5fc95e..5d5b0465d 100644
--- a/model/cluster_discovery.go
+++ b/model/cluster_discovery.go
@@ -86,7 +86,7 @@ func FilterClusterDiscovery(vs []*ClusterDiscovery, f func(*ClusterDiscovery) bo
func (o *ClusterDiscovery) IsValid() *AppError {
if len(o.Id) != 26 {
- return NewAppError("Channel.IsValid", "model.channel.is_valid.id.app_error", nil, "", http.StatusBadRequest)
+ return NewAppError("ClusterDiscovery.IsValid", "model.cluster.is_valid.id.app_error", nil, "", http.StatusBadRequest)
}
if len(o.ClusterName) == 0 {
diff --git a/model/plugin_status.go b/model/plugin_status.go
new file mode 100644
index 000000000..1ae64ff89
--- /dev/null
+++ b/model/plugin_status.go
@@ -0,0 +1,44 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package model
+
+import (
+ "encoding/json"
+ "io"
+)
+
+const (
+ PluginStateNotRunning = 0
+ PluginStateStarting = 1
+ PluginStateRunning = 2
+ PluginStateFailedToStart = 3
+ PluginStateFailedToStayRunning = 4
+ PluginStateStopping = 5
+)
+
+// PluginStatus provides a cluster-aware view of installed plugins.
+type PluginStatus struct {
+ PluginId string `json:"plugin_id"`
+ ClusterId string `json:"cluster_id"`
+ PluginPath string `json:"plugin_path"`
+ State int `json:"state"`
+ IsSandboxed bool `json:"is_sandboxed"`
+ IsPrepackaged bool `json:"is_prepackaged"`
+ Name string `json:"name"`
+ Description string `json:"description"`
+ Version string `json:"version"`
+}
+
+type PluginStatuses []*PluginStatus
+
+func (m *PluginStatuses) ToJson() string {
+ b, _ := json.Marshal(m)
+ return string(b)
+}
+
+func PluginStatusesFromJson(data io.Reader) PluginStatuses {
+ var m PluginStatuses
+ json.NewDecoder(data).Decode(&m)
+ return m
+}
diff --git a/model/websocket_message.go b/model/websocket_message.go
index 08c238480..071975d6c 100644
--- a/model/websocket_message.go
+++ b/model/websocket_message.go
@@ -10,44 +10,45 @@ import (
)
const (
- WEBSOCKET_EVENT_TYPING = "typing"
- WEBSOCKET_EVENT_POSTED = "posted"
- WEBSOCKET_EVENT_POST_EDITED = "post_edited"
- WEBSOCKET_EVENT_POST_DELETED = "post_deleted"
- WEBSOCKET_EVENT_CHANNEL_DELETED = "channel_deleted"
- WEBSOCKET_EVENT_CHANNEL_CREATED = "channel_created"
- WEBSOCKET_EVENT_CHANNEL_UPDATED = "channel_updated"
- WEBSOCKET_EVENT_CHANNEL_MEMBER_UPDATED = "channel_member_updated"
- WEBSOCKET_EVENT_DIRECT_ADDED = "direct_added"
- WEBSOCKET_EVENT_GROUP_ADDED = "group_added"
- WEBSOCKET_EVENT_NEW_USER = "new_user"
- WEBSOCKET_EVENT_ADDED_TO_TEAM = "added_to_team"
- WEBSOCKET_EVENT_LEAVE_TEAM = "leave_team"
- WEBSOCKET_EVENT_UPDATE_TEAM = "update_team"
- WEBSOCKET_EVENT_DELETE_TEAM = "delete_team"
- WEBSOCKET_EVENT_USER_ADDED = "user_added"
- WEBSOCKET_EVENT_USER_UPDATED = "user_updated"
- WEBSOCKET_EVENT_USER_ROLE_UPDATED = "user_role_updated"
- WEBSOCKET_EVENT_MEMBERROLE_UPDATED = "memberrole_updated"
- WEBSOCKET_EVENT_USER_REMOVED = "user_removed"
- WEBSOCKET_EVENT_PREFERENCE_CHANGED = "preference_changed"
- WEBSOCKET_EVENT_PREFERENCES_CHANGED = "preferences_changed"
- WEBSOCKET_EVENT_PREFERENCES_DELETED = "preferences_deleted"
- WEBSOCKET_EVENT_EPHEMERAL_MESSAGE = "ephemeral_message"
- WEBSOCKET_EVENT_STATUS_CHANGE = "status_change"
- WEBSOCKET_EVENT_HELLO = "hello"
- WEBSOCKET_EVENT_WEBRTC = "webrtc"
- WEBSOCKET_AUTHENTICATION_CHALLENGE = "authentication_challenge"
- WEBSOCKET_EVENT_REACTION_ADDED = "reaction_added"
- WEBSOCKET_EVENT_REACTION_REMOVED = "reaction_removed"
- WEBSOCKET_EVENT_RESPONSE = "response"
- WEBSOCKET_EVENT_EMOJI_ADDED = "emoji_added"
- WEBSOCKET_EVENT_CHANNEL_VIEWED = "channel_viewed"
- WEBSOCKET_EVENT_PLUGIN_ACTIVATED = "plugin_activated" // EXPERIMENTAL - SUBJECT TO CHANGE
- WEBSOCKET_EVENT_PLUGIN_DEACTIVATED = "plugin_deactivated" // EXPERIMENTAL - SUBJECT TO CHANGE
- WEBSOCKET_EVENT_ROLE_UPDATED = "role_updated"
- WEBSOCKET_EVENT_LICENSE_CHANGED = "license_changed"
- WEBSOCKET_EVENT_CONFIG_CHANGED = "config_changed"
+ WEBSOCKET_EVENT_TYPING = "typing"
+ WEBSOCKET_EVENT_POSTED = "posted"
+ WEBSOCKET_EVENT_POST_EDITED = "post_edited"
+ WEBSOCKET_EVENT_POST_DELETED = "post_deleted"
+ WEBSOCKET_EVENT_CHANNEL_DELETED = "channel_deleted"
+ WEBSOCKET_EVENT_CHANNEL_CREATED = "channel_created"
+ WEBSOCKET_EVENT_CHANNEL_UPDATED = "channel_updated"
+ WEBSOCKET_EVENT_CHANNEL_MEMBER_UPDATED = "channel_member_updated"
+ WEBSOCKET_EVENT_DIRECT_ADDED = "direct_added"
+ WEBSOCKET_EVENT_GROUP_ADDED = "group_added"
+ WEBSOCKET_EVENT_NEW_USER = "new_user"
+ WEBSOCKET_EVENT_ADDED_TO_TEAM = "added_to_team"
+ WEBSOCKET_EVENT_LEAVE_TEAM = "leave_team"
+ WEBSOCKET_EVENT_UPDATE_TEAM = "update_team"
+ WEBSOCKET_EVENT_DELETE_TEAM = "delete_team"
+ WEBSOCKET_EVENT_USER_ADDED = "user_added"
+ WEBSOCKET_EVENT_USER_UPDATED = "user_updated"
+ WEBSOCKET_EVENT_USER_ROLE_UPDATED = "user_role_updated"
+ WEBSOCKET_EVENT_MEMBERROLE_UPDATED = "memberrole_updated"
+ WEBSOCKET_EVENT_USER_REMOVED = "user_removed"
+ WEBSOCKET_EVENT_PREFERENCE_CHANGED = "preference_changed"
+ WEBSOCKET_EVENT_PREFERENCES_CHANGED = "preferences_changed"
+ WEBSOCKET_EVENT_PREFERENCES_DELETED = "preferences_deleted"
+ WEBSOCKET_EVENT_EPHEMERAL_MESSAGE = "ephemeral_message"
+ WEBSOCKET_EVENT_STATUS_CHANGE = "status_change"
+ WEBSOCKET_EVENT_HELLO = "hello"
+ WEBSOCKET_EVENT_WEBRTC = "webrtc"
+ WEBSOCKET_AUTHENTICATION_CHALLENGE = "authentication_challenge"
+ WEBSOCKET_EVENT_REACTION_ADDED = "reaction_added"
+ WEBSOCKET_EVENT_REACTION_REMOVED = "reaction_removed"
+ WEBSOCKET_EVENT_RESPONSE = "response"
+ WEBSOCKET_EVENT_EMOJI_ADDED = "emoji_added"
+ WEBSOCKET_EVENT_CHANNEL_VIEWED = "channel_viewed"
+ WEBSOCKET_EVENT_PLUGIN_ACTIVATED = "plugin_activated" // EXPERIMENTAL - SUBJECT TO CHANGE
+ WEBSOCKET_EVENT_PLUGIN_DEACTIVATED = "plugin_deactivated" // EXPERIMENTAL - SUBJECT TO CHANGE
+ WEBSOCKET_EVENT_PLUGIN_STATUSES_CHANGED = "plugin_statuses_changed" // EXPERIMENTAL - SUBJECT TO CHANGE
+ WEBSOCKET_EVENT_ROLE_UPDATED = "role_updated"
+ WEBSOCKET_EVENT_LICENSE_CHANGED = "license_changed"
+ WEBSOCKET_EVENT_CONFIG_CHANGED = "config_changed"
)
type WebSocketMessage interface {
diff --git a/plugin/pluginenv/environment.go b/plugin/pluginenv/environment.go
index 947eda86d..f704aa5bb 100644
--- a/plugin/pluginenv/environment.go
+++ b/plugin/pluginenv/environment.go
@@ -108,7 +108,7 @@ func (env *Environment) IsPluginActive(pluginId string) bool {
}
// Activates the plugin with the given id.
-func (env *Environment) ActivatePlugin(id string) error {
+func (env *Environment) ActivatePlugin(id string, onError func(error)) error {
env.mutex.Lock()
defer env.mutex.Unlock()
@@ -117,7 +117,7 @@ func (env *Environment) ActivatePlugin(id string) error {
}
if _, ok := env.activePlugins[id]; ok {
- return nil
+ return fmt.Errorf("plugin already active: %v", id)
}
plugins, err := ScanSearchPath(env.searchPath)
if err != nil {
@@ -156,6 +156,14 @@ func (env *Environment) ActivatePlugin(id string) error {
if err := supervisor.Start(api); err != nil {
return errors.Wrapf(err, "unable to start plugin: %v", id)
}
+ if onError != nil {
+ go func() {
+ err := supervisor.Wait()
+ if err != nil {
+ onError(err)
+ }
+ }()
+ }
activePlugin.Supervisor = supervisor
}
diff --git a/plugin/pluginenv/environment_test.go b/plugin/pluginenv/environment_test.go
index 91d639f69..8c1397799 100644
--- a/plugin/pluginenv/environment_test.go
+++ b/plugin/pluginenv/environment_test.go
@@ -56,6 +56,10 @@ func (m *MockSupervisor) Hooks() plugin.Hooks {
return m.Called().Get(0).(plugin.Hooks)
}
+func (m *MockSupervisor) Wait() error {
+ return m.Called().Get(0).(error)
+}
+
func initTmpDir(t *testing.T, files map[string]string) string {
success := false
dir, err := ioutil.TempDir("", "mm-plugin-test")
@@ -130,7 +134,7 @@ func TestEnvironment(t *testing.T) {
activePlugins := env.ActivePlugins()
assert.Len(t, activePlugins, 0)
- assert.Error(t, env.ActivatePlugin("x"))
+ assert.Error(t, env.ActivatePlugin("x", nil))
var api struct{ plugin.API }
var supervisor MockSupervisor
@@ -145,11 +149,11 @@ func TestEnvironment(t *testing.T) {
supervisor.On("Stop").Return(nil)
supervisor.On("Hooks").Return(&hooks)
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
activePlugins = env.ActivePlugins()
assert.Len(t, activePlugins, 1)
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.True(t, env.IsPluginActive("foo"))
hooks.On("OnDeactivate").Return(nil)
@@ -157,7 +161,7 @@ func TestEnvironment(t *testing.T) {
assert.Error(t, env.DeactivatePlugin("foo"))
assert.False(t, env.IsPluginActive("foo"))
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
assert.Equal(t, env.SearchPath(), dir)
@@ -184,7 +188,7 @@ func TestEnvironment_DuplicatePluginError(t *testing.T) {
require.NoError(t, err)
defer env.Shutdown()
- assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
}
@@ -200,7 +204,7 @@ func TestEnvironment_BadSearchPathError(t *testing.T) {
require.NoError(t, err)
defer env.Shutdown()
- assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
}
@@ -244,7 +248,7 @@ func TestEnvironment_ActivatePluginErrors(t *testing.T) {
hooks.Mock = mock.Mock{}
provider.Mock = mock.Mock{}
setup()
- assert.Error(t, env.ActivatePlugin("foo"))
+ assert.Error(t, env.ActivatePlugin("foo", nil))
assert.Empty(t, env.ActivePluginIds())
supervisor.AssertExpectations(t)
hooks.AssertExpectations(t)
@@ -285,7 +289,7 @@ func TestEnvironment_ShutdownError(t *testing.T) {
hooks.On("OnDeactivate").Return(fmt.Errorf("test error"))
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
assert.Len(t, env.Shutdown(), 2)
}
@@ -332,7 +336,7 @@ func TestEnvironment_ConcurrentHookInvocations(t *testing.T) {
}
})
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
rec := httptest.NewRecorder()
@@ -391,7 +395,7 @@ func TestEnvironment_HooksForPlugins(t *testing.T) {
Text: "bar",
}, nil)
- assert.NoError(t, env.ActivatePlugin("foo"))
+ assert.NoError(t, env.ActivatePlugin("foo", nil))
assert.Equal(t, env.ActivePluginIds(), []string{"foo"})
resp, appErr, err := env.HooksForPlugin("foo").ExecuteCommand(&model.CommandArgs{
diff --git a/plugin/rpcplugin/rpcplugintest/supervisor.go b/plugin/rpcplugin/rpcplugintest/supervisor.go
index 2ae065621..d225f96fc 100644
--- a/plugin/rpcplugin/rpcplugintest/supervisor.go
+++ b/plugin/rpcplugin/rpcplugintest/supervisor.go
@@ -174,6 +174,14 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) {
bundle := model.BundleInfoForPath(dir)
supervisor, err := sp(bundle)
require.NoError(t, err)
+
+ var supervisorWaitErr error
+ supervisorWaitDone := make(chan bool, 1)
+ go func() {
+ supervisorWaitErr = supervisor.Wait()
+ close(supervisorWaitDone)
+ }()
+
require.NoError(t, supervisor.Start(&api))
failed := false
@@ -189,7 +197,21 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) {
time.Sleep(time.Millisecond * 100)
}
assert.True(t, recovered)
+
+ select {
+ case <-supervisorWaitDone:
+ require.Fail(t, "supervisor.Wait() unexpectedly returned")
+ case <-time.After(500 * time.Millisecond):
+ }
+
require.NoError(t, supervisor.Stop())
+
+ select {
+ case <-supervisorWaitDone:
+ require.Nil(t, supervisorWaitErr)
+ case <-time.After(5000 * time.Millisecond):
+ require.Fail(t, "supervisor.Wait() failed to return")
+ }
}
// Crashed plugins should be relaunched at most three times.
@@ -239,6 +261,14 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun
bundle := model.BundleInfoForPath(dir)
supervisor, err := sp(bundle)
require.NoError(t, err)
+
+ var supervisorWaitErr error
+ supervisorWaitDone := make(chan bool, 1)
+ go func() {
+ supervisorWaitErr = supervisor.Wait()
+ close(supervisorWaitDone)
+ }()
+
require.NoError(t, supervisor.Start(&api))
for attempt := 1; attempt <= 4; attempt++ {
@@ -264,10 +294,19 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun
}
if attempt < 4 {
+ require.Nil(t, supervisorWaitErr)
require.True(t, recovered, "failed to recover after attempt %d", attempt)
} else {
require.False(t, recovered, "unexpectedly recovered after attempt %d", attempt)
}
}
+
+ select {
+ case <-supervisorWaitDone:
+ require.NotNil(t, supervisorWaitErr)
+ case <-time.After(500 * time.Millisecond):
+ require.Fail(t, "supervisor.Wait() failed to return after plugin crashed")
+ }
+
require.NoError(t, supervisor.Stop())
}
diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go
index 6e26d5682..246747c89 100644
--- a/plugin/rpcplugin/supervisor.go
+++ b/plugin/rpcplugin/supervisor.go
@@ -32,6 +32,7 @@ type Supervisor struct {
cancel context.CancelFunc
newProcess func(context.Context) (Process, io.ReadWriteCloser, error)
pluginId string
+ pluginErr error
}
var _ plugin.Supervisor = (*Supervisor)(nil)
@@ -55,6 +56,13 @@ func (s *Supervisor) Start(api plugin.API) error {
}
}
+// Waits for the supervisor to stop (on demand or of its own accord), returning any error that
+// triggered the supervisor to stop.
+func (s *Supervisor) Wait() error {
+ <-s.done
+ return s.pluginErr
+}
+
// Stops the plugin.
func (s *Supervisor) Stop() error {
s.cancel()
@@ -70,7 +78,7 @@ func (s *Supervisor) Hooks() plugin.Hooks {
func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API) {
defer func() {
- s.done <- true
+ close(s.done)
}()
done := ctx.Done()
for i := 0; i <= MaxProcessRestarts; i++ {
@@ -81,10 +89,11 @@ func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API
default:
start = nil
if i < MaxProcessRestarts {
- mlog.Debug("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId))
+ mlog.Error("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId))
time.Sleep(time.Duration((1 + i*i)) * time.Second)
} else {
- mlog.Debug("Plugin terminated unexpectedly too many times", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts))
+ s.pluginErr = fmt.Errorf("plugin terminated unexpectedly too many times")
+ mlog.Error("Plugin shutdown", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts), mlog.Err(s.pluginErr))
}
}
}
diff --git a/plugin/supervisor.go b/plugin/supervisor.go
index 6cb7445f7..f20df7040 100644
--- a/plugin/supervisor.go
+++ b/plugin/supervisor.go
@@ -7,6 +7,7 @@ package plugin
// type is only relevant to the server, and isn't used by the plugins themselves.
type Supervisor interface {
Start(API) error
+ Wait() error
Stop() error
Hooks() Hooks
}