From c8d71e18c16039593b309bc35e4ceffc50a0107d Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 5 Aug 2013 13:53:20 -0400 Subject: MultiprocessingCore: greatly simplified parent-child RPC, removed non-thread-safe bits --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 407 ++++++++++------------------ 1 file changed, 145 insertions(+), 262 deletions(-) (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 775131188..4c304d28c 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -4,33 +4,24 @@ processes. As such, it requires Python 2.6+. The parent communicates with the children over -:class:`multiprocessing.Pipe` objects that are wrapped in a -:class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` to -make them thread-safe. Each command passed over the Pipe should be in -the following format:: +:class:`multiprocessing.Queue` objects via a +:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object. - (, , ) - -The parent can also communicate with children over a one-way -:class:`multiprocessing.Queue` object that is used for -publish-subscribe communications, i.e., most XML-RPC commands. -(Setting debug, e.g., doesn't require a response from the children.) - -The method must be exposed by the child by decorating it with -:func:`Bcfg2.Server.Core.exposed`. +A method being called via the RPCQueue must be exposed by the child by +decorating it with :func:`Bcfg2.Server.Core.exposed`. """ import time import threading import lxml.etree import multiprocessing -from uuid import uuid4 from itertools import cycle from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue, Empty +from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.BuiltinCore import Core as BuiltinCore -from Bcfg2.Server.Plugin import Debuggable, track_statistics +from multiprocessing.connection import Listener, Client class DispatchingCache(Cache, Debuggable): @@ -41,166 +32,103 @@ class DispatchingCache(Cache, Debuggable): method = "expire_cache" def __init__(self, *args, **kwargs): - self.cmd_q = kwargs.pop("queue") + self.rpc_q = kwargs.pop("queue") Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): - self.cmd_q.put((self.method, [key], dict())) + self.rpc_q.publish(self.method, args=[key]) Cache.expire(self, key=key) -class PublishSubscribeQueue(object): +class RPCQueue(Debuggable): """ An implementation of a :class:`multiprocessing.Queue` designed - for publish-subscribe use patterns. I.e., a single node adds items - to the queue, and every other node retrieves the item. This is - the 'publish' end; the subscribers can deal with this as a normal - Queue with no special handling. + for several additional use patterns: - Note that, since this is the publishing end, there's no support - for getting. + * Random-access reads, based on a key that identifies the data; + * Publish-subscribe, where a datum is sent to all hosts. + + The subscribers can deal with this as a normal Queue with no + special handling. """ + poll_wait = 3.0 def __init__(self): - self._queues = [] - - def add_subscriber(self): - """ Add a subscriber to the queue. This returns a - :class:`multiprocessing.Queue` object that is used as the - subscription end of the queue. """ - new_q = multiprocessing.Queue() - self._queues.append(new_q) - return new_q - - def put(self, obj, block=True, timeout=None): - """ Put ``obj`` into the queue. See - :func:`multiprocessing.Queue.put` for more details.""" - for queue in self._queues: - queue.put(obj, block=block, timeout=timeout) - - def put_nowait(self, obj): - """ Equivalent to ``put(obj, False)``. """ - self.put(obj, block=False) + Debuggable.__init__(self) + self._terminate = threading.Event() + self._queues = dict() + self._available_listeners = Queue() + self._blocking_listeners = [] + + def add_subscriber(self, name): + """ Add a subscriber to the queue. This returns the + :class:`multiprocessing.Queue` object that the subscriber + should read from. """ + self._queues[name] = multiprocessing.Queue() + return self._queues[name] + + def publish(self, method, args=None, kwargs=None): + """ Publish an RPC call to the queue for consumption by all + subscribers. """ + for queue in self._queues.values(): + queue.put((None, (method, args or [], kwargs or dict()))) + + def rpc(self, dest, method, args=None, kwargs=None): + """ Make an RPC call to the named subscriber, expecting a + response. This opens a + :class:`multiprocessing.connection.Listener` and passes the + Listener address to the child as part of the RPC call, so that + the child can connect to the Listener to submit its results. + + Listeners are reused when possible to minimize overhead. + """ + try: + listener = self._available_listeners.get_nowait() + self.logger.debug("Reusing existing RPC listener at %s" % + listener.address) + except Empty: + listener = Listener() + self.logger.debug("Created new RPC listener at %s" % + listener.address) + self._blocking_listeners.append(listener) + try: + self._queues[dest].put((listener.address, + (method, args or [], kwargs or dict()))) + conn = listener.accept() + self._blocking_listeners.remove(listener) + try: + while not self._terminate.is_set(): + if conn.poll(self.poll_wait): + return conn.recv() + finally: + conn.close() + finally: + self._available_listeners.put(listener) def close(self): - """ Close the queue. See :func:`multiprocessing.Queue.close` - for more details. """ - for queue in self._queues: + """ Close queues and connections. """ + self._terminate.set() + self.logger.debug("Closing RPC queues") + for name, queue in self._queues.items(): + self.logger.debug("Closing RPC queue to %s" % name) queue.close() + # close any listeners that are waiting for connections + self.logger.debug("Closing RPC connections") + for listener in self._blocking_listeners: + self.logger.debug("Closing RPC connection at %s" % + listener.address) + listener.close() -class ThreadSafePipeDispatcher(Debuggable): - """ This is a wrapper around :class:`multiprocessing.Pipe` objects - that allows them to be used in multithreaded applications. When - performing a ``send()``, a key is included that will be used to - identify the response. As responses are received from the Pipe, - they are added to a dict that is used to get the appropriate - response for a given thread. - - The remote end of the Pipe must deal with the key being sent with - the data in a tuple of ``(key, data)``, and it must include the - key with its response. - - It is the responsibility of the user to ensure that the key is - unique. - - Note that this adds a bottleneck -- all communication over the - actual Pipe happens in a single thread. But for our purposes, - Pipe communication is fairly minimal and that's an acceptable - bottleneck.""" - - #: How long to wait while polling for new data to send. This - #: doesn't affect the speed with which data is sent, but - #: setting it too high will result in longer shutdown times, since - #: we only check for the termination event from the main process - #: every ``poll_wait`` seconds. - poll_wait = 2.0 - - _sentinel = object() - - def __init__(self, terminate): - Debuggable.__init__(self) - - #: The threading flag that is used to determine when the - #: threads should stop. - self.terminate = terminate - - #: The :class:`multiprocessing.Pipe` tuple used by this object - self.pipe = multiprocessing.Pipe() - - self._mainpipe = self.pipe[0] - self._recv_dict = dict() - self._send_queue = Queue() - - self.send_thread = threading.Thread(name="PipeSendThread", - target=self._send_thread) - self.send_thread.start() - self.recv_thread = threading.Thread(name="PipeRecvThread", - target=self._recv_thread) - self.recv_thread.start() - - def _send_thread(self): - """ Run the single thread through which send requests are passed """ - self.logger.debug("Starting interprocess RPC send thread") - while not self.terminate.isSet(): - try: - self._mainpipe.send(self._send_queue.get(True, self.poll_wait)) - except Empty: - pass - self.logger.info("Interprocess RPC send thread stopped") - - def send(self, key, data): - """ Send data with the given unique key """ - self._send_queue.put((key, data)) - - def _recv_thread(self): - """ Run the single thread through which recv requests are passed """ - self.logger.debug("Starting interprocess RPC receive thread") - while not self.terminate.isSet(): - if self._mainpipe.poll(self.poll_wait): - key, data = self._mainpipe.recv() - if key in self._recv_dict: - self.logger.error("Duplicate key in received data: %s" % - key) - self._mainpipe.close() - self._recv_dict[key] = data - self.logger.info("Interprocess RPC receive thread stopped") - - def recv(self, key): - """ Receive data with the given unique key """ - self.poll(key, timeout=None) - return self._recv_dict.pop(key) - - def poll(self, key, timeout=_sentinel): - """ Poll for data with the given unique key. See - :func:`multiprocessing.Connection.poll` for the possible - values of ``timeout``. """ - if timeout is self._sentinel: - return key in self._recv_dict - - abort = threading.Event() - - if timeout is not None: - timer = threading.Timer(float(timeout), abort.set) - timer.start() + self.logger.debug("Closing RPC listeners") try: - while not abort.is_set(): - if key in self._recv_dict: - return True - return False - finally: - if timeout is not None: - timer.cancel() - - @staticmethod - def genkey(base): - """ Generate a key suitable for use with - :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` - send() requests, based on the given data. The key is - constructed from the string given, some information about this - thread, and some random data. """ - thread = threading.current_thread() - return "%s-%s-%s-%s" % (base, thread.name, thread.ident, uuid4()) + while True: + listener = self._available_listeners.get_nowait() + self.logger.debug("Closing RPC listener at %s" % + listener.address) + listener.close() + except Empty: + pass class DualEvent(object): @@ -258,18 +186,18 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 3.0 - def __init__(self, name, setup, rpc_pipe, cmd_q, terminate): + def __init__(self, name, setup, rpc_q, terminate): """ :param name: The name of this child :type name: string :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser - :param rpc_pipe: The pipe used for RPC communication with the - parent process - :type rpc_pipe: multiprocessing.Pipe - :param cmd_q: The queue used for one-way pub-sub - communications from the parent process - :type cmd_q: multiprocessing.Queue + :param read_q: The queue the child will read from for RPC + communications from the parent process. + :type read_q: multiprocessing.Queue + :param write_q: The queue the child will write the results of + RPC calls to. + :type write_q: multiprocessing.Queue :param terminate: An event that flags ChildCore objects to shut themselves down. :type terminate: multiprocessing.Event @@ -279,22 +207,12 @@ class ChildCore(BaseCore): #: The name of this child self.name = name - #: The pipe used for RPC communication with the parent - self.rpc_pipe = rpc_pipe - - #: The queue used to receive pub-sub commands - self.cmd_q = cmd_q - #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate - # a list of all rendering threads - self._threads = [] - - # the thread used to process publish-subscribe commands - self._command_thread = threading.Thread(name="CommandThread", - target=self._dispatch_commands) + #: The queue used for RPC communication + self.rpc_q = rpc_q # override this setting so that the child doesn't try to write # the pidfile @@ -304,75 +222,60 @@ class ChildCore(BaseCore): self.perflog_thread = None def _run(self): - self._command_thread.start() return True def _daemonize(self): return True - def _dispatch_commands(self): - """ Dispatch commands received via the pub-sub queue interface - """ - self.logger.debug("Starting %s RPC subscription thread" % self.name) - while not self.terminate.is_set(): - try: - data = self.cmd_q.get(True, self.poll_wait) - self.logger.debug("%s: Processing asynchronous command: %s" % - (self.name, data[0])) - self._dispatch(data) - except Empty: - pass - self.logger.info("%s RPC subscription thread stopped" % self.name) - - def _dispatch_render(self): - """ Dispatch render requests received via the RPC pipe - interface """ - key, data = self.rpc_pipe.recv() - self.rpc_pipe.send((key, self._dispatch(data))) - - @track_statistics() - def _reap_threads(self): - """ Reap rendering threads that have completed """ - for thread in self._threads[:]: - if not thread.is_alive(): - self._threads.remove(thread) - - def _dispatch(self, data): - """ Generic method dispatcher used for commands received from - either the pub-sub queue or the RPC pipe. """ + def _dispatch(self, address, data): + """ Method dispatcher used for commands received from + the RPC queue. """ + if address is not None: + # if the key is None, then no response is expected. we + # make the return connection before dispatching the actual + # RPC call so that the parent is blocking for a connection + # as briefly as possible + self.logger.error("Connecting to parent via %s" % address) + client = Client(address) method, args, kwargs = data + rv = None if not hasattr(self, method): self.logger.error("%s: Method %s does not exist" % (self.name, method)) - return None - - func = getattr(self, method) - if func.exposed: - self.logger.debug("%s: Calling RPC method %s" % (self.name, - method)) - return func(*args, **kwargs) else: - self.logger.error("%s: Method %s is not exposed" % (self.name, - method)) - return None + func = getattr(self, method) + if func.exposed: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + rv = func(*args, **kwargs) + else: + self.logger.error("%s: Method %s is not exposed" % (self.name, + method)) + if address is not None: + # if the key is None, then no response is expected + self.logger.error("Returning data to parent via %s" % address) + client.send(rv) def _block(self): - while not self.terminate.isSet(): + while not self.terminate.is_set(): try: - if self.rpc_pipe.poll(self.poll_wait): - rpc_thread = threading.Thread( - name="Renderer%s" % len(self._threads), - target=self._dispatch_render) - self._threads.append(rpc_thread) - rpc_thread.start() - self._reap_threads() + address, data = self.rpc_q.get(timeout=self.poll_wait) + threadname = "-".join(str(i) for i in data) + rpc_thread = threading.Thread(name=threadname, + target=self._dispatch, + args=[address, data]) + rpc_thread.start() + except Empty: + pass except KeyboardInterrupt: break self.shutdown() def shutdown(self): BaseCore.shutdown(self) - self._reap_threads() + self.logger.info("%s: Closing RPC command queue" % self.name) + self.rpc_q.close() + while len(threading.enumerate()) > 1: threads = [t for t in threading.enumerate() if t != threading.current_thread()] @@ -380,13 +283,8 @@ class ChildCore(BaseCore): (self.name, len(threads), [t.name for t in threads])) time.sleep(1) - self._reap_threads() self.logger.info("%s: All threads stopped" % self.name) - @exposed - def set_debug(self, address, debug): - BaseCore.set_debug(self, address, debug) - @exposed def expire_cache(self, client=None): """ Expire the metadata cache for a client """ @@ -417,22 +315,15 @@ class Core(BuiltinCore): if setup['children'] is None: setup['children'] = multiprocessing.cpu_count() - #: A dict of child name -> - #: :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` - #: objects used to pass render requests to that child. (The - #: child is given the other end of the Pipe.) - self.pipes = dict() - - #: A - #: :class:`Bcfg2.Server.MultiprocessingCore.PublishSubscribeQueue` - #: object used to publish commands to all children. - self.cmd_q = PublishSubscribeQueue() - #: The flag that indicates when to stop child threads and #: processes self.terminate = DualEvent(threading_event=self.terminate) - self.metadata_cache = DispatchingCache(queue=self.cmd_q) + #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object + #: used to send or publish commands to children. + self.rpc_q = RPCQueue() + + self.metadata_cache = DispatchingCache(queue=self.rpc_q) #: A list of children that will be cycled through self._all_children = [] @@ -445,13 +336,9 @@ class Core(BuiltinCore): for cnum in range(self.setup['children']): name = "Child-%s" % cnum - # create Pipe for render requests - dispatcher = ThreadSafePipeDispatcher(self.terminate) - self.pipes[name] = dispatcher - self.logger.debug("Starting child %s" % name) - childcore = ChildCore(name, self.setup, dispatcher.pipe[1], - self.cmd_q.add_subscriber(), self.terminate) + child_q = self.rpc_q.add_subscriber(name) + childcore = ChildCore(name, self.setup, child_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, @@ -464,8 +351,8 @@ class Core(BuiltinCore): def shutdown(self): BuiltinCore.shutdown(self) - self.logger.debug("Closing RPC command queues") - self.cmd_q.close() + self.logger.info("Closing RPC command queues") + self.rpc_q.close() def term_children(): """ Terminate all remaining multiprocessing children. """ @@ -496,10 +383,9 @@ class Core(BuiltinCore): @exposed def set_debug(self, address, debug): - self.cmd_q.put(("set_debug", [address, debug], dict())) + self.rpc_q.set_debug(debug) + self.rpc_q.publish("set_debug", args=[address, debug]) self.metadata_cache.set_debug(debug) - for pipe in self.pipes.values(): - pipe.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) @exposed @@ -508,10 +394,7 @@ class Core(BuiltinCore): childname = self.children.next() self.logger.debug("Building configuration for %s on %s" % (client, childname)) - key = ThreadSafePipeDispatcher.genkey(client) - pipe = self.pipes[childname] - pipe.send(key, ("GetConfig", [client], dict())) - return pipe.recv(key) + return self.rpc_q.rpc(childname, "GetConfig", args=[client]) @exposed def get_statistics(self, address): @@ -543,10 +426,10 @@ class Core(BuiltinCore): (vals[2] * vals[3])) / newcount stats[totalname] = (newmin, newmax, newmean, newcount) - key = ThreadSafePipeDispatcher.genkey("get_statistics") stats = dict() - for childname, pipe in self.pipes.items(): - pipe.send(key, ("get_statistics", [address], dict())) - _aggregate_statistics(pipe.recv(key), prefix=childname) + for childname in self._all_children: + _aggregate_statistics( + self.rpc_q.rpc(childname, "get_statistics", args=[address]), + prefix=childname) _aggregate_statistics(BuiltinCore.get_statistics(self, address)) return stats -- cgit v1.2.3-1-g7c22