summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py102
1 files changed, 92 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 81fba7092..066519774 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -2,16 +2,71 @@
:mod:`Bcfg2.Server.BuiltinCore` that uses the Python
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
+
+The parent communicates with the children over a
+:class:`multiprocessing.Pipe` that implements a very simple RPC
+protocol. Each command passed to a child over the Pipe must be a
+tuple with the format::
+
+ (<method>, <args>, <kwargs>)
+
+The method must be exposed by the child by decorating it with
+:func:`Bcfg2.Server.Core.exposed`.
+
+The RPC call always returns a value via the pipe, so the caller *must*
+read the return value in order to keep the pipe consistent.
"""
+import logging
import threading
import lxml.etree
import multiprocessing
+from Bcfg2.Cache import Cache
from Bcfg2.Compat import Queue
from Bcfg2.Server.Core import BaseCore, exposed
+from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+class DispatchingCache(Cache, Debuggable):
+ """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
+ cache expiration events to child nodes. """
+
+ #: The method to send over the pipe to expire the cache
+ method = "expire_cache"
+
+ def __init__(self, *args, **kwargs):
+ #: A dict of <child name>: :class:`multiprocessing.Pipe`
+ #: objects that should be given a cache expiration command any
+ #: time an item is expired.
+ self.pipes = kwargs.pop("pipes", dict())
+
+ #: A :class:`logging.Logger` object this cache object can use
+ self.logger = logging.getLogger(self.__class__.__name__)
+ Cache.__init__(self, *args, **kwargs)
+
+ def expire(self, key=None):
+ if (key and key in self) or (not key and len(self)):
+ # dispatching cache expiration to children can be
+ # expensive, so only do it if there's something to expire
+ for child, pipe in self.pipes.items():
+ if key:
+ self.logger.debug("Expiring metadata cache for %s on %s" %
+ (key, child))
+ else:
+ self.logger.debug("Expiring metadata cache on %s" % child)
+ pipe.send((self.method, [key], dict()))
+ pipe.recv()
+ Cache.expire(self, key=key)
+
+
+class NoSuchMethod(Exception):
+ """ Exception raised by a child process if it's asked to execute a
+ method that doesn't exist or that isn't exposed via the
+ :class:`multiprocessing.Pipe` RPC interface. """
+ pass
+
+
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that
internally implements both :class:`threading.Event` and
@@ -98,6 +153,33 @@ class ChildCore(BaseCore):
def _run(self):
return True
+ def rpc_dispatch(self):
+ """ Dispatch a method received via the
+ :class:`multiprocessing.Pipe` RPC interface.
+
+ :param data: The tuple of ``(<method name>, <args>, <kwargs>)``
+ :type data: tuple
+ """
+ method, args, kwargs = self.pipe.recv()
+ if hasattr(self, method):
+ func = getattr(self, method)
+ if func.exposed:
+ self.pipe.send(func(*args, **kwargs))
+ else:
+ raise NoSuchMethod(method)
+ else:
+ raise NoSuchMethod(method)
+
+ @exposed
+ def GetConfig(self, client):
+ self.logger.debug("Building configuration for %s" % client)
+ return lxml.etree.tostring(self.BuildConfiguration(client))
+
+ @exposed
+ def expire_cache(self, client=None):
+ """ Expire the metadata cache for a client """
+ self.metadata_cache.expire(client)
+
def _block(self):
while not self.terminate.isSet():
try:
@@ -109,15 +191,7 @@ class ChildCore(BaseCore):
# should be using the metadata database if you're
# using this core.
self.fam.handle_events_in_interval(0.1)
- client = self.pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- config = \
- lxml.etree.tostring(self.BuildConfiguration(client))
- self.logger.debug("Returning configuration for %s to main "
- "process" % client)
- self.pipe.send(config)
- self.logger.debug("Returned configuration for %s to main "
- "process" % client)
+ self.rpc_dispatch()
except KeyboardInterrupt:
break
self.shutdown()
@@ -164,11 +238,14 @@ class Core(BuiltinCore):
# monkeypatch self.terminate to have isSet().
self.terminate = DualEvent(threading_event=self.terminate)
+ self.metadata_cache = DispatchingCache()
+
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
(mainpipe, childpipe) = multiprocessing.Pipe()
self.pipes[name] = mainpipe
+ self.metadata_cache.pipes[name] = mainpipe
self.logger.debug("Starting child %s" % name)
childcore = ChildCore(self.setup, childpipe, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
@@ -193,12 +270,17 @@ class Core(BuiltinCore):
self.logger.debug("All children shut down")
@exposed
+ def set_debug(self, address, debug):
+ BuiltinCore.set_debug(self, address, debug)
+ self.metadata_cache.set_debug(debug)
+
+ @exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
childname = self.available_children.get()
self.logger.debug("Building configuration on child %s" % childname)
pipe = self.pipes[childname]
- pipe.send(client)
+ pipe.send(("GetConfig", [client], dict()))
config = pipe.recv()
self.available_children.put_nowait(childname)
return config