From 9084b0e889407956227ae8d65bceff5148f7ee1f Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 23 Jul 2013 08:23:59 -0400 Subject: MultiprocessingCore: rewrote parent-child RPC to be thread-safe (and less powerful) --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 157 ++++++++++++++++------------ 1 file changed, 90 insertions(+), 67 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index c9d7fc8c0..02710ab99 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -3,18 +3,24 @@ :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. -The parent communicates with the children over a -:class:`multiprocessing.Pipe` that implements a very simple RPC -protocol. Each command passed to a child over the Pipe must be a -tuple with the format:: +The parent communicates with the children over two constructs: + +* A :class:`multiprocessing.Pipe` is used to process render requests. + The pipe is locked when in use (i.e., between the time that a client + is submitted to be rendered and the time that its configuration is + returned) to keep things thread-safe. (This is accomplished through + the use of + :attr:`Bcfg2.Server.MultiprocessingCore.available_children.) +* A :class:`multiprocessing.Queue` is used to submit other commands in + a thread-safe, non-blocking fashion. (Note that, since it is a + queue, no results can be returned.) It implements a very simple RPC + protocol. Each command passed to a child over the Pipe must be a + tuple with the format:: (, , ) -The method must be exposed by the child by decorating it with -:func:`Bcfg2.Server.Core.exposed`. - -The RPC call always returns a value via the pipe, so the caller *must* -read the return value in order to keep the pipe consistent. + The method must be exposed by the child by decorating it with + :func:`Bcfg2.Server.Core.exposed`. """ import threading @@ -35,10 +41,10 @@ class DispatchingCache(Cache, Debuggable): method = "expire_cache" def __init__(self, *args, **kwargs): - #: A dict of : :class:`multiprocessing.Pipe` + #: A dict of : :class:`multiprocessing.Queue` #: objects that should be given a cache expiration command any #: time an item is expired. - self.pipes = kwargs.pop("pipes", dict()) + self.command_queues = kwargs.pop("pipes", dict()) Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) @@ -47,24 +53,16 @@ class DispatchingCache(Cache, Debuggable): if (key and key in self) or (not key and len(self)): # dispatching cache expiration to children can be # expensive, so only do it if there's something to expire - for child, pipe in self.pipes.items(): + for child, cmd_q in self.command_queues.items(): if key: self.logger.debug("Expiring metadata cache for %s on %s" % (key, child)) else: self.logger.debug("Expiring metadata cache on %s" % child) - pipe.send((self.method, [key], dict())) - pipe.recv() + cmd_q.put((self.method, [key], dict())) Cache.expire(self, key=key) -class NoSuchMethod(Exception): - """ Exception raised by a child process if it's asked to execute a - method that doesn't exist or that isn't exposed via the - :class:`multiprocessing.Pipe` RPC interface. """ - pass - - class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -120,7 +118,7 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 5.0 - def __init__(self, setup, pipe, terminate): + def __init__(self, setup, render_pipe, command_queue, terminate): """ :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser @@ -139,61 +137,75 @@ class ChildCore(BaseCore): #: objects to build configurations, and to which client #: configurations are added after having been built by #: ChildCore objects. - self.pipe = pipe + self.render_pipe = render_pipe + + #: The queue from which other commands are received + self.command_queue = command_queue #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate + #: The :class:`threading.Thread` used to process commands + #: received via the :class:`multiprocessing.Queue` RPC + #: interface + self.command_thread = \ + threading.Thread(name="CommandThread", + target=self._command_queue_thread) + def _daemonize(self): return True def _run(self): + try: + self.command_thread.start() + except: + self.shutdown() + raise return True - def rpc_dispatch(self): - """ Dispatch a method received via the - :class:`multiprocessing.Pipe` RPC interface. - - :param data: The tuple of ``(, , )`` - :type data: tuple - """ - method, args, kwargs = self.pipe.recv() - if hasattr(self, method): - func = getattr(self, method) - if func.exposed: - self.pipe.send(func(*args, **kwargs)) - else: - raise NoSuchMethod(method) - else: - raise NoSuchMethod(method) - - @exposed - def GetConfig(self, client): - self.logger.debug("Building configuration for %s" % client) - return lxml.etree.tostring(self.BuildConfiguration(client)) - - @exposed - def expire_cache(self, client=None): - """ Expire the metadata cache for a client """ - self.metadata_cache.expire(client) + def render(self): + """ Process client configuration render requests """ + if self.render_pipe.poll(self.poll_wait): + if not self.metadata.use_database: + # handle FAM events, in case (for instance) the + # client has just been added to clients.xml, or a + # profile has just been asserted. but really, you + # should be using the metadata database if you're + # using this core. + self.fam.handle_events_in_interval(0.1) + client = self.render_pipe.recv() + self.logger.debug("Building configuration for %s" % client) + self.render_pipe.send( + lxml.etree.tostring(self.BuildConfiguration(client))) def _block(self): while not self.terminate.isSet(): try: - if self.pipe.poll(self.poll_wait): - if not self.metadata.use_database: - # handle FAM events, in case (for instance) the - # client has just been added to clients.xml, or a - # profile has just been asserted. but really, you - # should be using the metadata database if you're - # using this core. - self.fam.handle_events_in_interval(0.1) - self.rpc_dispatch() + self.render() except KeyboardInterrupt: break self.shutdown() + def _command_queue_thread(self): + """ Process commands received on the command queue thread """ + while not self.terminate.isSet(): + method, args, kwargs = self.command_queue.get() + if hasattr(self, method): + func = getattr(self, method) + if func.exposed: + self.logger.debug("Child calling RPC method %s" % method) + func(*args, **kwargs) + else: + self.logger.error("Method %s is not exposed" % method) + else: + self.logger.error("Method %s does not exist" % method) + + @exposed + def expire_cache(self, client=None): + """ Expire the metadata cache for a client """ + self.metadata_cache.expire(client) + class Core(BuiltinCore): """ A multiprocessing core that delegates building the actual @@ -213,10 +225,14 @@ class Core(BuiltinCore): setup['children'] = multiprocessing.cpu_count() #: A dict of child name -> one end of the - #: :class:`multiprocessing.Pipe` object used to communicate - #: with that child. (The child is given the other end of the - #: Pipe.) - self.pipes = dict() + #: :class:`multiprocessing.Pipe` object used to submit render + #: requests to that child. (The child is given the other end + #: of the Pipe.) + self.render_pipes = dict() + + #: A dict of child name -> :class:`multiprocessing.Queue` + #: object used to pass commands to that child. + self.command_queues = dict() #: A queue that keeps track of which children are available to #: render a configuration. A child is popped from the queue @@ -241,11 +257,18 @@ class Core(BuiltinCore): def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum + + # create Pipe for render requests and results (mainpipe, childpipe) = multiprocessing.Pipe() - self.pipes[name] = mainpipe - self.metadata_cache.pipes[name] = mainpipe + self.render_pipes[name] = mainpipe + + # create Queue for other commands + cmd_q = multiprocessing.Queue() + self.command_queues[name] = cmd_q + self.metadata_cache.command_queues[name] = cmd_q + self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, self.terminate) + childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, @@ -277,8 +300,8 @@ class Core(BuiltinCore): client = self.resolve_client(address)[0] childname = self.available_children.get() self.logger.debug("Building configuration on child %s" % childname) - pipe = self.pipes[childname] - pipe.send(("GetConfig", [client], dict())) + pipe = self.render_pipes[childname] + pipe.send(client) config = pipe.recv() self.available_children.put_nowait(childname) return config -- cgit v1.2.3-1-g7c22