From b5615eeb855d756be8aa3f1e995d9f0f3a6b410e Mon Sep 17 00:00:00 2001
From: "Chris St. Pierre"
Date: Mon, 20 May 2013 12:54:39 -0400
Subject: added multiprocessing server core

---
 src/lib/Bcfg2/Options.py                    |  13 +-
 src/lib/Bcfg2/Server/MultiprocessingCore.py | 207 ++++++++++++++++++++++++++++
 src/lib/Bcfg2/Server/Plugins/Metadata.py    |   6 +
 src/sbin/bcfg2-server                       |  31 +++--
 4 files changed, 246 insertions(+), 11 deletions(-)
 create mode 100644 src/lib/Bcfg2/Server/MultiprocessingCore.py

(limited to 'src')

diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py
index c7604c5c4..fd7c4421a 100644
--- a/src/lib/Bcfg2/Options.py
+++ b/src/lib/Bcfg2/Options.py
@@ -609,6 +609,16 @@ SERVER_AUTHENTICATION = \
            default='cert+password',
            odesc='{cert|bootstrap|cert+password}',
            cf=('communication', 'authentication'))
+SERVER_CHILDREN = \
+    Option('Spawn this number of children for the multiprocessing core. '
+           'By default spawns children equivalent to the number of processors '
+           'in the machine.',
+           default=None,
+           cmd='--children',
+           odesc='',
+           cf=('server', 'children'),
+           cook=get_int,
+           long_arg=True)
 
 # database options
 DB_ENGINE = \
