diff options
Diffstat (limited to 'src/lib/Bcfg2')
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 102 |
1 files changed, 92 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 81fba7092..066519774 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -2,16 +2,71 @@ :mod:`Bcfg2.Server.BuiltinCore` that uses the Python :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. + +The parent communicates with the children over a +:class:`multiprocessing.Pipe` that implements a very simple RPC +protocol. Each command passed to a child over the Pipe must be a +tuple with the format:: + + (<method>, <args>, <kwargs>) + +The method must be exposed by the child by decorating it with +:func:`Bcfg2.Server.Core.exposed`. + +The RPC call always returns a value via the pipe, so the caller *must* +read the return value in order to keep the pipe consistent. """ +import logging import threading import lxml.etree import multiprocessing +from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue from Bcfg2.Server.Core import BaseCore, exposed +from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +class DispatchingCache(Cache, Debuggable): + """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates + cache expiration events to child nodes. """ + + #: The method to send over the pipe to expire the cache + method = "expire_cache" + + def __init__(self, *args, **kwargs): + #: A dict of <child name>: :class:`multiprocessing.Pipe` + #: objects that should be given a cache expiration command any + #: time an item is expired. + self.pipes = kwargs.pop("pipes", dict()) + + #: A :class:`logging.Logger` object this cache object can use + self.logger = logging.getLogger(self.__class__.__name__) + Cache.__init__(self, *args, **kwargs) + + def expire(self, key=None): + if (key and key in self) or (not key and len(self)): + # dispatching cache expiration to children can be + # expensive, so only do it if there's something to expire + for child, pipe in self.pipes.items(): + if key: + self.logger.debug("Expiring metadata cache for %s on %s" % + (key, child)) + else: + self.logger.debug("Expiring metadata cache on %s" % child) + pipe.send((self.method, [key], dict())) + pipe.recv() + Cache.expire(self, key=key) + + +class NoSuchMethod(Exception): + """ Exception raised by a child process if it's asked to execute a + method that doesn't exist or that isn't exposed via the + :class:`multiprocessing.Pipe` RPC interface. """ + pass + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -98,6 +153,33 @@ class ChildCore(BaseCore): def _run(self): return True + def rpc_dispatch(self): + """ Dispatch a method received via the + :class:`multiprocessing.Pipe` RPC interface. + + :param data: The tuple of ``(<method name>, <args>, <kwargs>)`` + :type data: tuple + """ + method, args, kwargs = self.pipe.recv() + if hasattr(self, method): + func = getattr(self, method) + if func.exposed: + self.pipe.send(func(*args, **kwargs)) + else: + raise NoSuchMethod(method) + else: + raise NoSuchMethod(method) + + @exposed + def GetConfig(self, client): + self.logger.debug("Building configuration for %s" % client) + return lxml.etree.tostring(self.BuildConfiguration(client)) + + @exposed + def expire_cache(self, client=None): + """ Expire the metadata cache for a client """ + self.metadata_cache.expire(client) + def _block(self): while not self.terminate.isSet(): try: @@ -109,15 +191,7 @@ class ChildCore(BaseCore): # should be using the metadata database if you're # using this core. self.fam.handle_events_in_interval(0.1) - client = self.pipe.recv() - self.logger.debug("Building configuration for %s" % client) - config = \ - lxml.etree.tostring(self.BuildConfiguration(client)) - self.logger.debug("Returning configuration for %s to main " - "process" % client) - self.pipe.send(config) - self.logger.debug("Returned configuration for %s to main " - "process" % client) + self.rpc_dispatch() except KeyboardInterrupt: break self.shutdown() @@ -164,11 +238,14 @@ class Core(BuiltinCore): # monkeypatch self.terminate to have isSet(). self.terminate = DualEvent(threading_event=self.terminate) + self.metadata_cache = DispatchingCache() + def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum (mainpipe, childpipe) = multiprocessing.Pipe() self.pipes[name] = mainpipe + self.metadata_cache.pipes[name] = mainpipe self.logger.debug("Starting child %s" % name) childcore = ChildCore(self.setup, childpipe, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) @@ -193,12 +270,17 @@ class Core(BuiltinCore): self.logger.debug("All children shut down") @exposed + def set_debug(self, address, debug): + BuiltinCore.set_debug(self, address, debug) + self.metadata_cache.set_debug(debug) + + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] childname = self.available_children.get() self.logger.debug("Building configuration on child %s" % childname) pipe = self.pipes[childname] - pipe.send(client) + pipe.send(("GetConfig", [client], dict())) config = pipe.recv() self.available_children.put_nowait(childname) return config |