diff options
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 46 |
1 files changed, 8 insertions, 38 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 517140178..c42009bdd 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -16,32 +16,15 @@ import threading import lxml.etree import multiprocessing import Bcfg2.Options +import Bcfg2.Server.Cache import Bcfg2.Server.Plugin from itertools import cycle -from Bcfg2.Server.Cache import Cache from Bcfg2.Compat import Queue, Empty, wraps from Bcfg2.Server.Core import Core, exposed from Bcfg2.Server.BuiltinCore import BuiltinCore from multiprocessing.connection import Listener, Client -class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): - """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates - cache expiration events to child nodes. """ - - #: The method to send over the pipe to expire the cache - method = "expire_metadata_cache" - - def __init__(self, *args, **kwargs): - self.rpc_q = kwargs.pop("queue") - Bcfg2.Server.Plugin.Debuggable.__init__(self) - Cache.__init__(self, *args, **kwargs) - - def expire(self, key=None): - self.rpc_q.publish(self.method, args=[key]) - Cache.expire(self, key=key) - - class RPCQueue(Bcfg2.Server.Plugin.Debuggable): """ An implementation of a :class:`multiprocessing.Queue` designed for several additional use patterns: @@ -304,16 +287,9 @@ class ChildCore(Core): return rmi @exposed - def expire_metadata_cache(self, client=None): - """ Expire the metadata cache for a client """ - self.metadata_cache.expire(client) - - @exposed - def RecvProbeData(self, address, _): - """ Expire the probe cache for a client """ - self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing, - key=self.resolve_client(address, - metadata=False)[0]) + def expire_cache(self, *tags, **kwargs): + """ Expire cached data """ + Bcfg2.Server.Cache.expire(*tags, exact=kwargs.pop("exact", False)) @exposed def GetConfig(self, client): @@ -368,8 +344,6 @@ class MultiprocessingCore(BuiltinCore): #: used to send or publish commands to children. self.rpc_q = RPCQueue() - self.metadata_cache = DispatchingCache(queue=self.rpc_q) - #: A list of children that will be cycled through self._all_children = [] @@ -392,6 +366,7 @@ class MultiprocessingCore(BuiltinCore): self.logger.debug("Started %s children: %s" % (len(self._all_children), self._all_children)) self.children = cycle(self._all_children) + Bcfg2.Server.Cache.add_expire_hook(self.cache_dispatch) return BuiltinCore._run(self) def shutdown(self): @@ -464,16 +439,11 @@ class MultiprocessingCore(BuiltinCore): def set_debug(self, address, debug): self.rpc_q.set_debug(debug) self.rpc_q.publish("set_debug", args=[address, debug]) - self.metadata_cache.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) - @exposed - def RecvProbeData(self, address, probedata): - rv = BuiltinCore.RecvProbeData(self, address, probedata) - # we don't want the children to actually process probe data, - # so we don't send the data, just the fact that we got some. - self.rpc_q.publish("RecvProbeData", args=[address, None]) - return rv + def cache_dispatch(self, tags, exact, _): + """ Publish cache expiration events to child nodes. """ + self.rpc_q.publish("expire_cache", args=tags, kwargs=dict(exact=exact)) @exposed def GetConfig(self, address): |