summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-07-23 08:23:59 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-07-23 08:40:04 -0400
commit9084b0e889407956227ae8d65bceff5148f7ee1f (patch)
treecb1e774bbb6271fab3bfa48e5fbf7af7d3272a58 /src/lib/Bcfg2/Server/MultiprocessingCore.py
parenta9ea92aa595c5df63eff25ff545927078f7651e6 (diff)
downloadbcfg2-9084b0e889407956227ae8d65bceff5148f7ee1f.tar.gz
bcfg2-9084b0e889407956227ae8d65bceff5148f7ee1f.tar.bz2
bcfg2-9084b0e889407956227ae8d65bceff5148f7ee1f.zip
MultiprocessingCore: rewrote parent-child RPC to be thread-safe (and less powerful)
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py157
1 files changed, 90 insertions, 67 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index c9d7fc8c0..02710ab99 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -3,18 +3,24 @@
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
-The parent communicates with the children over a
-:class:`multiprocessing.Pipe` that implements a very simple RPC
-protocol. Each command passed to a child over the Pipe must be a
-tuple with the format::
+The parent communicates with the children over two constructs:
+
+* A :class:`multiprocessing.Pipe` is used to process render requests.
+ The pipe is locked when in use (i.e., between the time that a client
+ is submitted to be rendered and the time that its configuration is
+ returned) to keep things thread-safe. (This is accomplished through
+ the use of
+ :attr:`Bcfg2.Server.MultiprocessingCore.available_children.)
+* A :class:`multiprocessing.Queue` is used to submit other commands in
+ a thread-safe, non-blocking fashion. (Note that, since it is a
+ queue, no results can be returned.) It implements a very simple RPC
+ protocol. Each command passed to a child over the Pipe must be a
+ tuple with the format::
(<method>, <args>, <kwargs>)
-The method must be exposed by the child by decorating it with
-:func:`Bcfg2.Server.Core.exposed`.
-
-The RPC call always returns a value via the pipe, so the caller *must*
-read the return value in order to keep the pipe consistent.
+ The method must be exposed by the child by decorating it with
+ :func:`Bcfg2.Server.Core.exposed`.
"""
import threading
@@ -35,10 +41,10 @@ class DispatchingCache(Cache, Debuggable):
method = "expire_cache"
def __init__(self, *args, **kwargs):
- #: A dict of <child name>: :class:`multiprocessing.Pipe`
+ #: A dict of <child name>: :class:`multiprocessing.Queue`
#: objects that should be given a cache expiration command any
#: time an item is expired.
- self.pipes = kwargs.pop("pipes", dict())
+ self.command_queues = kwargs.pop("pipes", dict())
Debuggable.__init__(self)
Cache.__init__(self, *args, **kwargs)
@@ -47,24 +53,16 @@ class DispatchingCache(Cache, Debuggable):
if (key and key in self) or (not key and len(self)):
# dispatching cache expiration to children can be
# expensive, so only do it if there's something to expire
- for child, pipe in self.pipes.items():
+ for child, cmd_q in self.command_queues.items():
if key:
self.logger.debug("Expiring metadata cache for %s on %s" %
(key, child))
else:
self.logger.debug("Expiring metadata cache on %s" % child)
- pipe.send((self.method, [key], dict()))
- pipe.recv()
+ cmd_q.put((self.method, [key], dict()))
Cache.expire(self, key=key)
-class NoSuchMethod(Exception):
- """ Exception raised by a child process if it's asked to execute a
- method that doesn't exist or that isn't exposed via the
- :class:`multiprocessing.Pipe` RPC interface. """
- pass
-
-
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that
internally implements both :class:`threading.Event` and
@@ -120,7 +118,7 @@ class ChildCore(BaseCore):
#: every ``poll_wait`` seconds.
poll_wait = 5.0
- def __init__(self, setup, pipe, terminate):
+ def __init__(self, setup, render_pipe, command_queue, terminate):
"""
:param setup: A Bcfg2 options dict
:type setup: Bcfg2.Options.OptionParser
@@ -139,61 +137,75 @@ class ChildCore(BaseCore):
#: objects to build configurations, and to which client
#: configurations are added after having been built by
#: ChildCore objects.
- self.pipe = pipe
+ self.render_pipe = render_pipe
+
+ #: The queue from which other commands are received
+ self.command_queue = command_queue
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
+ #: The :class:`threading.Thread` used to process commands
+ #: received via the :class:`multiprocessing.Queue` RPC
+ #: interface
+ self.command_thread = \
+ threading.Thread(name="CommandThread",
+ target=self._command_queue_thread)
+
def _daemonize(self):
return True
def _run(self):
+ try:
+ self.command_thread.start()
+ except:
+ self.shutdown()
+ raise
return True
- def rpc_dispatch(self):
- """ Dispatch a method received via the
- :class:`multiprocessing.Pipe` RPC interface.
-
- :param data: The tuple of ``(<method name>, <args>, <kwargs>)``
- :type data: tuple
- """
- method, args, kwargs = self.pipe.recv()
- if hasattr(self, method):
- func = getattr(self, method)
- if func.exposed:
- self.pipe.send(func(*args, **kwargs))
- else:
- raise NoSuchMethod(method)
- else:
- raise NoSuchMethod(method)
-
- @exposed
- def GetConfig(self, client):
- self.logger.debug("Building configuration for %s" % client)
- return lxml.etree.tostring(self.BuildConfiguration(client))
-
- @exposed
- def expire_cache(self, client=None):
- """ Expire the metadata cache for a client """
- self.metadata_cache.expire(client)
+ def render(self):
+ """ Process client configuration render requests """
+ if self.render_pipe.poll(self.poll_wait):
+ if not self.metadata.use_database:
+ # handle FAM events, in case (for instance) the
+ # client has just been added to clients.xml, or a
+ # profile has just been asserted. but really, you
+ # should be using the metadata database if you're
+ # using this core.
+ self.fam.handle_events_in_interval(0.1)
+ client = self.render_pipe.recv()
+ self.logger.debug("Building configuration for %s" % client)
+ self.render_pipe.send(
+ lxml.etree.tostring(self.BuildConfiguration(client)))
def _block(self):
while not self.terminate.isSet():
try:
- if self.pipe.poll(self.poll_wait):
- if not self.metadata.use_database:
- # handle FAM events, in case (for instance) the
- # client has just been added to clients.xml, or a
- # profile has just been asserted. but really, you
- # should be using the metadata database if you're
- # using this core.
- self.fam.handle_events_in_interval(0.1)
- self.rpc_dispatch()
+ self.render()
except KeyboardInterrupt:
break
self.shutdown()
+ def _command_queue_thread(self):
+ """ Process commands received on the command queue thread """
+ while not self.terminate.isSet():
+ method, args, kwargs = self.command_queue.get()
+ if hasattr(self, method):
+ func = getattr(self, method)
+ if func.exposed:
+ self.logger.debug("Child calling RPC method %s" % method)
+ func(*args, **kwargs)
+ else:
+ self.logger.error("Method %s is not exposed" % method)
+ else:
+ self.logger.error("Method %s does not exist" % method)
+
+ @exposed
+ def expire_cache(self, client=None):
+ """ Expire the metadata cache for a client """
+ self.metadata_cache.expire(client)
+
class Core(BuiltinCore):
""" A multiprocessing core that delegates building the actual
@@ -213,10 +225,14 @@ class Core(BuiltinCore):
setup['children'] = multiprocessing.cpu_count()
#: A dict of child name -> one end of the
- #: :class:`multiprocessing.Pipe` object used to communicate
- #: with that child. (The child is given the other end of the
- #: Pipe.)
- self.pipes = dict()
+ #: :class:`multiprocessing.Pipe` object used to submit render
+ #: requests to that child. (The child is given the other end
+ #: of the Pipe.)
+ self.render_pipes = dict()
+
+ #: A dict of child name -> :class:`multiprocessing.Queue`
+ #: object used to pass commands to that child.
+ self.command_queues = dict()
#: A queue that keeps track of which children are available to
#: render a configuration. A child is popped from the queue
@@ -241,11 +257,18 @@ class Core(BuiltinCore):
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
+
+ # create Pipe for render requests and results
(mainpipe, childpipe) = multiprocessing.Pipe()
- self.pipes[name] = mainpipe
- self.metadata_cache.pipes[name] = mainpipe
+ self.render_pipes[name] = mainpipe
+
+ # create Queue for other commands
+ cmd_q = multiprocessing.Queue()
+ self.command_queues[name] = cmd_q
+ self.metadata_cache.command_queues[name] = cmd_q
+
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(self.setup, childpipe, self.terminate)
+ childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
@@ -277,8 +300,8 @@ class Core(BuiltinCore):
client = self.resolve_client(address)[0]
childname = self.available_children.get()
self.logger.debug("Building configuration on child %s" % childname)
- pipe = self.pipes[childname]
- pipe.send(("GetConfig", [client], dict()))
+ pipe = self.render_pipes[childname]
+ pipe.send(client)
config = pipe.recv()
self.available_children.put_nowait(childname)
return config