From 4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 7 Aug 2013 13:22:14 -0400 Subject: MultiprocessingCore: added a way to dispatch RMI calls to child processes --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 76 +++++++++++++++++++++++++---- 1 file changed, 66 insertions(+), 10 deletions(-) (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index c185a5893..e79207291 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -18,7 +18,7 @@ import multiprocessing import Bcfg2.Server.Plugin from itertools import cycle from Bcfg2.Cache import Cache -from Bcfg2.Compat import Queue, Empty +from Bcfg2.Compat import Queue, Empty, wraps from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.BuiltinCore import Core as BuiltinCore from multiprocessing.connection import Listener, Client @@ -29,7 +29,7 @@ class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): cache expiration events to child nodes. """ #: The method to send over the pipe to expire the cache - method = "expire_cache" + method = "expire_metadata_cache" def __init__(self, *args, **kwargs): self.rpc_q = kwargs.pop("queue") @@ -221,6 +221,8 @@ class ChildCore(BaseCore): # ensure that the child doesn't start a perflog thread self.perflog_thread = None + self._rmi = dict() + def _run(self): return True @@ -238,25 +240,34 @@ class ChildCore(BaseCore): self.logger.debug("Connecting to parent via %s" % address) client = Client(address) method, args, kwargs = data + func = None rv = None - if not hasattr(self, method): + if "." in method: + if method in self._rmi: + func = self._rmi[method] + else: + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + elif not hasattr(self, method): self.logger.error("%s: Method %s does not exist" % (self.name, method)) - else: + else: # method is not a plugin RMI, and exists func = getattr(self, method) - if func.exposed: - self.logger.debug("%s: Calling RPC method %s" % (self.name, - method)) - rv = func(*args, **kwargs) - else: + if not func.exposed: self.logger.error("%s: Method %s is not exposed" % (self.name, method)) + func = None + if func is not None: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + rv = func(*args, **kwargs) if address is not None: # if the key is None, then no response is expected self.logger.debug("Returning data to parent via %s" % address) client.send(rv) def _block(self): + self._rmi = self._get_rmi() while not self.terminate.is_set(): try: address, data = self.rpc_q.get(timeout=self.poll_wait) @@ -285,8 +296,20 @@ class ChildCore(BaseCore): time.sleep(1) self.logger.info("%s: All threads stopped" % self.name) + def _get_rmi(self): + rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + mname = crmi[1] + else: + mname = crmi + rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) + return rmi + @exposed - def expire_cache(self, client=None): + def expire_metadata_cache(self, client=None): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) @@ -388,6 +411,39 @@ class Core(BuiltinCore): time.sleep(1) self.logger.info("Shutdown complete") + def _get_rmi(self): + child_rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + parentname, childname = crmi + else: + parentname = childname = crmi + child_rmi["%s.%s" % (pname, parentname)] = \ + "%s.%s" % (pname, childname) + + rmi = BuiltinCore._get_rmi(self) + for method in rmi.keys(): + if method in child_rmi: + rmi[method] = self._child_rmi_wrapper(method, + rmi[method], + child_rmi[method]) + return rmi + + def _child_rmi_wrapper(self, method, parent_rmi, child_rmi): + """ Returns a callable that dispatches a call to the given + child RMI to child processes, and calls the parent RMI locally + (i.e., in the parent process). """ + @wraps(parent_rmi) + def inner(*args, **kwargs): + self.logger.debug("Dispatching RMI call to %s to children: %s" % + (method, child_rmi)) + self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs) + return parent_rmi(*args, **kwargs) + + return inner + @exposed def set_debug(self, address, debug): self.rpc_q.set_debug(debug) -- cgit v1.2.3-1-g7c22