path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
author     Chris St. Pierre <chris.a.st.pierre@gmail.com>  2013-07-30 13:54:37 -0400
committer  Chris St. Pierre <chris.a.st.pierre@gmail.com>  2013-07-30 14:01:50 -0400
commit     50b7407fd8c29bfede3091fa9e76b8e2a78de3ec (patch)
tree       5d5935a50e27a8875513aac9fc66905b2ceb1fa7 /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent     736d98688db0ad853a14d58a5fa395a9844dc590 (diff)
MultiprocessingCore: make multiprocessing children threaded for higher performance
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--  src/lib/Bcfg2/Server/MultiprocessingCore.py  480
1 file changed, 341 insertions, 139 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 02710ab99..af8f6a56e 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -3,31 +3,31 @@
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
-The parent communicates with the children over two constructs:
-
-* A :class:`multiprocessing.Pipe` is used to process render requests.
- The pipe is locked when in use (i.e., between the time that a client
- is submitted to be rendered and the time that its configuration is
- returned) to keep things thread-safe. (This is accomplished through
- the use of
- :attr:`Bcfg2.Server.MultiprocessingCore.available_children.)
-* A :class:`multiprocessing.Queue` is used to submit other commands in
- a thread-safe, non-blocking fashion. (Note that, since it is a
- queue, no results can be returned.) It implements a very simple RPC
- protocol. Each command passed to a child over the Pipe must be a
- tuple with the format::
+The parent communicates with the children over
+:class:`multiprocessing.Pipe` objects that are wrapped in a
+:class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` to
+make them thread-safe. Each command passed over the Pipe should be in
+the following format::
(<method>, <args>, <kwargs>)
- The method must be exposed by the child by decorating it with
- :func:`Bcfg2.Server.Core.exposed`.
+The parent can also communicate with the children over a one-way
+:class:`multiprocessing.Queue` object that is used for
+publish-subscribe communications, i.e., for most XML-RPC commands,
+which do not require a response from the children (setting debug,
+for example).
+
+In either case, the method named in the command must be exposed by
+the child by decorating it with :func:`Bcfg2.Server.Core.exposed`.
"""
+import time
import threading
import lxml.etree
import multiprocessing
+from uuid import uuid4
+from itertools import cycle
from Bcfg2.Cache import Cache
-from Bcfg2.Compat import Queue
+from Bcfg2.Compat import Queue, Empty
from Bcfg2.Server.Core import BaseCore, exposed
from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
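
As a rough illustration of the ``(<method>, <args>, <kwargs>)`` command
format described in the docstring above, a child might dispatch such a
tuple roughly as follows (a standalone sketch; ``ToyChild`` and its methods
are hypothetical, not Bcfg2 code)::

    # Minimal sketch of dispatching a (<method>, <args>, <kwargs>) command.
    # ToyChild is illustrative only -- the real children dispatch to methods
    # decorated with @Bcfg2.Server.Core.exposed.
    class ToyChild(object):
        def expire_cache(self, client=None):
            print("expiring metadata cache for %s" % client)

        def dispatch(self, command):
            method, args, kwargs = command
            return getattr(self, method)(*args, **kwargs)

    ToyChild().dispatch(("expire_cache", ["foo.example.com"], dict()))
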
@@ -41,28 +41,160 @@ class DispatchingCache(Cache, Debuggable):
method = "expire_cache"
def __init__(self, *args, **kwargs):
- #: A dict of <child name>: :class:`multiprocessing.Queue`
- #: objects that should be given a cache expiration command any
- #: time an item is expired.
- self.command_queues = kwargs.pop("pipes", dict())
-
+ self.cmd_q = kwargs.pop("queue")
Debuggable.__init__(self)
Cache.__init__(self, *args, **kwargs)
def expire(self, key=None):
- if (key and key in self) or (not key and len(self)):
- # dispatching cache expiration to children can be
- # expensive, so only do it if there's something to expire
- for child, cmd_q in self.command_queues.items():
- if key:
- self.logger.debug("Expiring metadata cache for %s on %s" %
- (key, child))
- else:
- self.logger.debug("Expiring metadata cache on %s" % child)
- cmd_q.put((self.method, [key], dict()))
+ self.cmd_q.put((self.method, [key], dict()))
Cache.expire(self, key=key)
+class PublishSubscribeQueue(object):
+ """ An implementation of a :class:`multiprocessing.Queue` designed
+ for publish-subscribe use patterns. I.e., a single node adds items
+ to the queue, and every other node retrieves the item. This is
+ the 'publish' end; the subscribers can deal with this as a normal
+ Queue with no special handling.
+
+ Note that, since this is the publishing end, there's no support
+ for getting.
+ """
+ def __init__(self):
+ self._queues = []
+
+ def add_subscriber(self):
+ new_q = multiprocessing.Queue()
+ self._queues.append(new_q)
+ return new_q
+
+ def put(self, data, block=True, timeout=None):
+ for queue in self._queues:
+ queue.put(data, block=block, timeout=timeout)
+
+ def put_nowait(self, data):
+ self.put(data, block=False)
+
+ def close(self):
+ for queue in self._queues:
+ queue.close()
+
+
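
The fan-out behaviour described in the class docstring can be pictured
with a standalone sketch (toy code, not the PublishSubscribeQueue class
itself): a single ``put()`` is delivered to every subscriber queue::

    import multiprocessing

    # one queue per subscriber; in Bcfg2 each child gets its own queue via
    # add_subscriber() before the child process is started
    subscribers = [multiprocessing.Queue() for _ in range(3)]

    def publish(data):
        for q in subscribers:
            q.put(data)

    publish(("expire_cache", [None], dict()))
    for q in subscribers:
        print(q.get())  # every subscriber receives its own copy of the command
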
+class ThreadSafePipeDispatcher(Debuggable):
+ """ This is a wrapper around :class:`multiprocessing.Pipe` objects
+ that allows them to be used in multithreaded applications. When
+ performing a ``send()``, a key is included that will be used to
+ identify the response. As responses are received from the Pipe,
+ they are added to a dict that is used to get the appropriate
+ response for a given thread.
+
+ The remote end of the Pipe must deal with the key being sent with
+ the data in a tuple of ``(key, data)``, and it must include the
+ key with its response.
+
+ It is the responsibility of the user to ensure that the key is
+ unique.
+
+ Note that this adds a bottleneck -- all communication over the
+ actual Pipe happens in a single thread. But for our purposes,
+ Pipe communication is fairly minimal and that's an acceptable
+ bottleneck."""
+
+ #: How long to wait while polling for new data to send. This
+ #: doesn't affect the speed with which data is sent, but
+ #: setting it too high will result in longer shutdown times, since
+ #: we only check for the termination event from the main process
+ #: every ``poll_wait`` seconds.
+ poll_wait = 2.0
+
+ _sentinel = object()
+
+ def __init__(self, terminate):
+ Debuggable.__init__(self)
+
+ #: The threading flag that is used to determine when the
+ #: threads should stop.
+ self.terminate = terminate
+
+ #: The :class:`multiprocessing.Pipe` tuple used by this object
+ self.pipe = multiprocessing.Pipe()
+
+ self._mainpipe = self.pipe[0]
+ self._recv_dict = dict()
+ self._send_queue = Queue()
+
+ self.send_thread = threading.Thread(name="PipeSendThread",
+ target=self._send_thread)
+ self.send_thread.start()
+ self.recv_thread = threading.Thread(name="PipeRecvThread",
+ target=self._recv_thread)
+ self.recv_thread.start()
+
+ def _send_thread(self):
+ """ Run the single thread through which send requests are passed """
+ self.logger.debug("Starting interprocess RPC send thread")
+ while not self.terminate.isSet():
+ try:
+ data = self._send_queue.get(True, self.poll_wait)
+ self._mainpipe.send(data)
+ except Empty:
+ pass
+ self.logger.info("Interprocess RPC send thread stopped")
+
+ def send(self, key, data):
+ """ Send data with the given unique key """
+ self._send_queue.put((key, data))
+
+ def _recv_thread(self):
+ """ Run the single thread through which recv requests are passed """
+ self.logger.debug("Starting interprocess RPC receive thread")
+ while not self.terminate.isSet():
+ if self._mainpipe.poll(self.poll_wait):
+ key, data = self._mainpipe.recv()
+ if key in self._recv_dict:
+ self.logger.error("Duplicate key in received data: %s" %
+ key)
+ self._mainpipe.close()
+ self._recv_dict[key] = data
+ self.logger.info("Interprocess RPC receive thread stopped")
+
+ def recv(self, key):
+ """ Receive data with the given unique key """
+ self.poll(key, timeout=None)
+ return self._recv_dict.pop(key)
+
+ def poll(self, key, timeout=_sentinel):
+ """ Poll for data with the given unique key. See
+ :func:`multiprocessing.Connection.poll` for the possible
+ values of ``timeout``. """
+ if timeout is self._sentinel:
+ return key in self._recv_dict
+
+ abort = threading.Event()
+
+ if timeout is not None:
+ timer = threading.Timer(float(timeout), abort.set)
+ timer.start()
+ try:
+ while not abort.is_set():
+ if key in self._recv_dict:
+ return True
+ return False
+ finally:
+ if timeout is not None:
+ timer.cancel()
+
+ @staticmethod
+ def genkey(base):
+ """ Generate a key suitable for use with
+ :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher`
+ send() requests, based on the given data. The key is
+ constructed from the string given, some information about this
+ thread, and some random data. """
+ thread = threading.current_thread()
+ return "%s-%s-%s-%s" % (base, thread.name, thread.ident, uuid4())
+
+
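
The key-based request/response convention that ThreadSafePipeDispatcher
expects from the remote end of the Pipe can be shown in a small
single-process sketch (toy code; the key and payload values are made up)::

    import multiprocessing

    parent_end, child_end = multiprocessing.Pipe()

    # parent side: tag the request with a unique key before sending
    parent_end.send(("key-1234", ("GetConfig", ["foo.example.com"], dict())))

    # child side: echo the same key back alongside the response
    key, (method, args, kwargs) = child_end.recv()
    child_end.send((key, "<configuration for %s>" % args[0]))

    # parent side: match the response to the request by its key
    reply_key, result = parent_end.recv()
    assert reply_key == "key-1234"
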
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that
internally implements both :class:`threading.Event` and
@@ -111,101 +243,153 @@ class ChildCore(BaseCore):
those, though, if the pipe communication "protocol" were made more
robust. """
- #: How long to wait while polling for new clients to build. This
- #: doesn't affect the speed with which a client is built, but
+ #: How long to wait while polling for new RPC commands. This
+ #: doesn't affect the speed with which a command is processed, but
#: setting it too high will result in longer shutdown times, since
#: we only check for the termination event from the main process
#: every ``poll_wait`` seconds.
- poll_wait = 5.0
+ poll_wait = 3.0
- def __init__(self, setup, render_pipe, command_queue, terminate):
+ def __init__(self, name, setup, rpc_pipe, cmd_q, terminate):
"""
+ :param name: The name of this child
+ :type name: string
:param setup: A Bcfg2 options dict
:type setup: Bcfg2.Options.OptionParser
- :param pipe: The pipe to which client hostnames are added for
- ChildCore objects to build configurations, and to
- which client configurations are added after
- having been built by ChildCore objects.
- :type pipe: multiprocessing.Pipe
+ :param rpc_pipe: The pipe used for RPC communication with the
+ parent process
+ :type rpc_pipe: multiprocessing.Pipe
+ :param cmd_q: The queue used for one-way pub-sub
+ communications from the parent process
+ :type cmd_q: multiprocessing.Queue
:param terminate: An event that flags ChildCore objects to shut
themselves down.
:type terminate: multiprocessing.Event
"""
BaseCore.__init__(self, setup)
- #: The pipe to which client hostnames are added for ChildCore
- #: objects to build configurations, and to which client
- #: configurations are added after having been built by
- #: ChildCore objects.
- self.render_pipe = render_pipe
+ #: The name of this child
+ self.name = name
+
+ #: The pipe used for RPC communication with the parent
+ self.rpc_pipe = rpc_pipe
- #: The queue from which other commands are received
- self.command_queue = command_queue
+ #: The queue used to receive pub-sub commands
+ self.cmd_q = cmd_q
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
- #: The :class:`threading.Thread` used to process commands
- #: received via the :class:`multiprocessing.Queue` RPC
- #: interface
- self.command_thread = \
- threading.Thread(name="CommandThread",
- target=self._command_queue_thread)
+ # a list of all rendering threads
+ self._threads = []
- def _daemonize(self):
- return True
+ # the thread used to process publish-subscribe commands
+ self._command_thread = threading.Thread(name="CommandThread",
+ target=self._dispatch_commands)
+
+ # override this setting so that the child doesn't try to write
+ # the pidfile
+ self.setup['daemon'] = False
+
+ # ensure that the child doesn't start a perflog thread
+ self.perflog_thread = None
def _run(self):
- try:
- self.command_thread.start()
- except:
- self.shutdown()
- raise
+ self._command_thread.start()
return True
- def render(self):
- """ Process client configuration render requests """
- if self.render_pipe.poll(self.poll_wait):
- if not self.metadata.use_database:
- # handle FAM events, in case (for instance) the
- # client has just been added to clients.xml, or a
- # profile has just been asserted. but really, you
- # should be using the metadata database if you're
- # using this core.
- self.fam.handle_events_in_interval(0.1)
- client = self.render_pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- self.render_pipe.send(
- lxml.etree.tostring(self.BuildConfiguration(client)))
+ def _daemonize(self):
+ return True
+
+ def _dispatch_commands(self):
+ """ Dispatch commands received via the pub-sub queue interface
+ """
+ self.logger.debug("Starting %s RPC subscription thread" % self.name)
+ while not self.terminate.is_set():
+ try:
+ data = self.cmd_q.get(True, self.poll_wait)
+ self.logger.debug("%s: Processing asynchronous command: %s" %
+ (self.name, data[0]))
+ self._dispatch(data)
+ except Empty:
+ pass
+ self.logger.info("%s RPC subscription thread stopped" % self.name)
+
+ def _dispatch_render(self):
+ """ Dispatch render requests received via the RPC pipe
+ interface """
+ key, data = self.rpc_pipe.recv()
+ self.rpc_pipe.send((key, self._dispatch(data)))
+
+ def _reap_threads(self):
+ """ Reap rendering threads that have completed """
+ for thread in self._threads[:]:
+ if not thread.is_alive():
+ self._threads.remove(thread)
+
+ def _dispatch(self, data):
+ """ Generic method dispatcher used for commands received from
+ either the pub-sub queue or the RPC pipe. """
+ method, args, kwargs = data
+ if not hasattr(self, method):
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ return None
+
+ func = getattr(self, method)
+ if func.exposed:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ return func(*args, **kwargs)
+ else:
+ self.logger.error("%s: Method %s is not exposed" % (self.name,
+ method))
+ return None
def _block(self):
while not self.terminate.isSet():
try:
- self.render()
+ if self.rpc_pipe.poll(self.poll_wait):
+ rpc_thread = threading.Thread(
+ name="Renderer%s" % len(self._threads),
+ target=self._dispatch_render)
+ self._threads.append(rpc_thread)
+ rpc_thread.start()
+ self._reap_threads()
except KeyboardInterrupt:
break
self.shutdown()
- def _command_queue_thread(self):
- """ Process commands received on the command queue thread """
- while not self.terminate.isSet():
- method, args, kwargs = self.command_queue.get()
- if hasattr(self, method):
- func = getattr(self, method)
- if func.exposed:
- self.logger.debug("Child calling RPC method %s" % method)
- func(*args, **kwargs)
- else:
- self.logger.error("Method %s is not exposed" % method)
- else:
- self.logger.error("Method %s does not exist" % method)
+ def shutdown(self):
+ BaseCore.shutdown(self)
+ self._reap_threads()
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("%s: Waiting for %d thread(s): %s" %
+ (self.name, len(threads),
+ [t.name for t in threads]))
+ time.sleep(1)
+ self._reap_threads()
+ self.logger.info("%s: All threads stopped" % self.name)
+
+ @exposed
+ def set_debug(self, address, debug):
+ BaseCore.set_debug(self, address, debug)
@exposed
def expire_cache(self, client=None):
""" Expire the metadata cache for a client """
self.metadata_cache.expire(client)
+ @exposed
+ def GetConfig(self, client):
+ """ Render the configuration for a client """
+ self.logger.debug("%s: Building configuration for %s" %
+ (self.name, client))
+ return lxml.etree.tostring(self.BuildConfiguration(client))
+
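
The render loop in ``_block()`` above spawns one thread per incoming RPC
request and periodically reaps finished threads; stripped of the Pipe
handling, the pattern looks roughly like this (toy code, ``handle()`` is
hypothetical)::

    import threading

    threads = []

    def handle(client):
        print("rendering configuration for %s" % client)

    for client in ["foo.example.com", "bar.example.com"]:
        thread = threading.Thread(target=handle, args=(client,))
        threads.append(thread)
        thread.start()

    # reap completed threads, as _reap_threads() does
    for thread in threads[:]:
        if not thread.is_alive():
            threads.remove(thread)
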
class Core(BuiltinCore):
""" A multiprocessing core that delegates building the actual
@@ -224,84 +408,102 @@ class Core(BuiltinCore):
if setup['children'] is None:
setup['children'] = multiprocessing.cpu_count()
- #: A dict of child name -> one end of the
- #: :class:`multiprocessing.Pipe` object used to submit render
- #: requests to that child. (The child is given the other end
- #: of the Pipe.)
- self.render_pipes = dict()
-
- #: A dict of child name -> :class:`multiprocessing.Queue`
- #: object used to pass commands to that child.
- self.command_queues = dict()
-
- #: A queue that keeps track of which children are available to
- #: render a configuration. A child is popped from the queue
- #: when it starts to render a config, then it's pushed back on
- #: when it's done. This lets us use a blocking call to
- #: :func:`Queue.Queue.get` when waiting for an available
- #: child.
- self.available_children = Queue(maxsize=self.setup['children'])
-
- # sigh. multiprocessing was added in py2.6, which is when the
- # camelCase methods for threading objects were deprecated in
- # favor of the Pythonic under_score methods. So
- # multiprocessing.Event *only* has is_set(), while
- # threading.Event has *both* isSet() and is_set(). In order
- # to make the core work with Python 2.4+, and with both
- # multiprocessing and threading Event objects, we just
- # monkeypatch self.terminate to have isSet().
+ #: A dict of child name ->
+ #: :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher`
+ #: objects used to pass render requests to that child. (The
+ #: child is given the other end of the Pipe.)
+ self.pipes = dict()
+
+ #: A
+ #: :class:`Bcfg2.Server.MultiprocessingCore.PublishSubscribeQueue`
+ #: object used to publish commands to all children.
+ self.cmd_q = PublishSubscribeQueue()
+
+ #: The flag that indicates when to stop child threads and
+ #: processes
self.terminate = DualEvent(threading_event=self.terminate)
- self.metadata_cache = DispatchingCache()
+ self.metadata_cache = DispatchingCache(queue=self.cmd_q)
+
+ #: A list of children that will be cycled through
+ self._all_children = []
+
+ #: An iterator that each child will be taken from in sequence,
+ #: to provide a round-robin distribution of render requests
+ self.children = None
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
- # create Pipe for render requests and results
- (mainpipe, childpipe) = multiprocessing.Pipe()
- self.render_pipes[name] = mainpipe
-
- # create Queue for other commands
- cmd_q = multiprocessing.Queue()
- self.command_queues[name] = cmd_q
- self.metadata_cache.command_queues[name] = cmd_q
+ # create Pipe for render requests
+ dispatcher = ThreadSafePipeDispatcher(self.terminate)
+ self.pipes[name] = dispatcher
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate)
+ childcore = ChildCore(name, self.setup, dispatcher.pipe[1],
+ self.cmd_q.add_subscriber(), self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
child.pid))
- self.available_children.put(name)
+ self._all_children.append(name)
+ self.logger.debug("Started %s children: %s" % (len(self._all_children),
+ self._all_children))
+ self.children = cycle(self._all_children)
return BuiltinCore._run(self)
def shutdown(self):
BuiltinCore.shutdown(self)
- for child in multiprocessing.active_children():
- self.logger.debug("Shutting down child %s" % child.name)
- child.join(self.shutdown_timeout)
- if child.is_alive():
+ self.logger.debug("Closing RPC command queues")
+ self.cmd_q.close()
+
+ def term_children():
+ for child in multiprocessing.active_children():
self.logger.error("Waited %s seconds to shut down %s, "
"terminating" % (self.shutdown_timeout,
child.name))
child.terminate()
- else:
- self.logger.debug("Child %s shut down" % child.name)
- self.logger.debug("All children shut down")
+
+ timer = threading.Timer(self.shutdown_timeout, term_children)
+ timer.start()
+ while len(multiprocessing.active_children()):
+ self.logger.info("Waiting for %s child(ren): %s" %
+ (len(multiprocessing.active_children()),
+ [c.name
+ for c in multiprocessing.active_children()]))
+ time.sleep(1)
+ timer.cancel()
+ self.logger.info("All children shut down")
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("Waiting for %s thread(s): %s" %
+ (len(threads), [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("Shutdown complete")
@exposed
def set_debug(self, address, debug):
+ self.cmd_q.put(("set_debug", [address, debug], dict()))
self.metadata_cache.set_debug(debug)
+ for pipe in self.pipes.values():
+ pipe.set_debug(debug)
return BuiltinCore.set_debug(self, address, debug)
@exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
- childname = self.available_children.get()
- self.logger.debug("Building configuration on child %s" % childname)
- pipe = self.render_pipes[childname]
- pipe.send(client)
- config = pipe.recv()
- self.available_children.put_nowait(childname)
- return config
+ childname = self.children.next()
+ self.logger.debug("Building configuration for %s on %s" % (client,
+ childname))
+ key = ThreadSafePipeDispatcher.genkey(client)
+ pipe = self.pipes[childname]
+ pipe.send(key, ("GetConfig", [client], dict()))
+ if pipe.poll(key, timeout=self.setup['client_timeout']):
+ return pipe.recv(key)
+ else:
+ self.logger.error("Building configuration for %s on %s timed out" %
+ (client, childname))
+ return None
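
The ``GetConfig`` handler above hands each request to the next child in a
round-robin rotation built with :func:`itertools.cycle`; in isolation the
rotation looks like this (toy names, not actual child processes)::

    from itertools import cycle

    children = cycle(["Child-0", "Child-1", "Child-2"])
    for client in ["a.example.com", "b.example.com",
                   "c.example.com", "d.example.com"]:
        # the fourth request wraps around to Child-0 again
        print("%s -> %s" % (client, next(children)))
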