summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-11-27 11:49:47 -0500
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-11-27 12:12:34 -0500
commitc19e7da20ca6b67956338f9808a80673e06b1e94 (patch)
tree5d62c7c7aaf71e062f8bf22cbce987b8555ad78b
parentfbecb8553136649eaf563d4f7ec21553500e5f16 (diff)
downloadbcfg2-c19e7da20ca6b67956338f9808a80673e06b1e94.tar.gz
bcfg2-c19e7da20ca6b67956338f9808a80673e06b1e94.tar.bz2
bcfg2-c19e7da20ca6b67956338f9808a80673e06b1e94.zip
Threaded plugin fixes:
* Added "Threaded" plugin interface for any plugin that uses threads * Start plugin threads after daemonization * Update existing plugins that use threads (Reporting, Snapshots, ThreadedStatistics interface) * Update unit tests
-rw-r--r--src/lib/Bcfg2/Server/Core.py10
-rw-r--r--src/lib/Bcfg2/Server/Plugin/interfaces.py19
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Reporting.py12
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Snapshots.py2
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py23
5 files changed, 55 insertions, 11 deletions
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index 9a9061a51..040036fb2 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -229,20 +229,23 @@ class BaseCore(object):
"Unloading %s" % (plugin, blacklist))
for plug in blacklist:
del self.plugins[plug]
- # This section logs the experimental plugins
+
+ # Log experimental plugins
expl = [plug for plug in list(self.plugins.values())
if plug.experimental]
if expl:
self.logger.info("Loading experimental plugin(s): %s" %
(" ".join([x.name for x in expl])))
self.logger.info("NOTE: Interfaces subject to change")
- # This section logs the deprecated plugins
+
+ # Log deprecated plugins
depr = [plug for plug in list(self.plugins.values())
if plug.deprecated]
if depr:
self.logger.info("Loading deprecated plugin(s): %s" %
(" ".join([x.name for x in depr])))
+ # Find the metadata plugin and set self.metadata
mlist = self.plugins_by_type(Bcfg2.Server.Plugin.Metadata)
if len(mlist) >= 1:
#: The Metadata plugin
@@ -698,6 +701,9 @@ class BaseCore(object):
self.fam.start()
self.fam_thread.start()
self.fam.AddMonitor(self.cfile, self)
+
+ for plug in self.plugins_by_type(Bcfg2.Server.Plugin.Threaded):
+ plug.start_threads()
except:
self.shutdown()
raise
diff --git a/src/lib/Bcfg2/Server/Plugin/interfaces.py b/src/lib/Bcfg2/Server/Plugin/interfaces.py
index 202ec7533..f42ada773 100644
--- a/src/lib/Bcfg2/Server/Plugin/interfaces.py
+++ b/src/lib/Bcfg2/Server/Plugin/interfaces.py
@@ -299,12 +299,27 @@ class Statistics(Plugin):
raise NotImplementedError
-class ThreadedStatistics(Statistics, threading.Thread):
+class Threaded(object):
+ """ Threaded plugins use threads in any way. The thread must be
+ started after daemonization, so this class implements a single
+ method, :func:`start_threads`, that can be used to start threads
+ after daemonization of the server core. """
+
+ def start_threads(self):
+ """ Start this plugin's threads after daemonization.
+
+ :return: None
+ :raises: :class:`Bcfg2.Server.Plugin.exceptions.PluginInitError`
+ """
+ raise NotImplementedError
+
+class ThreadedStatistics(Statistics, Threaded, threading.Thread):
""" ThreadedStatistics plugins process client statistics in a
separate thread. """
def __init__(self, core, datastore):
Statistics.__init__(self, core, datastore)
+ Threaded.__init__(self)
threading.Thread.__init__(self)
# Event from the core signaling an exit
self.terminate = core.terminate
@@ -312,6 +327,8 @@ class ThreadedStatistics(Statistics, threading.Thread):
self.pending_file = os.path.join(datastore, "etc",
"%s.pending" % self.name)
self.daemon = False
+
+ def start_threads(self):
self.start()
def _save(self):
diff --git a/src/lib/Bcfg2/Server/Plugins/Reporting.py b/src/lib/Bcfg2/Server/Plugins/Reporting.py
index 1a8c3d941..b9d397590 100644
--- a/src/lib/Bcfg2/Server/Plugins/Reporting.py
+++ b/src/lib/Bcfg2/Server/Plugins/Reporting.py
@@ -7,8 +7,8 @@ import lxml.etree
from Bcfg2.Reporting.Transport import load_transport_from_config, \
TransportError
from Bcfg2.Options import REPORTING_COMMON_OPTIONS
-from Bcfg2.Server.Plugin import Statistics, PullSource, PluginInitError, \
- PluginExecutionError
+from Bcfg2.Server.Plugin import Statistics, PullSource, Threaded, \
+ PluginInitError, PluginExecutionError
# required for reporting
try:
@@ -31,7 +31,7 @@ def _rpc_call(method):
return _real_rpc_call
-class Reporting(Statistics, PullSource): # pylint: disable=W0223
+class Reporting(Statistics, Threaded, PullSource): # pylint: disable=W0223
""" Unified statistics and reporting plugin """
__rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry']
@@ -41,6 +41,7 @@ class Reporting(Statistics, PullSource): # pylint: disable=W0223
def __init__(self, core, datastore):
Statistics.__init__(self, core, datastore)
PullSource.__init__(self)
+ Threaded.__init__(self)
self.core = core
self.whoami = platform.node()
@@ -54,8 +55,11 @@ class Reporting(Statistics, PullSource): # pylint: disable=W0223
self.logger.error(msg)
raise PluginInitError(msg)
+ self.transport = None
+
+ def start_threads(self):
try:
- self.transport = load_transport_from_config(core.setup)
+ self.transport = load_transport_from_config(self.core.setup)
except TransportError:
msg = "%s: Failed to load transport: %s" % \
(self.name, traceback.format_exc().splitlines()[-1])
diff --git a/src/lib/Bcfg2/Server/Plugins/Snapshots.py b/src/lib/Bcfg2/Server/Plugins/Snapshots.py
index 1956af4ad..cc5946bb2 100644
--- a/src/lib/Bcfg2/Server/Plugins/Snapshots.py
+++ b/src/lib/Bcfg2/Server/Plugins/Snapshots.py
@@ -65,6 +65,8 @@ class Snapshots(Bcfg2.Server.Plugin.Statistics):
self.session = Bcfg2.Server.Snapshots.setup_session(core.cfile)
self.work_queue = Queue()
self.loader = threading.Thread(target=self.load_snapshot)
+
+ def start_threads(self):
self.loader.start()
def load_snapshot(self):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py
index 1a7a0a6f7..343f088b3 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py
@@ -108,15 +108,30 @@ class TestStatistics(TestPlugin):
s.process_statistics, None, None)
-class TestThreadedStatistics(TestStatistics):
+class TestThreaded(Bcfg2TestCase):
+ test_obj = Threaded
+
+ def get_obj(self):
+ return self.test_obj()
+
+ def test_start_threads(self):
+ s = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ s.start_threads)
+
+
+class TestThreadedStatistics(TestStatistics, TestThreaded):
test_obj = ThreadedStatistics
data = [("foo.example.com", "<foo/>"),
("bar.example.com", "<bar/>")]
+ def get_obj(self, core=None):
+ return TestStatistics.get_obj(self, core=core)
+
@patch("threading.Thread.start")
- def test__init(self, mock_start):
- core = Mock()
- ts = self.get_obj(core)
+ def test_start_threads(self, mock_start):
+ ts = self.get_obj()
+ ts.start_threads()
mock_start.assert_any_call()
@patch("%s.open" % builtins)