summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py46
1 files changed, 8 insertions, 38 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 517140178..c42009bdd 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -16,32 +16,15 @@ import threading
import lxml.etree
import multiprocessing
import Bcfg2.Options
+import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
from itertools import cycle
-from Bcfg2.Server.Cache import Cache
from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import Core, exposed
from Bcfg2.Server.BuiltinCore import BuiltinCore
from multiprocessing.connection import Listener, Client
-class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
- """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
- cache expiration events to child nodes. """
-
- #: The method to send over the pipe to expire the cache
- method = "expire_metadata_cache"
-
- def __init__(self, *args, **kwargs):
- self.rpc_q = kwargs.pop("queue")
- Bcfg2.Server.Plugin.Debuggable.__init__(self)
- Cache.__init__(self, *args, **kwargs)
-
- def expire(self, key=None):
- self.rpc_q.publish(self.method, args=[key])
- Cache.expire(self, key=key)
-
-
class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
""" An implementation of a :class:`multiprocessing.Queue` designed
for several additional use patterns:
@@ -304,16 +287,9 @@ class ChildCore(Core):
return rmi
@exposed
- def expire_metadata_cache(self, client=None):
- """ Expire the metadata cache for a client """
- self.metadata_cache.expire(client)
-
- @exposed
- def RecvProbeData(self, address, _):
- """ Expire the probe cache for a client """
- self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
- key=self.resolve_client(address,
- metadata=False)[0])
+ def expire_cache(self, *tags, **kwargs):
+ """ Expire cached data """
+ Bcfg2.Server.Cache.expire(*tags, exact=kwargs.pop("exact", False))
@exposed
def GetConfig(self, client):
@@ -368,8 +344,6 @@ class MultiprocessingCore(BuiltinCore):
#: used to send or publish commands to children.
self.rpc_q = RPCQueue()
- self.metadata_cache = DispatchingCache(queue=self.rpc_q)
-
#: A list of children that will be cycled through
self._all_children = []
@@ -392,6 +366,7 @@ class MultiprocessingCore(BuiltinCore):
self.logger.debug("Started %s children: %s" % (len(self._all_children),
self._all_children))
self.children = cycle(self._all_children)
+ Bcfg2.Server.Cache.add_expire_hook(self.cache_dispatch)
return BuiltinCore._run(self)
def shutdown(self):
@@ -464,16 +439,11 @@ class MultiprocessingCore(BuiltinCore):
def set_debug(self, address, debug):
self.rpc_q.set_debug(debug)
self.rpc_q.publish("set_debug", args=[address, debug])
- self.metadata_cache.set_debug(debug)
return BuiltinCore.set_debug(self, address, debug)
- @exposed
- def RecvProbeData(self, address, probedata):
- rv = BuiltinCore.RecvProbeData(self, address, probedata)
- # we don't want the children to actually process probe data,
- # so we don't send the data, just the fact that we got some.
- self.rpc_q.publish("RecvProbeData", args=[address, None])
- return rv
+ def cache_dispatch(self, tags, exact, _):
+ """ Publish cache expiration events to child nodes. """
+ self.rpc_q.publish("expire_cache", args=tags, kwargs=dict(exact=exact))
@exposed
def GetConfig(self, address):