From 4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 7 Aug 2013 13:22:14 -0400 Subject: MultiprocessingCore: added a way to dispatch RMI calls to child processes --- src/lib/Bcfg2/Server/Core.py | 6 +- src/lib/Bcfg2/Server/MultiprocessingCore.py | 76 ++++++++++++++++++++--- src/lib/Bcfg2/Server/Plugin/base.py | 18 ++++++ src/lib/Bcfg2/Server/Plugins/Guppy.py | 1 + src/lib/Bcfg2/Server/Plugins/Packages/__init__.py | 3 + 5 files changed, 90 insertions(+), 14 deletions(-) (limited to 'src/lib/Bcfg2') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index b577f65e1..6357baae4 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -976,12 +976,10 @@ class BaseCore(object): def _get_rmi(self): """ Get a list of RMI calls exposed by plugins """ rmi = dict() - for pname, pinst in list(self.plugins.items()): + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: for mname in pinst.__rmi__: rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) - famname = self.fam.__class__.__name__ - for mname in self.fam.__rmi__: - rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname) return rmi def _resolve_exposed_method(self, method_name): diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index c185a5893..e79207291 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -18,7 +18,7 @@ import multiprocessing import Bcfg2.Server.Plugin from itertools import cycle from Bcfg2.Cache import Cache -from Bcfg2.Compat import Queue, Empty +from Bcfg2.Compat import Queue, Empty, wraps from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.BuiltinCore import Core as BuiltinCore from multiprocessing.connection import Listener, Client @@ -29,7 +29,7 @@ class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): cache expiration events to child nodes. """ #: The method to send over the pipe to expire the cache - method = "expire_cache" + method = "expire_metadata_cache" def __init__(self, *args, **kwargs): self.rpc_q = kwargs.pop("queue") @@ -221,6 +221,8 @@ class ChildCore(BaseCore): # ensure that the child doesn't start a perflog thread self.perflog_thread = None + self._rmi = dict() + def _run(self): return True @@ -238,25 +240,34 @@ class ChildCore(BaseCore): self.logger.debug("Connecting to parent via %s" % address) client = Client(address) method, args, kwargs = data + func = None rv = None - if not hasattr(self, method): + if "." in method: + if method in self._rmi: + func = self._rmi[method] + else: + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + elif not hasattr(self, method): self.logger.error("%s: Method %s does not exist" % (self.name, method)) - else: + else: # method is not a plugin RMI, and exists func = getattr(self, method) - if func.exposed: - self.logger.debug("%s: Calling RPC method %s" % (self.name, - method)) - rv = func(*args, **kwargs) - else: + if not func.exposed: self.logger.error("%s: Method %s is not exposed" % (self.name, method)) + func = None + if func is not None: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + rv = func(*args, **kwargs) if address is not None: # if the key is None, then no response is expected self.logger.debug("Returning data to parent via %s" % address) client.send(rv) def _block(self): + self._rmi = self._get_rmi() while not self.terminate.is_set(): try: address, data = self.rpc_q.get(timeout=self.poll_wait) @@ -285,8 +296,20 @@ class ChildCore(BaseCore): time.sleep(1) self.logger.info("%s: All threads stopped" % self.name) + def _get_rmi(self): + rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + mname = crmi[1] + else: + mname = crmi + rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) + return rmi + @exposed - def expire_cache(self, client=None): + def expire_metadata_cache(self, client=None): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) @@ -388,6 +411,39 @@ class Core(BuiltinCore): time.sleep(1) self.logger.info("Shutdown complete") + def _get_rmi(self): + child_rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + parentname, childname = crmi + else: + parentname = childname = crmi + child_rmi["%s.%s" % (pname, parentname)] = \ + "%s.%s" % (pname, childname) + + rmi = BuiltinCore._get_rmi(self) + for method in rmi.keys(): + if method in child_rmi: + rmi[method] = self._child_rmi_wrapper(method, + rmi[method], + child_rmi[method]) + return rmi + + def _child_rmi_wrapper(self, method, parent_rmi, child_rmi): + """ Returns a callable that dispatches a call to the given + child RMI to child processes, and calls the parent RMI locally + (i.e., in the parent process). """ + @wraps(parent_rmi) + def inner(*args, **kwargs): + self.logger.debug("Dispatching RMI call to %s to children: %s" % + (method, child_rmi)) + self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs) + return parent_rmi(*args, **kwargs) + + return inner + @exposed def set_debug(self, address, debug): self.rpc_q.set_debug(debug) diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py index 244c73aa8..03feceb6f 100644 --- a/src/lib/Bcfg2/Server/Plugin/base.py +++ b/src/lib/Bcfg2/Server/Plugin/base.py @@ -12,6 +12,10 @@ class Debuggable(object): #: List of names of methods to be exposed as XML-RPC functions __rmi__ = ['toggle_debug', 'set_debug'] + #: How exposed XML-RPC functions should be dispatched to child + #: processes. + __child_rmi__ = __rmi__[:] + def __init__(self, name=None): """ :param name: The name of the logger object to get. If none is @@ -91,6 +95,20 @@ class Plugin(Debuggable): #: List of names of methods to be exposed as XML-RPC functions __rmi__ = Debuggable.__rmi__ + #: How exposed XML-RPC functions should be dispatched to child + #: processes, if :mod:`Bcfg2.Server.MultiprocessingCore` is in + #: use. Items ``__child_rmi__`` can either be strings (in which + #: case the same function is called on child processes as on the + #: parent) or 2-tuples, in which case the first element is the + #: name of the RPC function called on the parent process, and the + #: second element is the name of the function to call on child + #: processes. Functions that are not listed in the list will not + #: be dispatched to child processes, i.e., they will only be + #: called on the parent. A function must be listed in ``__rmi__`` + #: in order to be exposed; functions listed in ``_child_rmi__`` + #: but not ``__rmi__`` will be ignored. + __child_rmi__ = Debuggable.__child_rmi__ + def __init__(self, core, datastore): """ :param core: The Bcfg2.Server.Core initializing the plugin diff --git a/src/lib/Bcfg2/Server/Plugins/Guppy.py b/src/lib/Bcfg2/Server/Plugins/Guppy.py index 4f2601f15..3c9b8a459 100644 --- a/src/lib/Bcfg2/Server/Plugins/Guppy.py +++ b/src/lib/Bcfg2/Server/Plugins/Guppy.py @@ -37,6 +37,7 @@ class Guppy(Bcfg2.Server.Plugin.Plugin): experimental = True __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Enable', 'Disable'] + __child_rmi__ = __rmi__[:] def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py index 3e4fb33ec..4096153a3 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py @@ -93,6 +93,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin, #: and :func:`Reload` __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload'] + __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \ + [('Refresh', 'expire_cache'), ('Reload', 'expire_cache')] + def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) Bcfg2.Server.Plugin.Caching.__init__(self) -- cgit v1.2.3-1-g7c22