summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-16 10:53:23 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-16 10:53:23 -0400
commit38f3cfcfdbf36bca3b048cc87e6d11e0c883673b (patch)
tree6a12ce51ad9078dd843dc41f668da3a77e889979 /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent35498c8b849c15632d720656d5736c4c85f76b53 (diff)
downloadbcfg2-38f3cfcfdbf36bca3b048cc87e6d11e0c883673b.tar.gz
bcfg2-38f3cfcfdbf36bca3b048cc87e6d11e0c883673b.tar.bz2
bcfg2-38f3cfcfdbf36bca3b048cc87e6d11e0c883673b.zip
Rewrote arbitrary data cache system
The caching facilities in Bcfg2.Server.Cache provided basically no features. This rewrites that to allow for much more powerful cache expiration, with a particular focus on interoperation between different components and plugins to let caches be expired as necessary. (E.g., the Probes plugin can expire the Metadata cache.) This does not affect any of the file data cached by Bcfg2, only the caches that are populated with arbitrary data (Metadata, Packages, Probes, etc.).
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py46
1 files changed, 8 insertions, 38 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 517140178..c42009bdd 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -16,32 +16,15 @@ import threading
import lxml.etree
import multiprocessing
import Bcfg2.Options
+import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
from itertools import cycle
-from Bcfg2.Server.Cache import Cache
from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import Core, exposed
from Bcfg2.Server.BuiltinCore import BuiltinCore
from multiprocessing.connection import Listener, Client
-class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
- """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
- cache expiration events to child nodes. """
-
- #: The method to send over the pipe to expire the cache
- method = "expire_metadata_cache"
-
- def __init__(self, *args, **kwargs):
- self.rpc_q = kwargs.pop("queue")
- Bcfg2.Server.Plugin.Debuggable.__init__(self)
- Cache.__init__(self, *args, **kwargs)
-
- def expire(self, key=None):
- self.rpc_q.publish(self.method, args=[key])
- Cache.expire(self, key=key)
-
-
class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
""" An implementation of a :class:`multiprocessing.Queue` designed
for several additional use patterns:
@@ -304,16 +287,9 @@ class ChildCore(Core):
return rmi
@exposed
- def expire_metadata_cache(self, client=None):
- """ Expire the metadata cache for a client """
- self.metadata_cache.expire(client)
-
- @exposed
- def RecvProbeData(self, address, _):
- """ Expire the probe cache for a client """
- self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
- key=self.resolve_client(address,
- metadata=False)[0])
+ def expire_cache(self, *tags, **kwargs):
+ """ Expire cached data """
+ Bcfg2.Server.Cache.expire(*tags, exact=kwargs.pop("exact", False))
@exposed
def GetConfig(self, client):
@@ -368,8 +344,6 @@ class MultiprocessingCore(BuiltinCore):
#: used to send or publish commands to children.
self.rpc_q = RPCQueue()
- self.metadata_cache = DispatchingCache(queue=self.rpc_q)
-
#: A list of children that will be cycled through
self._all_children = []
@@ -392,6 +366,7 @@ class MultiprocessingCore(BuiltinCore):
self.logger.debug("Started %s children: %s" % (len(self._all_children),
self._all_children))
self.children = cycle(self._all_children)
+ Bcfg2.Server.Cache.add_expire_hook(self.cache_dispatch)
return BuiltinCore._run(self)
def shutdown(self):
@@ -464,16 +439,11 @@ class MultiprocessingCore(BuiltinCore):
def set_debug(self, address, debug):
self.rpc_q.set_debug(debug)
self.rpc_q.publish("set_debug", args=[address, debug])
- self.metadata_cache.set_debug(debug)
return BuiltinCore.set_debug(self, address, debug)
- @exposed
- def RecvProbeData(self, address, probedata):
- rv = BuiltinCore.RecvProbeData(self, address, probedata)
- # we don't want the children to actually process probe data,
- # so we don't send the data, just the fact that we got some.
- self.rpc_q.publish("RecvProbeData", args=[address, None])
- return rv
+ def cache_dispatch(self, tags, exact, _):
+ """ Publish cache expiration events to child nodes. """
+ self.rpc_q.publish("expire_cache", args=tags, kwargs=dict(exact=exact))
@exposed
def GetConfig(self, address):