diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-16 10:53:23 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-16 10:53:23 -0400 |
commit | 38f3cfcfdbf36bca3b048cc87e6d11e0c883673b (patch) | |
tree | 6a12ce51ad9078dd843dc41f668da3a77e889979 /src/lib/Bcfg2/Server/MultiprocessingCore.py | |
parent | 35498c8b849c15632d720656d5736c4c85f76b53 (diff) | |
download | bcfg2-38f3cfcfdbf36bca3b048cc87e6d11e0c883673b.tar.gz bcfg2-38f3cfcfdbf36bca3b048cc87e6d11e0c883673b.tar.bz2 bcfg2-38f3cfcfdbf36bca3b048cc87e6d11e0c883673b.zip |
Rewrote arbitrary data cache system
The caching facilities in Bcfg2.Server.Cache provided basically no
features. This rewrites that to allow for much more powerful cache
expiration, with a particular focus on interoperation between
different components and plugins to let caches be expired as
necessary. (E.g., the Probes plugin can expire the Metadata cache.)
This does not affect any of the file data cached by Bcfg2, only the
caches that are populated with arbitrary data (Metadata, Packages,
Probes, etc.).
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 46 |
1 files changed, 8 insertions, 38 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 517140178..c42009bdd 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -16,32 +16,15 @@ import threading import lxml.etree import multiprocessing import Bcfg2.Options +import Bcfg2.Server.Cache import Bcfg2.Server.Plugin from itertools import cycle -from Bcfg2.Server.Cache import Cache from Bcfg2.Compat import Queue, Empty, wraps from Bcfg2.Server.Core import Core, exposed from Bcfg2.Server.BuiltinCore import BuiltinCore from multiprocessing.connection import Listener, Client -class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): - """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates - cache expiration events to child nodes. """ - - #: The method to send over the pipe to expire the cache - method = "expire_metadata_cache" - - def __init__(self, *args, **kwargs): - self.rpc_q = kwargs.pop("queue") - Bcfg2.Server.Plugin.Debuggable.__init__(self) - Cache.__init__(self, *args, **kwargs) - - def expire(self, key=None): - self.rpc_q.publish(self.method, args=[key]) - Cache.expire(self, key=key) - - class RPCQueue(Bcfg2.Server.Plugin.Debuggable): """ An implementation of a :class:`multiprocessing.Queue` designed for several additional use patterns: @@ -304,16 +287,9 @@ class ChildCore(Core): return rmi @exposed - def expire_metadata_cache(self, client=None): - """ Expire the metadata cache for a client """ - self.metadata_cache.expire(client) - - @exposed - def RecvProbeData(self, address, _): - """ Expire the probe cache for a client """ - self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing, - key=self.resolve_client(address, - metadata=False)[0]) + def expire_cache(self, *tags, **kwargs): + """ Expire cached data """ + Bcfg2.Server.Cache.expire(*tags, exact=kwargs.pop("exact", False)) @exposed def GetConfig(self, client): @@ -368,8 +344,6 @@ class MultiprocessingCore(BuiltinCore): #: used to send or publish commands to children. self.rpc_q = RPCQueue() - self.metadata_cache = DispatchingCache(queue=self.rpc_q) - #: A list of children that will be cycled through self._all_children = [] @@ -392,6 +366,7 @@ class MultiprocessingCore(BuiltinCore): self.logger.debug("Started %s children: %s" % (len(self._all_children), self._all_children)) self.children = cycle(self._all_children) + Bcfg2.Server.Cache.add_expire_hook(self.cache_dispatch) return BuiltinCore._run(self) def shutdown(self): @@ -464,16 +439,11 @@ class MultiprocessingCore(BuiltinCore): def set_debug(self, address, debug): self.rpc_q.set_debug(debug) self.rpc_q.publish("set_debug", args=[address, debug]) - self.metadata_cache.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) - @exposed - def RecvProbeData(self, address, probedata): - rv = BuiltinCore.RecvProbeData(self, address, probedata) - # we don't want the children to actually process probe data, - # so we don't send the data, just the fact that we got some. - self.rpc_q.publish("RecvProbeData", args=[address, None]) - return rv + def cache_dispatch(self, tags, exact, _): + """ Publish cache expiration events to child nodes. """ + self.rpc_q.publish("expire_cache", args=tags, kwargs=dict(exact=exact)) @exposed def GetConfig(self, address): |