@@ -1182,7 +1192,8 @@ SERVER_COMMON_OPTIONS = dict(repo=SERVER_REPOSITORY,
                              vcs_root=SERVER_VCS_ROOT,
                              authentication=SERVER_AUTHENTICATION,
                              perflog=LOG_PERFORMANCE,
-                             perflog_interval=PERFLOG_INTERVAL)
+                             perflog_interval=PERFLOG_INTERVAL,
+                             children=SERVER_CHILDREN)
 
 CRYPT_OPTIONS = dict(encrypt=ENCRYPT,
                      decrypt=DECRYPT,
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
new file mode 100644
index 000000000..2e378341e
--- /dev/null
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -0,0 +1,207 @@
+""" The multiprocessing server core is a reimplementation of the
+:mod:`Bcfg2.Server.BuiltinCore` that uses the Python
+:mod:`multiprocessing` library to offload work to multiple child
+processes. As such, it requires Python 2.6+.
+"""
+
+import threading
+import lxml.etree
+import multiprocessing
+from Bcfg2.Compat import Queue
+from Bcfg2.Server.Core import BaseCore, exposed
+from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+
+
+class DualEvent(object):
+    """ DualEvent is a clone of :class:`threading.Event` that
+    internally implements both :class:`threading.Event` and
+    :class:`multiprocessing.Event`. """
+
+    def __init__(self, threading_event=None, multiprocessing_event=None):
+        self._threading_event = threading_event or threading.Event()
+        self._multiproc_event = multiprocessing_event or \
+            multiprocessing.Event()
+        if threading_event or multiprocessing_event:
+            # initialize internal flag to false, regardless of the
+            # state of either object passed in
+            self.clear()
+
+    def is_set(self):
+        """ Return true if and only if the internal flag is true. """
+        # a child process only holds a process-local copy of the
+        # threading event, so a set() made in the parent reaches it
+        # solely through the shared multiprocessing event; check both
+        return self._threading_event.is_set() or \
+            self._multiproc_event.is_set()
+
+    isSet = is_set
+
+    def set(self):
+        """ Set the internal flag to true. """
+        self._threading_event.set()
+        self._multiproc_event.set()
+
+    def clear(self):
+        """ Reset the internal flag to false. """
+        self._threading_event.clear()
+        self._multiproc_event.clear()
+
+    def wait(self, timeout=None):
+        """ Block until the internal flag is true, or until the
+        optional timeout occurs. """
+        return self._threading_event.wait(timeout=timeout)
+
+
+class ChildCore(BaseCore):
+    """ A child process for :class:`Bcfg2.Server.MultiprocessingCore.Core`.
+    This core builds configurations from a given
+    :class:`multiprocessing.Pipe`. Note that this is a full-fledged
+    server core; the only input it gets from the parent process is the
+    hostnames of clients to render. All other state comes from the
+    FAM. However, this core only is used to render configs; it doesn't
+    handle anything else (authentication, probes, etc.) because those
+    are all much faster. There's no reason that it couldn't handle
+    those, though, if the pipe communication "protocol" were made more
+    robust. """
+
+    #: How long to wait while polling for new clients to build. This
+    #: doesn't affect the speed with which a client is built, but
+    #: setting it too high will result in longer shutdown times, since
+    #: we only check for the termination event from the main process
+    #: every ``poll_wait`` seconds.
+    poll_wait = 5.0
+
+    def __init__(self, setup, pipe, terminate):
+        """
+        :param setup: A Bcfg2 options dict
+        :type setup: Bcfg2.Options.OptionParser
+        :param pipe: The pipe to which client hostnames are added for
+                     ChildCore objects to build configurations, and to
+                     which client configurations are added after
+                     having been built by ChildCore objects.
+        :type pipe: multiprocessing.Pipe
+        :param terminate: An event that flags ChildCore objects to shut
+                          themselves down.
+        :type terminate: Bcfg2.Server.MultiprocessingCore.DualEvent
+        """
+        BaseCore.__init__(self, setup)
+
+        #: The pipe to which client hostnames are added for ChildCore
+        #: objects to build configurations, and to which client
+        #: configurations are added after having been built by
+        #: ChildCore objects.
+        self.pipe = pipe
+
+        #: The :class:`DualEvent` that will be monitored
+        #: to determine when this child should shut down.
+        self.terminate = terminate
+
+    def _daemonize(self):
+        return True
+
+    def _run(self):
+        return True
+
+    def _block(self):
+        while not self.terminate.isSet():
+            try:
+                if self.pipe.poll(self.poll_wait):
+                    if not self.metadata.use_database:
+                        # handle FAM events, in case (for instance) the
+                        # client has just been added to clients.xml, or a
+                        # profile has just been asserted. but really, you
+                        # should be using the metadata database if you're
+                        # using this core.
+                        self.fam.handle_events_in_interval(0.1)
+                    client = self.pipe.recv()
+                    self.logger.debug("Building configuration for %s" % client)
+                    config = \
+                        lxml.etree.tostring(self.BuildConfiguration(client))
+                    self.logger.debug("Returning configuration for %s to main "
+                                      "process" % client)
+                    self.pipe.send(config)
+                    self.logger.debug("Returned configuration for %s to main "
+                                      "process" % client)
+            except KeyboardInterrupt:
+                break
+        self.shutdown()
+
+
+class Core(BuiltinCore):
+    """ A multiprocessing core that delegates building the actual
+    client configurations to
+    :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The
+    parent process doesn't build any children itself; all calls to
+    :func:`GetConfig` are delegated to children. All other calls are
+    handled by the parent process. """
+
+    #: How long to wait for a child process to shut down cleanly
+    #: before it is terminated.
+    shutdown_timeout = 10.0
+
+    def __init__(self, setup):
+        BuiltinCore.__init__(self, setup)
+        if setup['children'] is None:
+            setup['children'] = multiprocessing.cpu_count()
+
+        #: A dict of child name -> one end of the
+        #: :class:`multiprocessing.Pipe` object used to communicate
+        #: with that child. (The child is given the other end of the
+        #: Pipe.)
+        self.pipes = dict()
+
+        #: A queue that keeps track of which children are available to
+        #: render a configuration. A child is popped from the queue
+        #: when it starts to render a config, then it's pushed back on
+        #: when it's done. This lets us use a blocking call to
+        #: :func:`Queue.Queue.get` when waiting for an available
+        #: child.
+        self.available_children = Queue(maxsize=self.setup['children'])
+
+        # sigh. multiprocessing was added in py2.6, which is when the
+        # camelCase methods for threading objects were deprecated in
+        # favor of the Pythonic under_score methods. So
+        # multiprocessing.Event *only* has is_set(), while
+        # threading.Event has *both* isSet() and is_set(). In order
+        # to make the core work with Python 2.4+, and with both
+        # multiprocessing and threading Event objects, we wrap
+        # self.terminate in a DualEvent, which provides both methods.
+        self.terminate = DualEvent(threading_event=self.terminate)
+
+    def _run(self):
+        for cnum in range(self.setup['children']):
+            name = "Child-%s" % cnum
+            (mainpipe, childpipe) = multiprocessing.Pipe()
+            self.pipes[name] = mainpipe
+            self.logger.debug("Starting child %s" % name)
+            childcore = ChildCore(self.setup, childpipe, self.terminate)
+            child = multiprocessing.Process(target=childcore.run, name=name)
+            child.start()
+            self.logger.debug("Child %s started with PID %s" % (name,
+                                                                child.pid))
+            self.available_children.put(name)
+        return BuiltinCore._run(self)
+
+    def shutdown(self):
+        BuiltinCore.shutdown(self)
+        for child in multiprocessing.active_children():
+            self.logger.debug("Shutting down child %s" % child.name)
+            child.join(self.shutdown_timeout)
+            if child.is_alive():
+                self.logger.error("Waited %s seconds to shut down %s, "
+                                  "terminating" % (self.shutdown_timeout,
+                                                   child.name))
+                child.terminate()
+            else:
+                self.logger.debug("Child %s shut down" % child.name)
+
+    @exposed
+    def GetConfig(self, address):
+        client = self.resolve_client(address)[0]
+        childname = self.available_children.get()
+        self.logger.debug("Building configuration on child %s" % childname)
+        pipe = self.pipes[childname]
+        pipe.send(client)
+        config = pipe.recv()
+        self.available_children.put_nowait(childname)
+        return config
diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py
index ceb1d9080..3b8361c76 100644
--- a/src/lib/Bcfg2/Server/Plugins/Metadata.py
+++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py
@@ -557,6 +557,12 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
                 open(os.path.join(repo, cls.name, fname),
                      "w").write(kwargs[aname])
 
+    @property
+    def use_database(self):
+        """ Expose self._use_db publicly for use in
+        :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` """
+        return self._use_db
+
     def _handle_file(self, fname):
         """ set up the necessary magic for handling a metadata file
         (clients.xml or groups.xml, e.g.) """
diff --git a/src/sbin/bcfg2-server b/src/sbin/bcfg2-server
index cdca71e74..21f7842ec 100755
--- a/src/sbin/bcfg2-server
+++ b/src/sbin/bcfg2-server
@@ -24,18 +24,29 @@ def main():
         print("Could not read %s" % setup['configfile'])
         sys.exit(1)
 
-    if setup['backend'] not in ['best', 'cherrypy', 'builtin']:
+    # TODO: normalize case of various core modules so we can add a new
+    # core without modifying this script
+    backends = dict(cherrypy='CherryPyCore',
+                    builtin='BuiltinCore',
+                    best='BuiltinCore',
+                    multiprocessing='MultiprocessingCore')
+
+    if setup['backend'] not in backends:
         print("Unknown server backend %s, using 'best'" % setup['backend'])
         setup['backend'] = 'best'
-    if setup['backend'] == 'cherrypy':
-        try:
-            from Bcfg2.Server.CherryPyCore import Core
-        except ImportError:
-            err = sys.exc_info()[1]
-            print("Unable to import CherryPy server core: %s" % err)
-            raise
-    elif setup['backend'] == 'builtin' or setup['backend'] == 'best':
-        from Bcfg2.Server.BuiltinCore import Core
+
+    coremodule = backends[setup['backend']]
+    try:
+        Core = getattr(__import__("Bcfg2.Server.%s" % coremodule).Server,
+                       coremodule).Core
+    except ImportError:
+        err = sys.exc_info()[1]
+        print("Unable to import %s server core: %s" % (setup['backend'], err))
+        raise
+    except AttributeError:
+        err = sys.exc_info()[1]
+        print("Unable to load %s server core: %s" % (setup['backend'], err))
+        raise
 
     try:
         core = Core(setup)
-- 
cgit v1.2.3-1-g7c22