diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-07-22 14:26:49 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-07-22 14:26:49 -0400 |
commit | 7d17d1c283f2718ae86a4b2db03726f4ae802889 (patch) | |
tree | c48678935734091e2aba6e44cbb389fb41c2be17 /src/lib | |
parent | 752da22a2247892f647c0a9c46e7b0faf9351ea6 (diff) | |
download | bcfg2-7d17d1c283f2718ae86a4b2db03726f4ae802889.tar.gz bcfg2-7d17d1c283f2718ae86a4b2db03726f4ae802889.tar.bz2 bcfg2-7d17d1c283f2718ae86a4b2db03726f4ae802889.zip |
MultiprocessingCore: Dispatch metadata cache expiration to children
When the broker in a multiprocessing configuration expires its
metadata cache (e.g., when probe data is received), it must dispatch
that expiration call to its children.
This also makes the protocol for communication between the broker and
its children into a real RPC protocol, so we can do even more stuff in
the future.
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 102 |
1 files changed, 92 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 81fba7092..066519774 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -2,16 +2,71 @@ :mod:`Bcfg2.Server.BuiltinCore` that uses the Python :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. + +The parent communicates with the children over a +:class:`multiprocessing.Pipe` that implements a very simple RPC +protocol. Each command passed to a child over the Pipe must be a +tuple with the format:: + + (<method>, <args>, <kwargs>) + +The method must be exposed by the child by decorating it with +:func:`Bcfg2.Server.Core.exposed`. + +The RPC call always returns a value via the pipe, so the caller *must* +read the return value in order to keep the pipe consistent. """ +import logging import threading import lxml.etree import multiprocessing +from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue from Bcfg2.Server.Core import BaseCore, exposed +from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +class DispatchingCache(Cache, Debuggable): + """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates + cache expiration events to child nodes. """ + + #: The method to send over the pipe to expire the cache + method = "expire_cache" + + def __init__(self, *args, **kwargs): + #: A dict of <child name>: :class:`multiprocessing.Pipe` + #: objects that should be given a cache expiration command any + #: time an item is expired. + self.pipes = kwargs.pop("pipes", dict()) + + #: A :class:`logging.Logger` object this cache object can use + self.logger = logging.getLogger(self.__class__.__name__) + Cache.__init__(self, *args, **kwargs) + + def expire(self, key=None): + if (key and key in self) or (not key and len(self)): + # dispatching cache expiration to children can be + # expensive, so only do it if there's something to expire + for child, pipe in self.pipes.items(): + if key: + self.logger.debug("Expiring metadata cache for %s on %s" % + (key, child)) + else: + self.logger.debug("Expiring metadata cache on %s" % child) + pipe.send((self.method, [key], dict())) + pipe.recv() + Cache.expire(self, key=key) + + +class NoSuchMethod(Exception): + """ Exception raised by a child process if it's asked to execute a + method that doesn't exist or that isn't exposed via the + :class:`multiprocessing.Pipe` RPC interface. """ + pass + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -98,6 +153,33 @@ class ChildCore(BaseCore): def _run(self): return True + def rpc_dispatch(self): + """ Dispatch a method received via the + :class:`multiprocessing.Pipe` RPC interface. + + :param data: The tuple of ``(<method name>, <args>, <kwargs>)`` + :type data: tuple + """ + method, args, kwargs = self.pipe.recv() + if hasattr(self, method): + func = getattr(self, method) + if func.exposed: + self.pipe.send(func(*args, **kwargs)) + else: + raise NoSuchMethod(method) + else: + raise NoSuchMethod(method) + + @exposed + def GetConfig(self, client): + self.logger.debug("Building configuration for %s" % client) + return lxml.etree.tostring(self.BuildConfiguration(client)) + + @exposed + def expire_cache(self, client=None): + """ Expire the metadata cache for a client """ + self.metadata_cache.expire(client) + def _block(self): while not self.terminate.isSet(): try: @@ -109,15 +191,7 @@ class ChildCore(BaseCore): # should be using the metadata database if you're # using this core. self.fam.handle_events_in_interval(0.1) - client = self.pipe.recv() - self.logger.debug("Building configuration for %s" % client) - config = \ - lxml.etree.tostring(self.BuildConfiguration(client)) - self.logger.debug("Returning configuration for %s to main " - "process" % client) - self.pipe.send(config) - self.logger.debug("Returned configuration for %s to main " - "process" % client) + self.rpc_dispatch() except KeyboardInterrupt: break self.shutdown() @@ -164,11 +238,14 @@ class Core(BuiltinCore): # monkeypatch self.terminate to have isSet(). self.terminate = DualEvent(threading_event=self.terminate) + self.metadata_cache = DispatchingCache() + def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum (mainpipe, childpipe) = multiprocessing.Pipe() self.pipes[name] = mainpipe + self.metadata_cache.pipes[name] = mainpipe self.logger.debug("Starting child %s" % name) childcore = ChildCore(self.setup, childpipe, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) @@ -193,12 +270,17 @@ class Core(BuiltinCore): self.logger.debug("All children shut down") @exposed + def set_debug(self, address, debug): + BuiltinCore.set_debug(self, address, debug) + self.metadata_cache.set_debug(debug) + + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] childname = self.available_children.get() self.logger.debug("Building configuration on child %s" % childname) pipe = self.pipes[childname] - pipe.send(client) + pipe.send(("GetConfig", [client], dict())) config = pipe.recv() self.available_children.put_nowait(childname) return config |