From 7d17d1c283f2718ae86a4b2db03726f4ae802889 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 22 Jul 2013 14:26:49 -0400 Subject: MultiprocessingCore: Dispatch metadata cache expiration to children When the broker in a multiprocessing configuration expires its metadata cache (e.g., when probe data is received), it must dispatch that expiration call to its children. This also makes the protocol for communication between the broker and its children into a real RPC protocol, so we can do even more stuff in the future. --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 102 +++++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 10 deletions(-) (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 81fba7092..066519774 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -2,16 +2,71 @@ :mod:`Bcfg2.Server.BuiltinCore` that uses the Python :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. + +The parent communicates with the children over a +:class:`multiprocessing.Pipe` that implements a very simple RPC +protocol. Each command passed to a child over the Pipe must be a +tuple with the format:: + + (, , ) + +The method must be exposed by the child by decorating it with +:func:`Bcfg2.Server.Core.exposed`. + +The RPC call always returns a value via the pipe, so the caller *must* +read the return value in order to keep the pipe consistent. """ +import logging import threading import lxml.etree import multiprocessing +from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue from Bcfg2.Server.Core import BaseCore, exposed +from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +class DispatchingCache(Cache, Debuggable): + """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates + cache expiration events to child nodes. """ + + #: The method to send over the pipe to expire the cache + method = "expire_cache" + + def __init__(self, *args, **kwargs): + #: A dict of : :class:`multiprocessing.Pipe` + #: objects that should be given a cache expiration command any + #: time an item is expired. + self.pipes = kwargs.pop("pipes", dict()) + + #: A :class:`logging.Logger` object this cache object can use + self.logger = logging.getLogger(self.__class__.__name__) + Cache.__init__(self, *args, **kwargs) + + def expire(self, key=None): + if (key and key in self) or (not key and len(self)): + # dispatching cache expiration to children can be + # expensive, so only do it if there's something to expire + for child, pipe in self.pipes.items(): + if key: + self.logger.debug("Expiring metadata cache for %s on %s" % + (key, child)) + else: + self.logger.debug("Expiring metadata cache on %s" % child) + pipe.send((self.method, [key], dict())) + pipe.recv() + Cache.expire(self, key=key) + + +class NoSuchMethod(Exception): + """ Exception raised by a child process if it's asked to execute a + method that doesn't exist or that isn't exposed via the + :class:`multiprocessing.Pipe` RPC interface. """ + pass + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -98,6 +153,33 @@ class ChildCore(BaseCore): def _run(self): return True + def rpc_dispatch(self): + """ Dispatch a method received via the + :class:`multiprocessing.Pipe` RPC interface. + + :param data: The tuple of ``(, , )`` + :type data: tuple + """ + method, args, kwargs = self.pipe.recv() + if hasattr(self, method): + func = getattr(self, method) + if func.exposed: + self.pipe.send(func(*args, **kwargs)) + else: + raise NoSuchMethod(method) + else: + raise NoSuchMethod(method) + + @exposed + def GetConfig(self, client): + self.logger.debug("Building configuration for %s" % client) + return lxml.etree.tostring(self.BuildConfiguration(client)) + + @exposed + def expire_cache(self, client=None): + """ Expire the metadata cache for a client """ + self.metadata_cache.expire(client) + def _block(self): while not self.terminate.isSet(): try: @@ -109,15 +191,7 @@ class ChildCore(BaseCore): # should be using the metadata database if you're # using this core. self.fam.handle_events_in_interval(0.1) - client = self.pipe.recv() - self.logger.debug("Building configuration for %s" % client) - config = \ - lxml.etree.tostring(self.BuildConfiguration(client)) - self.logger.debug("Returning configuration for %s to main " - "process" % client) - self.pipe.send(config) - self.logger.debug("Returned configuration for %s to main " - "process" % client) + self.rpc_dispatch() except KeyboardInterrupt: break self.shutdown() @@ -164,11 +238,14 @@ class Core(BuiltinCore): # monkeypatch self.terminate to have isSet(). self.terminate = DualEvent(threading_event=self.terminate) + self.metadata_cache = DispatchingCache() + def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum (mainpipe, childpipe) = multiprocessing.Pipe() self.pipes[name] = mainpipe + self.metadata_cache.pipes[name] = mainpipe self.logger.debug("Starting child %s" % name) childcore = ChildCore(self.setup, childpipe, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) @@ -192,13 +269,18 @@ class Core(BuiltinCore): self.logger.debug("Child %s shut down" % child.name) self.logger.debug("All children shut down") + @exposed + def set_debug(self, address, debug): + BuiltinCore.set_debug(self, address, debug) + self.metadata_cache.set_debug(debug) + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] childname = self.available_children.get() self.logger.debug("Building configuration on child %s" % childname) pipe = self.pipes[childname] - pipe.send(client) + pipe.send(("GetConfig", [client], dict())) config = pipe.recv() self.available_children.put_nowait(childname) return config -- cgit v1.2.3-1-g7c22