diff options
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 111 |
1 files changed, 53 insertions, 58 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 2cb3adae3..724b34d8d 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -15,32 +15,16 @@ import time import threading import lxml.etree import multiprocessing +import Bcfg2.Options +import Bcfg2.Server.Cache import Bcfg2.Server.Plugin from itertools import cycle -from Bcfg2.Cache import Cache -from Bcfg2.Compat import Empty, wraps -from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from Bcfg2.Compat import Queue, Empty, wraps +from Bcfg2.Server.Core import Core, exposed +from Bcfg2.Server.BuiltinCore import BuiltinCore from multiprocessing.connection import Listener, Client -class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): - """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates - cache expiration events to child nodes. """ - - #: The method to send over the pipe to expire the cache - method = "expire_metadata_cache" - - def __init__(self, *args, **kwargs): - self.rpc_q = kwargs.pop("queue") - Bcfg2.Server.Plugin.Debuggable.__init__(self) - Cache.__init__(self, *args, **kwargs) - - def expire(self, key=None): - self.rpc_q.publish(self.method, args=[key]) - Cache.expire(self, key=key) - - class RPCQueue(Bcfg2.Server.Plugin.Debuggable): """ An implementation of a :class:`multiprocessing.Queue` designed for several additional use patterns: @@ -148,7 +132,7 @@ class DualEvent(object): return self._threading_event.wait(timeout=timeout) -class ChildCore(BaseCore): +class ChildCore(Core): """ A child process for :class:`Bcfg2.MultiprocessingCore.Core`. This core builds configurations from a given :class:`multiprocessing.Pipe`. Note that this is a full-fledged @@ -167,12 +151,10 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 3.0 - def __init__(self, name, setup, rpc_q, terminate): + def __init__(self, name, rpc_q, terminate): """ :param name: The name of this child :type name: string - :param setup: A Bcfg2 options dict - :type setup: Bcfg2.Options.OptionParser :param read_q: The queue the child will read from for RPC communications from the parent process. :type read_q: multiprocessing.Queue @@ -183,7 +165,7 @@ class ChildCore(BaseCore): themselves down. :type terminate: multiprocessing.Event """ - BaseCore.__init__(self, setup) + Core.__init__(self) #: The name of this child self.name = name @@ -197,7 +179,7 @@ class ChildCore(BaseCore): # override this setting so that the child doesn't try to write # the pidfile - self.setup['daemon'] = False + Bcfg2.Options.setup.daemon = False # ensure that the child doesn't start a perflog thread self.perflog_thread = None @@ -207,9 +189,6 @@ class ChildCore(BaseCore): def _run(self): return True - def _daemonize(self): - return True - def _dispatch(self, address, data): """ Method dispatcher used for commands received from the RPC queue. """ @@ -264,7 +243,7 @@ class ChildCore(BaseCore): self.shutdown() def shutdown(self): - BaseCore.shutdown(self) + Core.shutdown(self) self.logger.info("%s: Closing RPC command queue" % self.name) self.rpc_q.close() @@ -289,16 +268,9 @@ class ChildCore(BaseCore): return rmi @exposed - def expire_metadata_cache(self, client=None): - """ Expire the metadata cache for a client """ - self.metadata_cache.expire(client) - - @exposed - def RecvProbeData(self, address, _): - """ Expire the probe cache for a client """ - self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing, - key=self.resolve_client(address, - metadata=False)[0]) + def expire_cache(self, *tags, **kwargs): + """ Expire cached data """ + Bcfg2.Server.Cache.expire(*tags, exact=kwargs.pop("exact", False)) @exposed def GetConfig(self, client): @@ -309,7 +281,7 @@ class ChildCore(BaseCore): return lxml.etree.tostring(self.BuildConfiguration(client)) -class Core(BuiltinCore): +class MultiprocessingCore(BuiltinCore): """ A multiprocessing core that delegates building the actual client configurations to :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The @@ -317,14 +289,34 @@ class Core(BuiltinCore): :func:`GetConfig` are delegated to children. All other calls are handled by the parent process. """ + options = BuiltinCore.options + [ + Bcfg2.Options.Option( + '--children', dest="core_children", + cf=('server', 'children'), type=int, + default=multiprocessing.cpu_count(), + help='Spawn this number of children for the multiprocessing core')] + #: How long to wait for a child process to shut down cleanly #: before it is terminated. shutdown_timeout = 10.0 - def __init__(self, setup): - BuiltinCore.__init__(self, setup) - if setup['children'] is None: - setup['children'] = multiprocessing.cpu_count() + def __init__(self): + BuiltinCore.__init__(self) + + #: A dict of child name -> one end of the + #: :class:`multiprocessing.Pipe` object used to communicate + #: with that child. (The child is given the other end of the + #: Pipe.) + self.pipes = dict() + + #: A queue that keeps track of which children are available to + #: render a configuration. A child is popped from the queue + #: when it starts to render a config, then it's pushed back on + #: when it's done. This lets us use a blocking call to + #: :func:`Queue.Queue.get` when waiting for an available + #: child. + self.available_children = \ + Queue(maxsize=Bcfg2.Options.setup.core_children) #: The flag that indicates when to stop child threads and #: processes @@ -334,8 +326,6 @@ class Core(BuiltinCore): #: used to send or publish commands to children. self.rpc_q = RPCQueue() - self.metadata_cache = DispatchingCache(queue=self.rpc_q) - #: A list of children that will be cycled through self._all_children = [] @@ -343,13 +333,22 @@ class Core(BuiltinCore): #: to provide a round-robin distribution of render requests self.children = None + def __str__(self): + if hasattr(Bcfg2.Options.setup, "location"): + return "%s(%s; %s children)" % (self.__class__.__name__, + Bcfg2.Options.setup.location, + len(self._all_children)) + else: + return "%s(%s children)" % (self.__class__.__name__, + len(self._all_children)) + def _run(self): - for cnum in range(self.setup['children']): + for cnum in range(Bcfg2.Options.setup.core_children): name = "Child-%s" % cnum self.logger.debug("Starting child %s" % name) child_q = self.rpc_q.add_subscriber(name) - childcore = ChildCore(name, self.setup, child_q, self.terminate) + childcore = ChildCore(name, child_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, @@ -358,6 +357,7 @@ class Core(BuiltinCore): self.logger.debug("Started %s children: %s" % (len(self._all_children), self._all_children)) self.children = cycle(self._all_children) + Bcfg2.Server.Cache.add_expire_hook(self.cache_dispatch) return BuiltinCore._run(self) def shutdown(self): @@ -430,16 +430,11 @@ class Core(BuiltinCore): def set_debug(self, address, debug): self.rpc_q.set_debug(debug) self.rpc_q.publish("set_debug", args=[address, debug]) - self.metadata_cache.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) - @exposed - def RecvProbeData(self, address, probedata): - rv = BuiltinCore.RecvProbeData(self, address, probedata) - # we don't want the children to actually process probe data, - # so we don't send the data, just the fact that we got some. - self.rpc_q.publish("RecvProbeData", args=[address, None]) - return rv + def cache_dispatch(self, tags, exact, _): + """ Publish cache expiration events to child nodes. """ + self.rpc_q.publish("expire_cache", args=tags, kwargs=dict(exact=exact)) @exposed def GetConfig(self, address): |