From b5615eeb855d756be8aa3f1e995d9f0f3a6b410e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 20 May 2013 12:54:39 -0400 Subject: added multiprocessing server core --- doc/development/core.txt | 5 + doc/server/configuration.txt | 37 +++-- src/lib/Bcfg2/Options.py | 13 +- src/lib/Bcfg2/Server/MultiprocessingCore.py | 203 ++++++++++++++++++++++++++++ src/lib/Bcfg2/Server/Plugins/Metadata.py | 6 + src/sbin/bcfg2-server | 31 +++-- 6 files changed, 272 insertions(+), 23 deletions(-) create mode 100644 src/lib/Bcfg2/Server/MultiprocessingCore.py diff --git a/doc/development/core.txt b/doc/development/core.txt index 3607533ea..886a5538b 100644 --- a/doc/development/core.txt +++ b/doc/development/core.txt @@ -71,6 +71,11 @@ XML-RPC Server .. automodule:: Bcfg2.SSLServer +Multiprocessing Core +-------------------- + +.. automodule:: Bcfg2.Server.MultiprocessingCore + CherryPy Core ------------- diff --git a/doc/server/configuration.txt b/doc/server/configuration.txt index 2c5879ff0..7892c2612 100644 --- a/doc/server/configuration.txt +++ b/doc/server/configuration.txt @@ -20,6 +20,9 @@ Bcfg2, although it has become easier in 1.3.0. The steps to do so are described in three sections below: Common steps for all versions; steps for older versions only; and steps for 1.3.0. +Many of the steps below may have already been performed by your OS +packages. + Common Steps ------------ @@ -131,7 +134,7 @@ is even invoked. Restart ``bcfg2-server`` and you should see it running as non-root in ``ps`` output:: - % ps -ef | grep '[b]cfg2-server' + % ps -ef | grep '[b]cfg2-server' 1000 11581 1 0 07:55 ? 00:00:15 python usr/sbin/bcfg2-server -C /etc/bcfg2.conf -D /var/run/bcfg2-server/bcfg2-server.pid Steps on Bcfg2 1.3.0 @@ -159,7 +162,7 @@ natively in 1.3. Simply add the following lines to ``bcfg2.conf``:: Restart ``bcfg2-server`` and you should see it running as non-root in ``ps`` output:: - % ps -ef | grep '[b]cfg2-server' + % ps -ef | grep '[b]cfg2-server' 1000 11581 1 0 07:55 ? 00:00:15 python usr/sbin/bcfg2-server -C /etc/bcfg2.conf -D /var/run/bcfg2-server/bcfg2-server.pid .. _server-backends: @@ -169,10 +172,11 @@ Server Backends .. versionadded:: 1.3.0 -Bcfg2 supports two different server backends: a builtin server -based on the Python SimpleXMLRPCServer object, and a server that uses -CherryPy (http://www.cherrypy.org). Each one has advantages and -disadvantages. +Bcfg2 supports three different server backends: a builtin server based +on the Python SimpleXMLRPCServer object; a server that uses CherryPy +(http://www.cherrypy.org); and a version of the builtin server that +uses the Python :mod:`multiprocessing` module. Each one has +advantages and disadvantages. The builtin server: @@ -181,27 +185,36 @@ The builtin server: * Works on Python 2.4; * Is slow with larger numbers of clients. +The multiprocessing server: + +* Leverages most of the stability and maturity of the builtin server, + but does have some new bits; +* Introduces concurrent processing to Bcfg2, which may break in + various edge cases; +* Supports certificate authentication; +* Requires Python 2.6; +* Is faster with large numbers of concurrent runs. + The CherryPy server: * Is very new and potentially buggy; * Does not support certificate authentication yet, only password authentication; -* Requires CherryPy 3.2, which requires Python 2.5; +* Requires CherryPy 3.3, which requires Python 2.5; * Is smarter about daemonization, particularly if you are :ref:`server-dropping-privs`; * Is faster with large numbers of clients. Basically, the builtin server should be used unless you have a -particular need for performance, and can sacrifice certificate -authentication. +particular need for performance. The CherryPy server is purely +experimental at this point. To select which backend to use, set the ``backend`` option in the ``[server]`` section of ``/etc/bcfg2.conf``. Options are: * ``cherrypy`` * ``builtin`` +* ``multiprocessing`` * ``best`` (the default; currently the same as ``builtin``) -If the certificate authentication issues (a limitation in CherryPy -itself) can be resolved and the CherryPy server proves to be stable, -it will likely become the default (and ``best``) in a future release. +``best`` may change in future releases. diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index c7604c5c4..fd7c4421a 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -609,6 +609,16 @@ SERVER_AUTHENTICATION = \ default='cert+password', odesc='{cert|bootstrap|cert+password}', cf=('communication', 'authentication')) +SERVER_CHILDREN = \ + Option('Spawn this number of children for the multiprocessing core. ' + 'By default spawns children equivalent to the number of processors ' + 'in the machine.', + default=None, + cmd='--children', + odesc='', + cf=('server', 'children'), + cook=get_int, + long_arg=True) # database options DB_ENGINE = \ @@ -1182,7 +1192,8 @@ SERVER_COMMON_OPTIONS = dict(repo=SERVER_REPOSITORY, vcs_root=SERVER_VCS_ROOT, authentication=SERVER_AUTHENTICATION, perflog=LOG_PERFORMANCE, - perflog_interval=PERFLOG_INTERVAL) + perflog_interval=PERFLOG_INTERVAL, + children=SERVER_CHILDREN) CRYPT_OPTIONS = dict(encrypt=ENCRYPT, decrypt=DECRYPT, diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py new file mode 100644 index 000000000..2e378341e --- /dev/null +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -0,0 +1,203 @@ +""" The multiprocessing server core is a reimplementation of the +:mod:`Bcfg2.Server.BuiltinCore` that uses the Python +:mod:`multiprocessing` library to offload work to multiple child +processes. As such, it requires Python 2.6+. +""" + +import threading +import lxml.etree +import multiprocessing +from Bcfg2.Compat import Queue +from Bcfg2.Server.Core import BaseCore, exposed +from Bcfg2.Server.BuiltinCore import Core as BuiltinCore + + +class DualEvent(object): + """ DualEvent is a clone of :class:`threading.Event` that + internally implements both :class:`threading.Event` and + :class:`multiprocessing.Event`. """ + + def __init__(self, threading_event=None, multiprocessing_event=None): + self._threading_event = threading_event or threading.Event() + self._multiproc_event = multiprocessing_event or \ + multiprocessing.Event() + if threading_event or multiprocessing_event: + # initialize internal flag to false, regardless of the + # state of either object passed in + self.clear() + + def is_set(self): + """ Return true if and only if the internal flag is true. """ + return self._threading_event.is_set() + + isSet = is_set + + def set(self): + """ Set the internal flag to true. """ + self._threading_event.set() + self._multiproc_event.set() + + def clear(self): + """ Reset the internal flag to false. """ + self._threading_event.clear() + self._multiproc_event.clear() + + def wait(self, timeout=None): + """ Block until the internal flag is true, or until the + optional timeout occurs. """ + return self._threading_event.wait(timeout=timeout) + + +class ChildCore(BaseCore): + """ A child process for :class:`Bcfg2.MultiprocessingCore.Core`. + This core builds configurations from a given + :class:`multiprocessing.Pipe`. Note that this is a full-fledged + server core; the only input it gets from the parent process is the + hostnames of clients to render. All other state comes from the + FAM. However, this core only is used to render configs; it doesn't + handle anything else (authentication, probes, etc.) because those + are all much faster. There's no reason that it couldn't handle + those, though, if the pipe communication "protocol" were made more + robust. """ + + #: How long to wait while polling for new clients to build. This + #: doesn't affect the speed with which a client is built, but + #: setting it too high will result in longer shutdown times, since + #: we only check for the termination event from the main process + #: every ``poll_wait`` seconds. + poll_wait = 5.0 + + def __init__(self, setup, pipe, terminate): + """ + :param setup: A Bcfg2 options dict + :type setup: Bcfg2.Options.OptionParser + :param pipe: The pipe to which client hostnames are added for + ChildCore objects to build configurations, and to + which client configurations are added after + having been built by ChildCore objects. + :type pipe: multiprocessing.Pipe + :param terminate: An event that flags ChildCore objects to shut + themselves down. + :type terminate: multiprocessing.Event + """ + BaseCore.__init__(self, setup) + + #: The pipe to which client hostnames are added for ChildCore + #: objects to build configurations, and to which client + #: configurations are added after having been built by + #: ChildCore objects. + self.pipe = pipe + + #: The :class:`multiprocessing.Event` that will be monitored + #: to determine when this child should shut down. + self.terminate = terminate + + def _daemonize(self): + return True + + def _run(self): + return True + + def _block(self): + while not self.terminate.isSet(): + try: + if self.pipe.poll(self.poll_wait): + if not self.metadata.use_database: + # handle FAM events, in case (for instance) the + # client has just been added to clients.xml, or a + # profile has just been asserted. but really, you + # should be using the metadata database if you're + # using this core. + self.fam.handle_events_in_interval(0.1) + client = self.pipe.recv() + self.logger.debug("Building configuration for %s" % client) + config = \ + lxml.etree.tostring(self.BuildConfiguration(client)) + self.logger.debug("Returning configuration for %s to main " + "process" % client) + self.pipe.send(config) + self.logger.debug("Returned configuration for %s to main " + "process" % client) + except KeyboardInterrupt: + break + self.shutdown() + + +class Core(BuiltinCore): + """ A multiprocessing core that delegates building the actual + client configurations to + :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The + parent process doesn't build any children itself; all calls to + :func:`GetConfig` are delegated to children. All other calls are + handled by the parent process. """ + + #: How long to wait for a child process to shut down cleanly + #: before it is terminated. + shutdown_timeout = 10.0 + + def __init__(self, setup): + BuiltinCore.__init__(self, setup) + if setup['children'] is None: + setup['children'] = multiprocessing.cpu_count() + + #: A dict of child name -> one end of the + #: :class:`multiprocessing.Pipe` object used to communicate + #: with that child. (The child is given the other end of the + #: Pipe.) + self.pipes = dict() + + #: A queue that keeps track of which children are available to + #: render a configuration. A child is popped from the queue + #: when it starts to render a config, then it's pushed back on + #: when it's done. This lets us use a blocking call to + #: :func:`Queue.Queue.get` when waiting for an available + #: child. + self.available_children = Queue(maxsize=self.setup['children']) + + # sigh. multiprocessing was added in py2.6, which is when the + # camelCase methods for threading objects were deprecated in + # favor of the Pythonic under_score methods. So + # multiprocessing.Event *only* has is_set(), while + # threading.Event has *both* isSet() and is_set(). In order + # to make the core work with Python 2.4+, and with both + # multiprocessing and threading Event objects, we just + # monkeypatch self.terminate to have isSet(). + self.terminate = DualEvent(threading_event=self.terminate) + + def _run(self): + for cnum in range(self.setup['children']): + name = "Child-%s" % cnum + (mainpipe, childpipe) = multiprocessing.Pipe() + self.pipes[name] = mainpipe + self.logger.debug("Starting child %s" % name) + childcore = ChildCore(self.setup, childpipe, self.terminate) + child = multiprocessing.Process(target=childcore.run, name=name) + child.start() + self.logger.debug("Child %s started with PID %s" % (name, + child.pid)) + self.available_children.put(name) + return BuiltinCore._run(self) + + def shutdown(self): + BuiltinCore.shutdown(self) + for child in multiprocessing.active_children(): + self.logger.debug("Shutting down child %s" % child.name) + child.join(self.shutdown_timeout) + if child.is_alive(): + self.logger.error("Waited %s seconds to shut down %s, " + "terminating" % (self.shutdown_timeout, + child.name)) + child.terminate() + else: + self.logger.debug("Child %s shut down" % child.name) + + @exposed + def GetConfig(self, address): + client = self.resolve_client(address)[0] + childname = self.available_children.get() + self.logger.debug("Building configuration on child %s" % childname) + pipe = self.pipes[childname] + pipe.send(client) + config = pipe.recv() + self.available_children.put_nowait(childname) + return config diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index ceb1d9080..3b8361c76 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -557,6 +557,12 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, open(os.path.join(repo, cls.name, fname), "w").write(kwargs[aname]) + @property + def use_database(self): + """ Expose self._use_db publicly for use in + :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` """ + return self._use_db + def _handle_file(self, fname): """ set up the necessary magic for handling a metadata file (clients.xml or groups.xml, e.g.) """ diff --git a/src/sbin/bcfg2-server b/src/sbin/bcfg2-server index cdca71e74..21f7842ec 100755 --- a/src/sbin/bcfg2-server +++ b/src/sbin/bcfg2-server @@ -24,18 +24,29 @@ def main(): print("Could not read %s" % setup['configfile']) sys.exit(1) - if setup['backend'] not in ['best', 'cherrypy', 'builtin']: + # TODO: normalize case of various core modules so we can add a new + # core without modifying this script + backends = dict(cherrypy='CherryPyCore', + builtin='BuiltinCore', + best='BuiltinCore', + multiprocessing='MultiprocessingCore') + + if setup['backend'] not in backends: print("Unknown server backend %s, using 'best'" % setup['backend']) setup['backend'] = 'best' - if setup['backend'] == 'cherrypy': - try: - from Bcfg2.Server.CherryPyCore import Core - except ImportError: - err = sys.exc_info()[1] - print("Unable to import CherryPy server core: %s" % err) - raise - elif setup['backend'] == 'builtin' or setup['backend'] == 'best': - from Bcfg2.Server.BuiltinCore import Core + + coremodule = backends[setup['backend']] + try: + Core = getattr(__import__("Bcfg2.Server.%s" % coremodule).Server, + coremodule).Core + except ImportError: + err = sys.exc_info()[1] + print("Unable to import %s server core: %s" % (setup['backend'], err)) + raise + except AttributeError: + err = sys.exc_info()[1] + print("Unable to load %s server core: %s" % (setup['backend'], err)) + raise try: core = Core(setup) -- cgit v1.2.3-1-g7c22