summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-07-22 14:26:49 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-07-22 14:26:49 -0400
commit7d17d1c283f2718ae86a4b2db03726f4ae802889 (patch)
treec48678935734091e2aba6e44cbb389fb41c2be17 /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent752da22a2247892f647c0a9c46e7b0faf9351ea6 (diff)
downloadbcfg2-7d17d1c283f2718ae86a4b2db03726f4ae802889.tar.gz
bcfg2-7d17d1c283f2718ae86a4b2db03726f4ae802889.tar.bz2
bcfg2-7d17d1c283f2718ae86a4b2db03726f4ae802889.zip
MultiprocessingCore: Dispatch metadata cache expiration to children
When the broker in a multiprocessing configuration expires its metadata cache (e.g., when probe data is received), it must dispatch that expiration call to its children. This also makes the protocol for communication between the broker and its children into a real RPC protocol, so we can do even more stuff in the future.
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py102
1 files changed, 92 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 81fba7092..066519774 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -2,16 +2,71 @@
:mod:`Bcfg2.Server.BuiltinCore` that uses the Python
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
+
+The parent communicates with the children over a
+:class:`multiprocessing.Pipe` that implements a very simple RPC
+protocol. Each command passed to a child over the Pipe must be a
+tuple with the format::
+
+ (<method>, <args>, <kwargs>)
+
+The method must be exposed by the child by decorating it with
+:func:`Bcfg2.Server.Core.exposed`.
+
+The RPC call always returns a value via the pipe, so the caller *must*
+read the return value in order to keep the pipe consistent.
"""
+import logging
import threading
import lxml.etree
import multiprocessing
+from Bcfg2.Cache import Cache
from Bcfg2.Compat import Queue
from Bcfg2.Server.Core import BaseCore, exposed
+from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+class DispatchingCache(Cache, Debuggable):
+ """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
+ cache expiration events to child nodes. """
+
+ #: The method to send over the pipe to expire the cache
+ method = "expire_cache"
+
+ def __init__(self, *args, **kwargs):
+ #: A dict of <child name>: :class:`multiprocessing.Pipe`
+ #: objects that should be given a cache expiration command any
+ #: time an item is expired.
+ self.pipes = kwargs.pop("pipes", dict())
+
+ #: A :class:`logging.Logger` object this cache object can use
+ self.logger = logging.getLogger(self.__class__.__name__)
+ Cache.__init__(self, *args, **kwargs)
+
+ def expire(self, key=None):
+ if (key and key in self) or (not key and len(self)):
+ # dispatching cache expiration to children can be
+ # expensive, so only do it if there's something to expire
+ for child, pipe in self.pipes.items():
+ if key:
+ self.logger.debug("Expiring metadata cache for %s on %s" %
+ (key, child))
+ else:
+ self.logger.debug("Expiring metadata cache on %s" % child)
+ pipe.send((self.method, [key], dict()))
+ pipe.recv()
+ Cache.expire(self, key=key)
+
+
+class NoSuchMethod(Exception):
+ """ Exception raised by a child process if it's asked to execute a
+ method that doesn't exist or that isn't exposed via the
+ :class:`multiprocessing.Pipe` RPC interface. """
+ pass
+
+
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that
internally implements both :class:`threading.Event` and
@@ -98,6 +153,33 @@ class ChildCore(BaseCore):
def _run(self):
return True
+ def rpc_dispatch(self):
+ """ Dispatch a method received via the
+ :class:`multiprocessing.Pipe` RPC interface.
+
+ :param data: The tuple of ``(<method name>, <args>, <kwargs>)``
+ :type data: tuple
+ """
+ method, args, kwargs = self.pipe.recv()
+ if hasattr(self, method):
+ func = getattr(self, method)
+ if func.exposed:
+ self.pipe.send(func(*args, **kwargs))
+ else:
+ raise NoSuchMethod(method)
+ else:
+ raise NoSuchMethod(method)
+
+ @exposed
+ def GetConfig(self, client):
+ self.logger.debug("Building configuration for %s" % client)
+ return lxml.etree.tostring(self.BuildConfiguration(client))
+
+ @exposed
+ def expire_cache(self, client=None):
+ """ Expire the metadata cache for a client """
+ self.metadata_cache.expire(client)
+
def _block(self):
while not self.terminate.isSet():
try:
@@ -109,15 +191,7 @@ class ChildCore(BaseCore):
# should be using the metadata database if you're
# using this core.
self.fam.handle_events_in_interval(0.1)
- client = self.pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- config = \
- lxml.etree.tostring(self.BuildConfiguration(client))
- self.logger.debug("Returning configuration for %s to main "
- "process" % client)
- self.pipe.send(config)
- self.logger.debug("Returned configuration for %s to main "
- "process" % client)
+ self.rpc_dispatch()
except KeyboardInterrupt:
break
self.shutdown()
@@ -164,11 +238,14 @@ class Core(BuiltinCore):
# monkeypatch self.terminate to have isSet().
self.terminate = DualEvent(threading_event=self.terminate)
+ self.metadata_cache = DispatchingCache()
+
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
(mainpipe, childpipe) = multiprocessing.Pipe()
self.pipes[name] = mainpipe
+ self.metadata_cache.pipes[name] = mainpipe
self.logger.debug("Starting child %s" % name)
childcore = ChildCore(self.setup, childpipe, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
@@ -193,12 +270,17 @@ class Core(BuiltinCore):
self.logger.debug("All children shut down")
@exposed
+ def set_debug(self, address, debug):
+ BuiltinCore.set_debug(self, address, debug)
+ self.metadata_cache.set_debug(debug)
+
+ @exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
childname = self.available_children.get()
self.logger.debug("Building configuration on child %s" % childname)
pipe = self.pipes[childname]
- pipe.send(client)
+ pipe.send(("GetConfig", [client], dict()))
config = pipe.recv()
self.available_children.put_nowait(childname)
return config