summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py114
1 files changed, 53 insertions, 61 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 4986aac60..724b34d8d 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -15,32 +15,16 @@ import time
import threading
import lxml.etree
import multiprocessing
+import Bcfg2.Options
+import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
from itertools import cycle
-from Bcfg2.Cache import Cache
-from Bcfg2.Compat import Empty, wraps
-from Bcfg2.Server.Core import BaseCore, exposed
-from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+from Bcfg2.Compat import Queue, Empty, wraps
+from Bcfg2.Server.Core import Core, exposed
+from Bcfg2.Server.BuiltinCore import BuiltinCore
from multiprocessing.connection import Listener, Client
-class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
- """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
- cache expiration events to child nodes. """
-
- #: The method to send over the pipe to expire the cache
- method = "expire_metadata_cache"
-
- def __init__(self, *args, **kwargs):
- self.rpc_q = kwargs.pop("queue")
- Bcfg2.Server.Plugin.Debuggable.__init__(self)
- Cache.__init__(self, *args, **kwargs)
-
- def expire(self, key=None):
- self.rpc_q.publish(self.method, args=[key])
- Cache.expire(self, key=key)
-
-
class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
""" An implementation of a :class:`multiprocessing.Queue` designed
for several additional use patterns:
@@ -148,7 +132,7 @@ class DualEvent(object):
return self._threading_event.wait(timeout=timeout)
-class ChildCore(BaseCore):
+class ChildCore(Core):
""" A child process for :class:`Bcfg2.MultiprocessingCore.Core`.
This core builds configurations from a given
:class:`multiprocessing.Pipe`. Note that this is a full-fledged
@@ -167,12 +151,10 @@ class ChildCore(BaseCore):
#: every ``poll_wait`` seconds.
poll_wait = 3.0
- def __init__(self, name, setup, rpc_q, terminate):
+ def __init__(self, name, rpc_q, terminate):
"""
:param name: The name of this child
:type name: string
- :param setup: A Bcfg2 options dict
- :type setup: Bcfg2.Options.OptionParser
:param read_q: The queue the child will read from for RPC
communications from the parent process.
:type read_q: multiprocessing.Queue
@@ -183,7 +165,7 @@ class ChildCore(BaseCore):
themselves down.
:type terminate: multiprocessing.Event
"""
- BaseCore.__init__(self, setup)
+ Core.__init__(self)
#: The name of this child
self.name = name
@@ -197,7 +179,7 @@ class ChildCore(BaseCore):
# override this setting so that the child doesn't try to write
# the pidfile
- self.setup['daemon'] = False
+ Bcfg2.Options.setup.daemon = False
# ensure that the child doesn't start a perflog thread
self.perflog_thread = None
@@ -207,12 +189,6 @@ class ChildCore(BaseCore):
def _run(self):
return True
- def _daemonize(self):
- return True
-
- def _drop_privileges(self):
- pass
-
def _dispatch(self, address, data):
""" Method dispatcher used for commands received from
the RPC queue. """
@@ -267,7 +243,7 @@ class ChildCore(BaseCore):
self.shutdown()
def shutdown(self):
- BaseCore.shutdown(self)
+ Core.shutdown(self)
self.logger.info("%s: Closing RPC command queue" % self.name)
self.rpc_q.close()
@@ -292,16 +268,9 @@ class ChildCore(BaseCore):
return rmi
@exposed
- def expire_metadata_cache(self, client=None):
- """ Expire the metadata cache for a client """
- self.metadata_cache.expire(client)
-
- @exposed
- def RecvProbeData(self, address, _):
- """ Expire the probe cache for a client """
- self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
- key=self.resolve_client(address,
- metadata=False)[0])
+ def expire_cache(self, *tags, **kwargs):
+ """ Expire cached data """
+ Bcfg2.Server.Cache.expire(*tags, exact=kwargs.pop("exact", False))
@exposed
def GetConfig(self, client):
@@ -312,7 +281,7 @@ class ChildCore(BaseCore):
return lxml.etree.tostring(self.BuildConfiguration(client))
-class Core(BuiltinCore):
+class MultiprocessingCore(BuiltinCore):
""" A multiprocessing core that delegates building the actual
client configurations to
:class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The
@@ -320,14 +289,34 @@ class Core(BuiltinCore):
:func:`GetConfig` are delegated to children. All other calls are
handled by the parent process. """
+ options = BuiltinCore.options + [
+ Bcfg2.Options.Option(
+ '--children', dest="core_children",
+ cf=('server', 'children'), type=int,
+ default=multiprocessing.cpu_count(),
+ help='Spawn this number of children for the multiprocessing core')]
+
#: How long to wait for a child process to shut down cleanly
#: before it is terminated.
shutdown_timeout = 10.0
- def __init__(self, setup):
- BuiltinCore.__init__(self, setup)
- if setup['children'] is None:
- setup['children'] = multiprocessing.cpu_count()
+ def __init__(self):
+ BuiltinCore.__init__(self)
+
+ #: A dict of child name -> one end of the
+ #: :class:`multiprocessing.Pipe` object used to communicate
+ #: with that child. (The child is given the other end of the
+ #: Pipe.)
+ self.pipes = dict()
+
+ #: A queue that keeps track of which children are available to
+ #: render a configuration. A child is popped from the queue
+ #: when it starts to render a config, then it's pushed back on
+ #: when it's done. This lets us use a blocking call to
+ #: :func:`Queue.Queue.get` when waiting for an available
+ #: child.
+ self.available_children = \
+ Queue(maxsize=Bcfg2.Options.setup.core_children)
#: The flag that indicates when to stop child threads and
#: processes
@@ -337,8 +326,6 @@ class Core(BuiltinCore):
#: used to send or publish commands to children.
self.rpc_q = RPCQueue()
- self.metadata_cache = DispatchingCache(queue=self.rpc_q)
-
#: A list of children that will be cycled through
self._all_children = []
@@ -346,13 +333,22 @@ class Core(BuiltinCore):
#: to provide a round-robin distribution of render requests
self.children = None
+ def __str__(self):
+ if hasattr(Bcfg2.Options.setup, "location"):
+ return "%s(%s; %s children)" % (self.__class__.__name__,
+ Bcfg2.Options.setup.location,
+ len(self._all_children))
+ else:
+ return "%s(%s children)" % (self.__class__.__name__,
+ len(self._all_children))
+
def _run(self):
- for cnum in range(self.setup['children']):
+ for cnum in range(Bcfg2.Options.setup.core_children):
name = "Child-%s" % cnum
self.logger.debug("Starting child %s" % name)
child_q = self.rpc_q.add_subscriber(name)
- childcore = ChildCore(name, self.setup, child_q, self.terminate)
+ childcore = ChildCore(name, child_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
@@ -361,6 +357,7 @@ class Core(BuiltinCore):
self.logger.debug("Started %s children: %s" % (len(self._all_children),
self._all_children))
self.children = cycle(self._all_children)
+ Bcfg2.Server.Cache.add_expire_hook(self.cache_dispatch)
return BuiltinCore._run(self)
def shutdown(self):
@@ -433,16 +430,11 @@ class Core(BuiltinCore):
def set_debug(self, address, debug):
self.rpc_q.set_debug(debug)
self.rpc_q.publish("set_debug", args=[address, debug])
- self.metadata_cache.set_debug(debug)
return BuiltinCore.set_debug(self, address, debug)
- @exposed
- def RecvProbeData(self, address, probedata):
- rv = BuiltinCore.RecvProbeData(self, address, probedata)
- # we don't want the children to actually process probe data,
- # so we don't send the data, just the fact that we got some.
- self.rpc_q.publish("RecvProbeData", args=[address, None])
- return rv
+ def cache_dispatch(self, tags, exact, _):
+ """ Publish cache expiration events to child nodes. """
+ self.rpc_q.publish("expire_cache", args=tags, kwargs=dict(exact=exact))
@exposed
def GetConfig(self, address):