summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-07 13:22:14 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-07 13:28:47 -0400
commit4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b (patch)
treef66fbf4de5249632b631a45088f0e9f922645844 /src/lib/Bcfg2/Server
parent7e9787c947e99b68317f5420951a296cea858daa (diff)
downloadbcfg2-4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b.tar.gz
bcfg2-4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b.tar.bz2
bcfg2-4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b.zip
MultiprocessingCore: added a way to dispatch RMI calls to child processes
Diffstat (limited to 'src/lib/Bcfg2/Server')
-rw-r--r--src/lib/Bcfg2/Server/Core.py6
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py76
-rw-r--r--src/lib/Bcfg2/Server/Plugin/base.py18
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Guppy.py1
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/__init__.py3
5 files changed, 90 insertions, 14 deletions
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index b577f65e1..6357baae4 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -976,12 +976,10 @@ class BaseCore(object):
def _get_rmi(self):
""" Get a list of RMI calls exposed by plugins """
rmi = dict()
- for pname, pinst in list(self.plugins.items()):
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
for mname in pinst.__rmi__:
rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
- famname = self.fam.__class__.__name__
- for mname in self.fam.__rmi__:
- rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname)
return rmi
def _resolve_exposed_method(self, method_name):
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index c185a5893..e79207291 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -18,7 +18,7 @@ import multiprocessing
import Bcfg2.Server.Plugin
from itertools import cycle
from Bcfg2.Cache import Cache
-from Bcfg2.Compat import Queue, Empty
+from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import BaseCore, exposed
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
from multiprocessing.connection import Listener, Client
@@ -29,7 +29,7 @@ class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
cache expiration events to child nodes. """
#: The method to send over the pipe to expire the cache
- method = "expire_cache"
+ method = "expire_metadata_cache"
def __init__(self, *args, **kwargs):
self.rpc_q = kwargs.pop("queue")
@@ -221,6 +221,8 @@ class ChildCore(BaseCore):
# ensure that the child doesn't start a perflog thread
self.perflog_thread = None
+ self._rmi = dict()
+
def _run(self):
return True
@@ -238,25 +240,34 @@ class ChildCore(BaseCore):
self.logger.debug("Connecting to parent via %s" % address)
client = Client(address)
method, args, kwargs = data
+ func = None
rv = None
- if not hasattr(self, method):
+ if "." in method:
+ if method in self._rmi:
+ func = self._rmi[method]
+ else:
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ elif not hasattr(self, method):
self.logger.error("%s: Method %s does not exist" % (self.name,
method))
- else:
+ else: # method is not a plugin RMI, and exists
func = getattr(self, method)
- if func.exposed:
- self.logger.debug("%s: Calling RPC method %s" % (self.name,
- method))
- rv = func(*args, **kwargs)
- else:
+ if not func.exposed:
self.logger.error("%s: Method %s is not exposed" % (self.name,
method))
+ func = None
+ if func is not None:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ rv = func(*args, **kwargs)
if address is not None:
# if the key is None, then no response is expected
self.logger.debug("Returning data to parent via %s" % address)
client.send(rv)
def _block(self):
+ self._rmi = self._get_rmi()
while not self.terminate.is_set():
try:
address, data = self.rpc_q.get(timeout=self.poll_wait)
@@ -285,8 +296,20 @@ class ChildCore(BaseCore):
time.sleep(1)
self.logger.info("%s: All threads stopped" % self.name)
+ def _get_rmi(self):
+ rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ mname = crmi[1]
+ else:
+ mname = crmi
+ rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
+ return rmi
+
@exposed
- def expire_cache(self, client=None):
+ def expire_metadata_cache(self, client=None):
""" Expire the metadata cache for a client """
self.metadata_cache.expire(client)
@@ -388,6 +411,39 @@ class Core(BuiltinCore):
time.sleep(1)
self.logger.info("Shutdown complete")
+ def _get_rmi(self):
+ child_rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ parentname, childname = crmi
+ else:
+ parentname = childname = crmi
+ child_rmi["%s.%s" % (pname, parentname)] = \
+ "%s.%s" % (pname, childname)
+
+ rmi = BuiltinCore._get_rmi(self)
+ for method in rmi.keys():
+ if method in child_rmi:
+ rmi[method] = self._child_rmi_wrapper(method,
+ rmi[method],
+ child_rmi[method])
+ return rmi
+
+ def _child_rmi_wrapper(self, method, parent_rmi, child_rmi):
+ """ Returns a callable that dispatches a call to the given
+ child RMI to child processes, and calls the parent RMI locally
+ (i.e., in the parent process). """
+ @wraps(parent_rmi)
+ def inner(*args, **kwargs):
+ self.logger.debug("Dispatching RMI call to %s to children: %s" %
+ (method, child_rmi))
+ self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs)
+ return parent_rmi(*args, **kwargs)
+
+ return inner
+
@exposed
def set_debug(self, address, debug):
self.rpc_q.set_debug(debug)
diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py
index 244c73aa8..03feceb6f 100644
--- a/src/lib/Bcfg2/Server/Plugin/base.py
+++ b/src/lib/Bcfg2/Server/Plugin/base.py
@@ -12,6 +12,10 @@ class Debuggable(object):
#: List of names of methods to be exposed as XML-RPC functions
__rmi__ = ['toggle_debug', 'set_debug']
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes.
+ __child_rmi__ = __rmi__[:]
+
def __init__(self, name=None):
"""
:param name: The name of the logger object to get. If none is
@@ -91,6 +95,20 @@ class Plugin(Debuggable):
#: List of names of methods to be exposed as XML-RPC functions
__rmi__ = Debuggable.__rmi__
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes, if :mod:`Bcfg2.Server.MultiprocessingCore` is in
+ #: use. Items ``__child_rmi__`` can either be strings (in which
+ #: case the same function is called on child processes as on the
+ #: parent) or 2-tuples, in which case the first element is the
+ #: name of the RPC function called on the parent process, and the
+ #: second element is the name of the function to call on child
+ #: processes. Functions that are not listed in the list will not
+ #: be dispatched to child processes, i.e., they will only be
+ #: called on the parent. A function must be listed in ``__rmi__``
+ #: in order to be exposed; functions listed in ``_child_rmi__``
+ #: but not ``__rmi__`` will be ignored.
+ __child_rmi__ = Debuggable.__child_rmi__
+
def __init__(self, core, datastore):
"""
:param core: The Bcfg2.Server.Core initializing the plugin
diff --git a/src/lib/Bcfg2/Server/Plugins/Guppy.py b/src/lib/Bcfg2/Server/Plugins/Guppy.py
index 4f2601f15..3c9b8a459 100644
--- a/src/lib/Bcfg2/Server/Plugins/Guppy.py
+++ b/src/lib/Bcfg2/Server/Plugins/Guppy.py
@@ -37,6 +37,7 @@ class Guppy(Bcfg2.Server.Plugin.Plugin):
experimental = True
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Enable', 'Disable']
+ __child_rmi__ = __rmi__[:]
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
index 3e4fb33ec..4096153a3 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
@@ -93,6 +93,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
#: and :func:`Reload`
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload']
+ __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \
+ [('Refresh', 'expire_cache'), ('Reload', 'expire_cache')]
+
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
Bcfg2.Server.Plugin.Caching.__init__(self)