diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-08 08:10:16 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-08 08:10:16 -0400 |
commit | e1e194a573b3803fa7f45a646bbb36b2f164a3e1 (patch) | |
tree | e9b689d1be39d38279e0a16f010e8d5e573612ef /src/lib/Bcfg2/Server/MultiprocessingCore.py | |
parent | 35851347089db1a092ec715cb183aec19f19e983 (diff) | |
parent | eef441c1acdf1d3d483647b153f721cbab4a8517 (diff) | |
download | bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.gz bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.bz2 bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.zip |
Merge branch 'maint'
Conflicts:
doc/appendix/files/mysql.txt
doc/getting_started/index.txt
doc/server/plugins/structures/bundler/kernel.txt
src/lib/Bcfg2/Server/MultiprocessingCore.py
src/lib/Bcfg2/Server/Plugin/interfaces.py
src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
src/lib/Bcfg2/Server/Plugins/Probes.py
src/lib/Bcfg2/Server/Plugins/SSHbase.py
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 499 |
1 files changed, 349 insertions, 150 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index b9716619d..e79207291 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -3,66 +3,134 @@ :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. -The parent communicates with the children over two constructs: - -* A :class:`multiprocessing.Pipe` is used to process render requests. - The pipe is locked when in use (i.e., between the time that a client - is submitted to be rendered and the time that its configuration is - returned) to keep things thread-safe. (This is accomplished through - the use of - :attr:`Bcfg2.Server.MultiprocessingCore.available_children.) -* A :class:`multiprocessing.Queue` is used to submit other commands in - a thread-safe, non-blocking fashion. (Note that, since it is a - queue, no results can be returned.) It implements a very simple RPC - protocol. Each command passed to a child over the Pipe must be a - tuple with the format:: - - (<method>, <args>, <kwargs>) - - The method must be exposed by the child by decorating it with - :func:`Bcfg2.Server.Core.exposed`. +The parent communicates with the children over +:class:`multiprocessing.Queue` objects via a +:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object. + +A method being called via the RPCQueue must be exposed by the child by +decorating it with :func:`Bcfg2.Server.Core.exposed`. """ +import time import threading import lxml.etree import multiprocessing -from Bcfg2.Compat import Queue -from Bcfg2.Server.Cache import Cache +import Bcfg2.Server.Plugin +from itertools import cycle +from Bcfg2.Cache import Cache +from Bcfg2.Compat import Queue, Empty, wraps from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from multiprocessing.connection import Listener, Client -class DispatchingCache(Cache, Debuggable): +class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates cache expiration events to child nodes. """ #: The method to send over the pipe to expire the cache - method = "expire_cache" + method = "expire_metadata_cache" def __init__(self, *args, **kwargs): - #: A dict of <child name>: :class:`multiprocessing.Queue` - #: objects that should be given a cache expiration command any - #: time an item is expired. - self.command_queues = kwargs.pop("pipes", dict()) - - Debuggable.__init__(self) + self.rpc_q = kwargs.pop("queue") + Bcfg2.Server.Plugin.Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): - if (key and key in self) or (not key and len(self)): - # dispatching cache expiration to children can be - # expensive, so only do it if there's something to expire - for child, cmd_q in self.command_queues.items(): - if key: - self.logger.debug("Expiring metadata cache for %s on %s" % - (key, child)) - else: - self.logger.debug("Expiring metadata cache on %s" % child) - cmd_q.put((self.method, [key], dict())) + self.rpc_q.publish(self.method, args=[key]) Cache.expire(self, key=key) +class RPCQueue(Bcfg2.Server.Plugin.Debuggable): + """ An implementation of a :class:`multiprocessing.Queue` designed + for several additional use patterns: + + * Random-access reads, based on a key that identifies the data; + * Publish-subscribe, where a datum is sent to all hosts. + + The subscribers can deal with this as a normal Queue with no + special handling. + """ + poll_wait = 3.0 + + def __init__(self): + Bcfg2.Server.Plugin.Debuggable.__init__(self) + self._terminate = threading.Event() + self._queues = dict() + self._available_listeners = Queue() + self._blocking_listeners = [] + + def add_subscriber(self, name): + """ Add a subscriber to the queue. This returns the + :class:`multiprocessing.Queue` object that the subscriber + should read from. """ + self._queues[name] = multiprocessing.Queue() + return self._queues[name] + + def publish(self, method, args=None, kwargs=None): + """ Publish an RPC call to the queue for consumption by all + subscribers. """ + for queue in self._queues.values(): + queue.put((None, (method, args or [], kwargs or dict()))) + + def rpc(self, dest, method, args=None, kwargs=None): + """ Make an RPC call to the named subscriber, expecting a + response. This opens a + :class:`multiprocessing.connection.Listener` and passes the + Listener address to the child as part of the RPC call, so that + the child can connect to the Listener to submit its results. + + Listeners are reused when possible to minimize overhead. + """ + try: + listener = self._available_listeners.get_nowait() + self.logger.debug("Reusing existing RPC listener at %s" % + listener.address) + except Empty: + listener = Listener() + self.logger.debug("Created new RPC listener at %s" % + listener.address) + self._blocking_listeners.append(listener) + try: + self._queues[dest].put((listener.address, + (method, args or [], kwargs or dict()))) + conn = listener.accept() + self._blocking_listeners.remove(listener) + try: + while not self._terminate.is_set(): + if conn.poll(self.poll_wait): + return conn.recv() + finally: + conn.close() + finally: + self._available_listeners.put(listener) + + def close(self): + """ Close queues and connections. """ + self._terminate.set() + self.logger.debug("Closing RPC queues") + for name, queue in self._queues.items(): + self.logger.debug("Closing RPC queue to %s" % name) + queue.close() + + # close any listeners that are waiting for connections + self.logger.debug("Closing RPC connections") + for listener in self._blocking_listeners: + self.logger.debug("Closing RPC connection at %s" % + listener.address) + listener.close() + + self.logger.debug("Closing RPC listeners") + try: + while True: + listener = self._available_listeners.get_nowait() + self.logger.debug("Closing RPC listener at %s" % + listener.address) + listener.close() + except Empty: + pass + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -111,101 +179,154 @@ class ChildCore(BaseCore): those, though, if the pipe communication "protocol" were made more robust. """ - #: How long to wait while polling for new clients to build. This - #: doesn't affect the speed with which a client is built, but + #: How long to wait while polling for new RPC commands. This + #: doesn't affect the speed with which a command is processed, but #: setting it too high will result in longer shutdown times, since #: we only check for the termination event from the main process #: every ``poll_wait`` seconds. - poll_wait = 5.0 + poll_wait = 3.0 - def __init__(self, setup, render_pipe, command_queue, terminate): + def __init__(self, name, setup, rpc_q, terminate): """ + :param name: The name of this child + :type name: string :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser - :param pipe: The pipe to which client hostnames are added for - ChildCore objects to build configurations, and to - which client configurations are added after - having been built by ChildCore objects. - :type pipe: multiprocessing.Pipe + :param read_q: The queue the child will read from for RPC + communications from the parent process. + :type read_q: multiprocessing.Queue + :param write_q: The queue the child will write the results of + RPC calls to. + :type write_q: multiprocessing.Queue :param terminate: An event that flags ChildCore objects to shut themselves down. :type terminate: multiprocessing.Event """ BaseCore.__init__(self, setup) - #: The pipe to which client hostnames are added for ChildCore - #: objects to build configurations, and to which client - #: configurations are added after having been built by - #: ChildCore objects. - self.render_pipe = render_pipe - - #: The queue from which other commands are received - self.command_queue = command_queue + #: The name of this child + self.name = name #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate - #: The :class:`threading.Thread` used to process commands - #: received via the :class:`multiprocessing.Queue` RPC - #: interface - self.command_thread = \ - threading.Thread(name="CommandThread", - target=self._command_queue_thread) + #: The queue used for RPC communication + self.rpc_q = rpc_q - def _daemonize(self): - return True + # override this setting so that the child doesn't try to write + # the pidfile + self.setup['daemon'] = False + + # ensure that the child doesn't start a perflog thread + self.perflog_thread = None + + self._rmi = dict() def _run(self): - try: - self.command_thread.start() - except: - self.shutdown() - raise return True - def render(self): - """ Process client configuration render requests """ - if self.render_pipe.poll(self.poll_wait): - if not self.metadata.use_database: - # handle FAM events, in case (for instance) the - # client has just been added to clients.xml, or a - # profile has just been asserted. but really, you - # should be using the metadata database if you're - # using this core. - self.fam.handle_events_in_interval(0.1) - client = self.render_pipe.recv() - self.logger.debug("Building configuration for %s" % client) - self.render_pipe.send( - lxml.etree.tostring(self.BuildConfiguration(client))) + def _daemonize(self): + return True + + def _dispatch(self, address, data): + """ Method dispatcher used for commands received from + the RPC queue. """ + if address is not None: + # if the key is None, then no response is expected. we + # make the return connection before dispatching the actual + # RPC call so that the parent is blocking for a connection + # as briefly as possible + self.logger.debug("Connecting to parent via %s" % address) + client = Client(address) + method, args, kwargs = data + func = None + rv = None + if "." in method: + if method in self._rmi: + func = self._rmi[method] + else: + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + elif not hasattr(self, method): + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + else: # method is not a plugin RMI, and exists + func = getattr(self, method) + if not func.exposed: + self.logger.error("%s: Method %s is not exposed" % (self.name, + method)) + func = None + if func is not None: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + rv = func(*args, **kwargs) + if address is not None: + # if the key is None, then no response is expected + self.logger.debug("Returning data to parent via %s" % address) + client.send(rv) def _block(self): - while not self.terminate.isSet(): + self._rmi = self._get_rmi() + while not self.terminate.is_set(): try: - self.render() + address, data = self.rpc_q.get(timeout=self.poll_wait) + threadname = "-".join(str(i) for i in data) + rpc_thread = threading.Thread(name=threadname, + target=self._dispatch, + args=[address, data]) + rpc_thread.start() + except Empty: + pass except KeyboardInterrupt: break self.shutdown() - def _command_queue_thread(self): - """ Process commands received on the command queue thread """ - while not self.terminate.isSet(): - method, args, kwargs = self.command_queue.get() - if hasattr(self, method): - func = getattr(self, method) - if func.exposed: - self.logger.debug("Child calling RPC method %s" % method) - func(*args, **kwargs) + def shutdown(self): + BaseCore.shutdown(self) + self.logger.info("%s: Closing RPC command queue" % self.name) + self.rpc_q.close() + + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("%s: Waiting for %d thread(s): %s" % + (self.name, len(threads), + [t.name for t in threads])) + time.sleep(1) + self.logger.info("%s: All threads stopped" % self.name) + + def _get_rmi(self): + rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + mname = crmi[1] else: - self.logger.error("Method %s is not exposed" % method) - else: - self.logger.error("Method %s does not exist" % method) + mname = crmi + rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) + return rmi @exposed - def expire_cache(self, client=None): + def expire_metadata_cache(self, client=None): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) + @exposed + def RecvProbeData(self, address, _): + """ Expire the probe cache for a client """ + self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing, + key=self.resolve_client(address, + metadata=False)[0]) + + @exposed + def GetConfig(self, client): + """ Render the configuration for a client """ + self.logger.debug("%s: Building configuration for %s" % + (self.name, client)) + return lxml.etree.tostring(self.BuildConfiguration(client)) + class Core(BuiltinCore): """ A multiprocessing core that delegates building the actual @@ -224,84 +345,162 @@ class Core(BuiltinCore): if setup['children'] is None: setup['children'] = multiprocessing.cpu_count() - #: A dict of child name -> one end of the - #: :class:`multiprocessing.Pipe` object used to submit render - #: requests to that child. (The child is given the other end - #: of the Pipe.) - self.render_pipes = dict() - - #: A dict of child name -> :class:`multiprocessing.Queue` - #: object used to pass commands to that child. - self.command_queues = dict() - - #: A queue that keeps track of which children are available to - #: render a configuration. A child is popped from the queue - #: when it starts to render a config, then it's pushed back on - #: when it's done. This lets us use a blocking call to - #: :func:`Queue.Queue.get` when waiting for an available - #: child. - self.available_children = Queue(maxsize=self.setup['children']) - - # sigh. multiprocessing was added in py2.6, which is when the - # camelCase methods for threading objects were deprecated in - # favor of the Pythonic under_score methods. So - # multiprocessing.Event *only* has is_set(), while - # threading.Event has *both* isSet() and is_set(). In order - # to make the core work with Python 2.4+, and with both - # multiprocessing and threading Event objects, we just - # monkeypatch self.terminate to have isSet(). + #: The flag that indicates when to stop child threads and + #: processes self.terminate = DualEvent(threading_event=self.terminate) - self.metadata_cache = DispatchingCache() + #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object + #: used to send or publish commands to children. + self.rpc_q = RPCQueue() + + self.metadata_cache = DispatchingCache(queue=self.rpc_q) + + #: A list of children that will be cycled through + self._all_children = [] + + #: An iterator that each child will be taken from in sequence, + #: to provide a round-robin distribution of render requests + self.children = None def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum - # create Pipe for render requests and results - (mainpipe, childpipe) = multiprocessing.Pipe() - self.render_pipes[name] = mainpipe - - # create Queue for other commands - cmd_q = multiprocessing.Queue() - self.command_queues[name] = cmd_q - self.metadata_cache.command_queues[name] = cmd_q - self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate) + child_q = self.rpc_q.add_subscriber(name) + childcore = ChildCore(name, self.setup, child_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, child.pid)) - self.available_children.put(name) + self._all_children.append(name) + self.logger.debug("Started %s children: %s" % (len(self._all_children), + self._all_children)) + self.children = cycle(self._all_children) return BuiltinCore._run(self) def shutdown(self): BuiltinCore.shutdown(self) - for child in multiprocessing.active_children(): - self.logger.debug("Shutting down child %s" % child.name) - child.join(self.shutdown_timeout) - if child.is_alive(): + self.logger.info("Closing RPC command queues") + self.rpc_q.close() + + def term_children(): + """ Terminate all remaining multiprocessing children. """ + for child in multiprocessing.active_children(): self.logger.error("Waited %s seconds to shut down %s, " "terminating" % (self.shutdown_timeout, child.name)) child.terminate() - else: - self.logger.debug("Child %s shut down" % child.name) - self.logger.debug("All children shut down") + + timer = threading.Timer(self.shutdown_timeout, term_children) + timer.start() + while len(multiprocessing.active_children()): + self.logger.info("Waiting for %s child(ren): %s" % + (len(multiprocessing.active_children()), + [c.name + for c in multiprocessing.active_children()])) + time.sleep(1) + timer.cancel() + self.logger.info("All children shut down") + + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("Waiting for %s thread(s): %s" % + (len(threads), [t.name for t in threads])) + time.sleep(1) + self.logger.info("Shutdown complete") + + def _get_rmi(self): + child_rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + parentname, childname = crmi + else: + parentname = childname = crmi + child_rmi["%s.%s" % (pname, parentname)] = \ + "%s.%s" % (pname, childname) + + rmi = BuiltinCore._get_rmi(self) + for method in rmi.keys(): + if method in child_rmi: + rmi[method] = self._child_rmi_wrapper(method, + rmi[method], + child_rmi[method]) + return rmi + + def _child_rmi_wrapper(self, method, parent_rmi, child_rmi): + """ Returns a callable that dispatches a call to the given + child RMI to child processes, and calls the parent RMI locally + (i.e., in the parent process). """ + @wraps(parent_rmi) + def inner(*args, **kwargs): + self.logger.debug("Dispatching RMI call to %s to children: %s" % + (method, child_rmi)) + self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs) + return parent_rmi(*args, **kwargs) + + return inner @exposed def set_debug(self, address, debug): + self.rpc_q.set_debug(debug) + self.rpc_q.publish("set_debug", args=[address, debug]) self.metadata_cache.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) @exposed + def RecvProbeData(self, address, probedata): + rv = BuiltinCore.RecvProbeData(self, address, probedata) + # we don't want the children to actually process probe data, + # so we don't send the data, just the fact that we got some. + self.rpc_q.publish("RecvProbeData", args=[address, None]) + return rv + + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] - childname = self.available_children.get() - self.logger.debug("Building configuration on child %s" % childname) - pipe = self.render_pipes[childname] - pipe.send(client) - config = pipe.recv() - self.available_children.put_nowait(childname) - return config + childname = self.children.next() + self.logger.debug("Building configuration for %s on %s" % (client, + childname)) + return self.rpc_q.rpc(childname, "GetConfig", args=[client]) + + @exposed + def get_statistics(self, address): + stats = dict() + + def _aggregate_statistics(newstats, prefix=None): + """ Aggregate a set of statistics from a child or parent + server core. This adds the statistics to the overall + statistics dict (optionally prepending a prefix, such as + "Child-1", to uniquely identify this set of statistics), + and aggregates it with the set of running totals that are + kept from all cores. """ + for statname, vals in newstats.items(): + if statname.startswith("ChildCore:"): + statname = statname[5:] + if prefix: + prettyname = "%s:%s" % (prefix, statname) + else: + prettyname = statname + stats[prettyname] = vals + totalname = "Total:%s" % statname + if totalname not in stats: + stats[totalname] = vals + else: + newmin = min(stats[totalname][0], vals[0]) + newmax = max(stats[totalname][1], vals[1]) + newcount = stats[totalname][3] + vals[3] + newmean = ((stats[totalname][2] * stats[totalname][3]) + + (vals[2] * vals[3])) / newcount + stats[totalname] = (newmin, newmax, newmean, newcount) + + stats = dict() + for childname in self._all_children: + _aggregate_statistics( + self.rpc_q.rpc(childname, "get_statistics", args=[address]), + prefix=childname) + _aggregate_statistics(BuiltinCore.get_statistics(self, address)) + return stats |