summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-07 13:22:14 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-07 13:28:47 -0400
commit4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b (patch)
treef66fbf4de5249632b631a45088f0e9f922645844 /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent7e9787c947e99b68317f5420951a296cea858daa (diff)
downloadbcfg2-4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b.tar.gz
bcfg2-4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b.tar.bz2
bcfg2-4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b.zip
MultiprocessingCore: added a way to dispatch RMI calls to child processes
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py76
1 files changed, 66 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index c185a5893..e79207291 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -18,7 +18,7 @@ import multiprocessing
import Bcfg2.Server.Plugin
from itertools import cycle
from Bcfg2.Cache import Cache
-from Bcfg2.Compat import Queue, Empty
+from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import BaseCore, exposed
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
from multiprocessing.connection import Listener, Client
@@ -29,7 +29,7 @@ class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
cache expiration events to child nodes. """
#: The method to send over the pipe to expire the cache
- method = "expire_cache"
+ method = "expire_metadata_cache"
def __init__(self, *args, **kwargs):
self.rpc_q = kwargs.pop("queue")
@@ -221,6 +221,8 @@ class ChildCore(BaseCore):
# ensure that the child doesn't start a perflog thread
self.perflog_thread = None
+ self._rmi = dict()
+
def _run(self):
return True
@@ -238,25 +240,34 @@ class ChildCore(BaseCore):
self.logger.debug("Connecting to parent via %s" % address)
client = Client(address)
method, args, kwargs = data
+ func = None
rv = None
- if not hasattr(self, method):
+ if "." in method:
+ if method in self._rmi:
+ func = self._rmi[method]
+ else:
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ elif not hasattr(self, method):
self.logger.error("%s: Method %s does not exist" % (self.name,
method))
- else:
+ else: # method is not a plugin RMI, and exists
func = getattr(self, method)
- if func.exposed:
- self.logger.debug("%s: Calling RPC method %s" % (self.name,
- method))
- rv = func(*args, **kwargs)
- else:
+ if not func.exposed:
self.logger.error("%s: Method %s is not exposed" % (self.name,
method))
+ func = None
+ if func is not None:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ rv = func(*args, **kwargs)
if address is not None:
# if the key is None, then no response is expected
self.logger.debug("Returning data to parent via %s" % address)
client.send(rv)
def _block(self):
+ self._rmi = self._get_rmi()
while not self.terminate.is_set():
try:
address, data = self.rpc_q.get(timeout=self.poll_wait)
@@ -285,8 +296,20 @@ class ChildCore(BaseCore):
time.sleep(1)
self.logger.info("%s: All threads stopped" % self.name)
+ def _get_rmi(self):
+ rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ mname = crmi[1]
+ else:
+ mname = crmi
+ rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
+ return rmi
+
@exposed
- def expire_cache(self, client=None):
+ def expire_metadata_cache(self, client=None):
""" Expire the metadata cache for a client """
self.metadata_cache.expire(client)
@@ -388,6 +411,39 @@ class Core(BuiltinCore):
time.sleep(1)
self.logger.info("Shutdown complete")
+ def _get_rmi(self):
+ child_rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ parentname, childname = crmi
+ else:
+ parentname = childname = crmi
+ child_rmi["%s.%s" % (pname, parentname)] = \
+ "%s.%s" % (pname, childname)
+
+ rmi = BuiltinCore._get_rmi(self)
+ for method in rmi.keys():
+ if method in child_rmi:
+ rmi[method] = self._child_rmi_wrapper(method,
+ rmi[method],
+ child_rmi[method])
+ return rmi
+
+ def _child_rmi_wrapper(self, method, parent_rmi, child_rmi):
+ """ Returns a callable that dispatches a call to the given
+ child RMI to child processes, and calls the parent RMI locally
+ (i.e., in the parent process). """
+ @wraps(parent_rmi)
+ def inner(*args, **kwargs):
+ self.logger.debug("Dispatching RMI call to %s to children: %s" %
+ (method, child_rmi))
+ self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs)
+ return parent_rmi(*args, **kwargs)
+
+ return inner
+
@exposed
def set_debug(self, address, debug):
self.rpc_q.set_debug(debug)