From 50b7407fd8c29bfede3091fa9e76b8e2a78de3ec Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 30 Jul 2013 13:54:37 -0400 Subject: MultiprocessingCore: make multiprocessing children threaded for higher performance --- src/lib/Bcfg2/Server/Core.py | 20 +- src/lib/Bcfg2/Server/MultiprocessingCore.py | 480 ++++++++++++++++++++-------- 2 files changed, 352 insertions(+), 148 deletions(-) diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index 0cd4bea3e..e37c0b4e3 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -289,11 +289,12 @@ class BaseCore(object): self.logger.debug("Performance logging thread starting") while not self.terminate.isSet(): self.terminate.wait(self.setup['perflog_interval']) - for name, stats in self.get_statistics(None).items(): - self.logger.info("Performance statistics: " - "%s min=%.06f, max=%.06f, average=%.06f, " - "count=%d" % ((name, ) + stats)) - self.logger.debug("Performance logging thread terminated") + if not self.terminate.isSet(): + for name, stats in self.get_statistics(None).items(): + self.logger.info("Performance statistics: " + "%s min=%.06f, max=%.06f, average=%.06f, " + "count=%d" % ((name, ) + stats)) + self.logger.info("Performance logging thread terminated") def _file_monitor_thread(self): """ The thread that runs the @@ -314,7 +315,7 @@ class BaseCore(object): except: continue self._update_vcs_revision() - self.logger.debug("File monitor thread terminated") + self.logger.info("File monitor thread terminated") @track_statistics() def _update_vcs_revision(self): @@ -430,14 +431,14 @@ class BaseCore(object): def shutdown(self): """ Perform plugin and FAM shutdown tasks. """ - self.logger.debug("Shutting down core...") + self.logger.info("Shutting down core...") if not self.terminate.isSet(): self.terminate.set() self.fam.shutdown() - self.logger.debug("FAM shut down") + self.logger.info("FAM shut down") for plugin in list(self.plugins.values()): plugin.shutdown() - self.logger.debug("All plugins shut down") + self.logger.info("All plugins shut down") @property def metadata_cache_mode(self): @@ -1052,6 +1053,7 @@ class BaseCore(object): for plugin in self.plugins_by_type(Probing): for probe in plugin.GetProbes(metadata): resp.append(probe) + self.logger.debug("Sending probe list to %s" % client) return lxml.etree.tostring(resp, xml_declaration=False).decode('UTF-8') except: diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 02710ab99..af8f6a56e 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -3,31 +3,31 @@ :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. -The parent communicates with the children over two constructs: - -* A :class:`multiprocessing.Pipe` is used to process render requests. - The pipe is locked when in use (i.e., between the time that a client - is submitted to be rendered and the time that its configuration is - returned) to keep things thread-safe. (This is accomplished through - the use of - :attr:`Bcfg2.Server.MultiprocessingCore.available_children.) -* A :class:`multiprocessing.Queue` is used to submit other commands in - a thread-safe, non-blocking fashion. (Note that, since it is a - queue, no results can be returned.) It implements a very simple RPC - protocol. 
Each command passed to a child over the Pipe must be a - tuple with the format:: +The parent communicates with the children over +:class:`multiprocessing.Pipe` objects that are wrapped in a +:class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` to +make them thread-safe. Each command passed over the Pipe should be in +the following format:: (, , ) - The method must be exposed by the child by decorating it with - :func:`Bcfg2.Server.Core.exposed`. +The parent can also communicate with children over a one-way +:class:`multiprocessing.Queue` object that is used for +publish-subscribe communications, i.e., most XML-RPC commands. +(Setting debug, e.g., doesn't require a response from the children.) + +The method must be exposed by the child by decorating it with +:func:`Bcfg2.Server.Core.exposed`. """ +import time import threading import lxml.etree import multiprocessing +from uuid import uuid4 +from itertools import cycle from Bcfg2.Cache import Cache -from Bcfg2.Compat import Queue +from Bcfg2.Compat import Queue, Empty from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore @@ -41,28 +41,160 @@ class DispatchingCache(Cache, Debuggable): method = "expire_cache" def __init__(self, *args, **kwargs): - #: A dict of : :class:`multiprocessing.Queue` - #: objects that should be given a cache expiration command any - #: time an item is expired. - self.command_queues = kwargs.pop("pipes", dict()) - + self.cmd_q = kwargs.pop("queue") Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): - if (key and key in self) or (not key and len(self)): - # dispatching cache expiration to children can be - # expensive, so only do it if there's something to expire - for child, cmd_q in self.command_queues.items(): - if key: - self.logger.debug("Expiring metadata cache for %s on %s" % - (key, child)) - else: - self.logger.debug("Expiring metadata cache on %s" % child) - cmd_q.put((self.method, [key], dict())) + self.cmd_q.put((self.method, [key], dict())) Cache.expire(self, key=key) +class PublishSubscribeQueue(object): + """ An implementation of a :class:`multiprocessing.Queue` designed + for publish-subscribe use patterns. I.e., a single node adds items + to the queue, and every other node retrieves the item. This is + the 'publish' end; the subscribers can deal with this as a normal + Queue with no special handling. + + Note that, since this is the publishing end, there's no support + for getting. + """ + def __init__(self): + self._queues = [] + + def add_subscriber(self): + new_q = multiprocessing.Queue() + self._queues.append(new_q) + return new_q + + def put(self, data, block=True, timeout=None): + for queue in self._queues: + queue.put(data, block=block, timeout=timeout) + + def put_nowait(self, data): + self.put(data, block=False) + + def close(self): + for queue in self._queues: + queue.close() + + +class ThreadSafePipeDispatcher(Debuggable): + """ This is a wrapper around :class:`multiprocessing.Pipe` objects + that allows them to be used in multithreaded applications. When + performing a ``send()``, a key is included that will be used to + identify the response. As responses are received from the Pipe, + they are added to a dict that is used to get the appropriate + response for a given thread. + + The remote end of the Pipe must deal with the key being sent with + the data in a tuple of ``(key, data)``, and it must include the + key with its response. 
+ + It is the responsibility of the user to ensure that the key is + unique. + + Note that this adds a bottleneck -- all communication over the + actual Pipe happens in a single thread. But for our purposes, + Pipe communication is fairly minimal and that's an acceptable + bottleneck.""" + + #: How long to wait while polling for new data to send. This + #: doesn't affect the speed with which data is sent, but + #: setting it too high will result in longer shutdown times, since + #: we only check for the termination event from the main process + #: every ``poll_wait`` seconds. + poll_wait = 2.0 + + _sentinel = object() + + def __init__(self, terminate): + Debuggable.__init__(self) + + #: The threading flag that is used to determine when the + #: threads should stop. + self.terminate = terminate + + #: The :class:`multiprocessing.Pipe` tuple used by this object + self.pipe = multiprocessing.Pipe() + + self._mainpipe = self.pipe[0] + self._recv_dict = dict() + self._send_queue = Queue() + + self.send_thread = threading.Thread(name="PipeSendThread", + target=self._send_thread) + self.send_thread.start() + self.recv_thread = threading.Thread(name="PipeRecvThread", + target=self._recv_thread) + self.recv_thread.start() + + def _send_thread(self): + """ Run the single thread through which send requests are passed """ + self.logger.debug("Starting interprocess RPC send thread") + while not self.terminate.isSet(): + try: + data = self._send_queue.get(True, self.poll_wait) + self._mainpipe.send(data) + except Empty: + pass + self.logger.info("Interprocess RPC send thread stopped") + + def send(self, key, data): + """ Send data with the given unique key """ + self._send_queue.put((key, data)) + + def _recv_thread(self): + """ Run the single thread through which recv requests are passed """ + self.logger.debug("Starting interprocess RPC receive thread") + while not self.terminate.isSet(): + if self._mainpipe.poll(self.poll_wait): + key, data = self._mainpipe.recv() + if key in self._recv_dict: + self.logger.error("Duplicate key in received data: %s" % + key) + self._mainpipe.close() + self._recv_dict[key] = data + self.logger.info("Interprocess RPC receive thread stopped") + + def recv(self, key): + """ Receive data with the given unique key """ + self.poll(key, timeout=None) + return self._recv_dict.pop(key) + + def poll(self, key, timeout=_sentinel): + """ Poll for data with the given unique key. See + :func:`multiprocessing.Connection.poll` for the possible + values of ``timeout``. """ + if timeout is self._sentinel: + return key in self._recv_dict + + abort = threading.Event() + + if timeout is not None: + timer = threading.Timer(float(timeout), abort.set) + timer.start() + try: + while not abort.is_set(): + if key in self._recv_dict: + return True + return False + finally: + if timeout is not None: + timer.cancel() + + @staticmethod + def genkey(base): + """ Generate a key suitable for use with + :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` + send() requests, based on the given data. The key is + constructed from the string given, some information about this + thread, and some random data. """ + thread = threading.current_thread() + return "%s-%s-%s-%s" % (base, thread.name, thread.ident, uuid4()) + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -111,101 +243,153 @@ class ChildCore(BaseCore): those, though, if the pipe communication "protocol" were made more robust. 
""" - #: How long to wait while polling for new clients to build. This - #: doesn't affect the speed with which a client is built, but + #: How long to wait while polling for new RPC commands. This + #: doesn't affect the speed with which a command is processed, but #: setting it too high will result in longer shutdown times, since #: we only check for the termination event from the main process #: every ``poll_wait`` seconds. - poll_wait = 5.0 + poll_wait = 3.0 - def __init__(self, setup, render_pipe, command_queue, terminate): + def __init__(self, name, setup, rpc_pipe, cmd_q, terminate): """ + :param name: The name of this child + :type name: string :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser - :param pipe: The pipe to which client hostnames are added for - ChildCore objects to build configurations, and to - which client configurations are added after - having been built by ChildCore objects. - :type pipe: multiprocessing.Pipe + :param rpc_pipe: The pipe used for RPC communication with the + parent process + :type rpc_pipe: multiprocessing.Pipe + :param cmd_q: The queue used for one-way pub-sub + communications from the parent process + :type cmd_q: multiprocessing.Queue :param terminate: An event that flags ChildCore objects to shut themselves down. :type terminate: multiprocessing.Event """ BaseCore.__init__(self, setup) - #: The pipe to which client hostnames are added for ChildCore - #: objects to build configurations, and to which client - #: configurations are added after having been built by - #: ChildCore objects. - self.render_pipe = render_pipe + #: The name of this child + self.name = name + + #: The pipe used for RPC communication with the parent + self.rpc_pipe = rpc_pipe - #: The queue from which other commands are received - self.command_queue = command_queue + #: The queue used to receive pub-sub commands + self.cmd_q = cmd_q #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate - #: The :class:`threading.Thread` used to process commands - #: received via the :class:`multiprocessing.Queue` RPC - #: interface - self.command_thread = \ - threading.Thread(name="CommandThread", - target=self._command_queue_thread) + # a list of all rendering threads + self._threads = [] - def _daemonize(self): - return True + # the thread used to process publish-subscribe commands + self._command_thread = threading.Thread(name="CommandThread", + target=self._dispatch_commands) + + # override this setting so that the child doesn't try to write + # the pidfile + self.setup['daemon'] = False + + # ensure that the child doesn't start a perflog thread + self.perflog_thread = None def _run(self): - try: - self.command_thread.start() - except: - self.shutdown() - raise + self._command_thread.start() return True - def render(self): - """ Process client configuration render requests """ - if self.render_pipe.poll(self.poll_wait): - if not self.metadata.use_database: - # handle FAM events, in case (for instance) the - # client has just been added to clients.xml, or a - # profile has just been asserted. but really, you - # should be using the metadata database if you're - # using this core. 
- self.fam.handle_events_in_interval(0.1) - client = self.render_pipe.recv() - self.logger.debug("Building configuration for %s" % client) - self.render_pipe.send( - lxml.etree.tostring(self.BuildConfiguration(client))) + def _daemonize(self): + return True + + def _dispatch_commands(self): + """ Dispatch commands received via the pub-sub queue interface + """ + self.logger.debug("Starting %s RPC subscription thread" % self.name) + while not self.terminate.is_set(): + try: + data = self.cmd_q.get(True, self.poll_wait) + self.logger.debug("%s: Processing asynchronous command: %s" % + (self.name, data[0])) + self._dispatch(data) + except Empty: + pass + self.logger.info("%s RPC subscription thread stopped" % self.name) + + def _dispatch_render(self): + """ Dispatch render requests received via the RPC pipe + interface """ + key, data = self.rpc_pipe.recv() + self.rpc_pipe.send((key, self._dispatch(data))) + + def _reap_threads(self): + """ Reap rendering threads that have completed """ + for thread in self._threads[:]: + if not thread.is_alive(): + self._threads.remove(thread) + + def _dispatch(self, data): + """ Generic method dispatcher used for commands received from + either the pub-sub queue or the RPC pipe. """ + method, args, kwargs = data + if not hasattr(self, method): + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + return None + + func = getattr(self, method) + if func.exposed: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + return func(*args, **kwargs) + else: + self.logger.error("%s: Method %s is not exposed" % (self.name, + method)) + return None def _block(self): while not self.terminate.isSet(): try: - self.render() + if self.rpc_pipe.poll(self.poll_wait): + rpc_thread = threading.Thread( + name="Renderer%s" % len(self._threads), + target=self._dispatch_render) + self._threads.append(rpc_thread) + rpc_thread.start() + self._reap_threads() except KeyboardInterrupt: break self.shutdown() - def _command_queue_thread(self): - """ Process commands received on the command queue thread """ - while not self.terminate.isSet(): - method, args, kwargs = self.command_queue.get() - if hasattr(self, method): - func = getattr(self, method) - if func.exposed: - self.logger.debug("Child calling RPC method %s" % method) - func(*args, **kwargs) - else: - self.logger.error("Method %s is not exposed" % method) - else: - self.logger.error("Method %s does not exist" % method) + def shutdown(self): + BaseCore.shutdown(self) + self._reap_threads() + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("%s: Waiting for %d thread(s): %s" % + (self.name, len(threads), + [t.name for t in threads])) + time.sleep(1) + self._reap_threads() + self.logger.info("%s: All threads stopped" % self.name) + + @exposed + def set_debug(self, address, debug): + BaseCore.set_debug(self, address, debug) @exposed def expire_cache(self, client=None): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) + @exposed + def GetConfig(self, client): + """ Render the configuration for a client """ + self.logger.debug("%s: Building configuration for %s" % + (self.name, client)) + return lxml.etree.tostring(self.BuildConfiguration(client)) + class Core(BuiltinCore): """ A multiprocessing core that delegates building the actual @@ -224,84 +408,102 @@ class Core(BuiltinCore): if setup['children'] is None: setup['children'] = multiprocessing.cpu_count() - #: A dict 
of child name -> one end of the - #: :class:`multiprocessing.Pipe` object used to submit render - #: requests to that child. (The child is given the other end - #: of the Pipe.) - self.render_pipes = dict() - - #: A dict of child name -> :class:`multiprocessing.Queue` - #: object used to pass commands to that child. - self.command_queues = dict() - - #: A queue that keeps track of which children are available to - #: render a configuration. A child is popped from the queue - #: when it starts to render a config, then it's pushed back on - #: when it's done. This lets us use a blocking call to - #: :func:`Queue.Queue.get` when waiting for an available - #: child. - self.available_children = Queue(maxsize=self.setup['children']) - - # sigh. multiprocessing was added in py2.6, which is when the - # camelCase methods for threading objects were deprecated in - # favor of the Pythonic under_score methods. So - # multiprocessing.Event *only* has is_set(), while - # threading.Event has *both* isSet() and is_set(). In order - # to make the core work with Python 2.4+, and with both - # multiprocessing and threading Event objects, we just - # monkeypatch self.terminate to have isSet(). + #: A dict of child name -> + #: :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` + #: objects used to pass render requests to that child. (The + #: child is given the other end of the Pipe.) + self.pipes = dict() + + #: A + #: :class:`Bcfg2.Server.MultiprocessingCore.PublishSubscribeQueue` + #: object used to publish commands to all children. + self.cmd_q = PublishSubscribeQueue() + + #: The flag that indicates when to stop child threads and + #: processes self.terminate = DualEvent(threading_event=self.terminate) - self.metadata_cache = DispatchingCache() + self.metadata_cache = DispatchingCache(queue=self.cmd_q) + + #: A list of children that will be cycled through + self._all_children = [] + + #: An iterator that each child will be taken from in sequence, + #: to provide a round-robin distribution of render requests + self.children = None def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum - # create Pipe for render requests and results - (mainpipe, childpipe) = multiprocessing.Pipe() - self.render_pipes[name] = mainpipe - - # create Queue for other commands - cmd_q = multiprocessing.Queue() - self.command_queues[name] = cmd_q - self.metadata_cache.command_queues[name] = cmd_q + # create Pipe for render requests + dispatcher = ThreadSafePipeDispatcher(self.terminate) + self.pipes[name] = dispatcher self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate) + childcore = ChildCore(name, self.setup, dispatcher.pipe[1], + self.cmd_q.add_subscriber(), self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, child.pid)) - self.available_children.put(name) + self._all_children.append(name) + self.logger.debug("Started %s children: %s" % (len(self._all_children), + self._all_children)) + self.children = cycle(self._all_children) return BuiltinCore._run(self) def shutdown(self): BuiltinCore.shutdown(self) - for child in multiprocessing.active_children(): - self.logger.debug("Shutting down child %s" % child.name) - child.join(self.shutdown_timeout) - if child.is_alive(): + self.logger.debug("Closing RPC command queues") + self.cmd_q.close() + + def term_children(): + for child in multiprocessing.active_children(): 
self.logger.error("Waited %s seconds to shut down %s, " "terminating" % (self.shutdown_timeout, child.name)) child.terminate() - else: - self.logger.debug("Child %s shut down" % child.name) - self.logger.debug("All children shut down") + + timer = threading.Timer(self.shutdown_timeout, term_children) + timer.start() + while len(multiprocessing.active_children()): + self.logger.info("Waiting for %s child(ren): %s" % + (len(multiprocessing.active_children()), + [c.name + for c in multiprocessing.active_children()])) + time.sleep(1) + timer.cancel() + self.logger.info("All children shut down") + + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("Waiting for %s thread(s): %s" % + (len(threads), [t.name for t in threads])) + time.sleep(1) + self.logger.info("Shutdown complete") @exposed def set_debug(self, address, debug): + self.cmd_q.put(("set_debug", [address, debug], dict())) self.metadata_cache.set_debug(debug) + for pipe in self.pipes.values(): + pipe.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] - childname = self.available_children.get() - self.logger.debug("Building configuration on child %s" % childname) - pipe = self.render_pipes[childname] - pipe.send(client) - config = pipe.recv() - self.available_children.put_nowait(childname) - return config + childname = self.children.next() + self.logger.debug("Building configuration for %s on %s" % (client, + childname)) + key = ThreadSafePipeDispatcher.genkey(client) + pipe = self.pipes[childname] + pipe.send(key, ("GetConfig", [client], dict())) + if pipe.poll(key, timeout=self.setup['client_timeout']): + return pipe.recv(key) + else: + self.logger.error("Building configuration for %s on %s timed out" % + (client, childname)) + return None -- cgit v1.2.3-1-g7c22