summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-08 08:10:16 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-08 08:10:16 -0400
commite1e194a573b3803fa7f45a646bbb36b2f164a3e1 (patch)
treee9b689d1be39d38279e0a16f010e8d5e573612ef /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent35851347089db1a092ec715cb183aec19f19e983 (diff)
parenteef441c1acdf1d3d483647b153f721cbab4a8517 (diff)
downloadbcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.gz
bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.bz2
bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.zip
Merge branch 'maint'
Conflicts: doc/appendix/files/mysql.txt doc/getting_started/index.txt doc/server/plugins/structures/bundler/kernel.txt src/lib/Bcfg2/Server/MultiprocessingCore.py src/lib/Bcfg2/Server/Plugin/interfaces.py src/lib/Bcfg2/Server/Plugins/Packages/Yum.py src/lib/Bcfg2/Server/Plugins/Probes.py src/lib/Bcfg2/Server/Plugins/SSHbase.py
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py499
1 files changed, 349 insertions, 150 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index b9716619d..e79207291 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -3,66 +3,134 @@
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
-The parent communicates with the children over two constructs:
-
-* A :class:`multiprocessing.Pipe` is used to process render requests.
- The pipe is locked when in use (i.e., between the time that a client
- is submitted to be rendered and the time that its configuration is
- returned) to keep things thread-safe. (This is accomplished through
- the use of
- :attr:`Bcfg2.Server.MultiprocessingCore.available_children.)
-* A :class:`multiprocessing.Queue` is used to submit other commands in
- a thread-safe, non-blocking fashion. (Note that, since it is a
- queue, no results can be returned.) It implements a very simple RPC
- protocol. Each command passed to a child over the Pipe must be a
- tuple with the format::
-
- (<method>, <args>, <kwargs>)
-
- The method must be exposed by the child by decorating it with
- :func:`Bcfg2.Server.Core.exposed`.
+The parent communicates with the children over
+:class:`multiprocessing.Queue` objects via a
+:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object.
+
+A method being called via the RPCQueue must be exposed by the child by
+decorating it with :func:`Bcfg2.Server.Core.exposed`.
"""
+import time
import threading
import lxml.etree
import multiprocessing
-from Bcfg2.Compat import Queue
-from Bcfg2.Server.Cache import Cache
+import Bcfg2.Server.Plugin
+from itertools import cycle
+from Bcfg2.Cache import Cache
+from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import BaseCore, exposed
-from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+from multiprocessing.connection import Listener, Client
-class DispatchingCache(Cache, Debuggable):
+class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
""" Implementation of :class:`Bcfg2.Cache.Cache` that propagates
cache expiration events to child nodes. """
#: The method to send over the pipe to expire the cache
- method = "expire_cache"
+ method = "expire_metadata_cache"
def __init__(self, *args, **kwargs):
- #: A dict of <child name>: :class:`multiprocessing.Queue`
- #: objects that should be given a cache expiration command any
- #: time an item is expired.
- self.command_queues = kwargs.pop("pipes", dict())
-
- Debuggable.__init__(self)
+ self.rpc_q = kwargs.pop("queue")
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
Cache.__init__(self, *args, **kwargs)
def expire(self, key=None):
- if (key and key in self) or (not key and len(self)):
- # dispatching cache expiration to children can be
- # expensive, so only do it if there's something to expire
- for child, cmd_q in self.command_queues.items():
- if key:
- self.logger.debug("Expiring metadata cache for %s on %s" %
- (key, child))
- else:
- self.logger.debug("Expiring metadata cache on %s" % child)
- cmd_q.put((self.method, [key], dict()))
+ self.rpc_q.publish(self.method, args=[key])
Cache.expire(self, key=key)
+class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
+ """ An implementation of a :class:`multiprocessing.Queue` designed
+ for several additional use patterns:
+
+ * Random-access reads, based on a key that identifies the data;
+ * Publish-subscribe, where a datum is sent to all hosts.
+
+ The subscribers can deal with this as a normal Queue with no
+ special handling.
+ """
+ poll_wait = 3.0
+
+ def __init__(self):
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
+ self._terminate = threading.Event()
+ self._queues = dict()
+ self._available_listeners = Queue()
+ self._blocking_listeners = []
+
+ def add_subscriber(self, name):
+ """ Add a subscriber to the queue. This returns the
+ :class:`multiprocessing.Queue` object that the subscriber
+ should read from. """
+ self._queues[name] = multiprocessing.Queue()
+ return self._queues[name]
+
+ def publish(self, method, args=None, kwargs=None):
+ """ Publish an RPC call to the queue for consumption by all
+ subscribers. """
+ for queue in self._queues.values():
+ queue.put((None, (method, args or [], kwargs or dict())))
+
+ def rpc(self, dest, method, args=None, kwargs=None):
+ """ Make an RPC call to the named subscriber, expecting a
+ response. This opens a
+ :class:`multiprocessing.connection.Listener` and passes the
+ Listener address to the child as part of the RPC call, so that
+ the child can connect to the Listener to submit its results.
+
+ Listeners are reused when possible to minimize overhead.
+ """
+ try:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Reusing existing RPC listener at %s" %
+ listener.address)
+ except Empty:
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" %
+ listener.address)
+ self._blocking_listeners.append(listener)
+ try:
+ self._queues[dest].put((listener.address,
+ (method, args or [], kwargs or dict())))
+ conn = listener.accept()
+ self._blocking_listeners.remove(listener)
+ try:
+ while not self._terminate.is_set():
+ if conn.poll(self.poll_wait):
+ return conn.recv()
+ finally:
+ conn.close()
+ finally:
+ self._available_listeners.put(listener)
+
+ def close(self):
+ """ Close queues and connections. """
+ self._terminate.set()
+ self.logger.debug("Closing RPC queues")
+ for name, queue in self._queues.items():
+ self.logger.debug("Closing RPC queue to %s" % name)
+ queue.close()
+
+ # close any listeners that are waiting for connections
+ self.logger.debug("Closing RPC connections")
+ for listener in self._blocking_listeners:
+ self.logger.debug("Closing RPC connection at %s" %
+ listener.address)
+ listener.close()
+
+ self.logger.debug("Closing RPC listeners")
+ try:
+ while True:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Closing RPC listener at %s" %
+ listener.address)
+ listener.close()
+ except Empty:
+ pass
+
+
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that
internally implements both :class:`threading.Event` and
@@ -111,101 +179,154 @@ class ChildCore(BaseCore):
those, though, if the pipe communication "protocol" were made more
robust. """
- #: How long to wait while polling for new clients to build. This
- #: doesn't affect the speed with which a client is built, but
+ #: How long to wait while polling for new RPC commands. This
+ #: doesn't affect the speed with which a command is processed, but
#: setting it too high will result in longer shutdown times, since
#: we only check for the termination event from the main process
#: every ``poll_wait`` seconds.
- poll_wait = 5.0
+ poll_wait = 3.0
- def __init__(self, setup, render_pipe, command_queue, terminate):
+ def __init__(self, name, setup, rpc_q, terminate):
"""
+ :param name: The name of this child
+ :type name: string
:param setup: A Bcfg2 options dict
:type setup: Bcfg2.Options.OptionParser
- :param pipe: The pipe to which client hostnames are added for
- ChildCore objects to build configurations, and to
- which client configurations are added after
- having been built by ChildCore objects.
- :type pipe: multiprocessing.Pipe
+ :param read_q: The queue the child will read from for RPC
+ communications from the parent process.
+ :type read_q: multiprocessing.Queue
+ :param write_q: The queue the child will write the results of
+ RPC calls to.
+ :type write_q: multiprocessing.Queue
:param terminate: An event that flags ChildCore objects to shut
themselves down.
:type terminate: multiprocessing.Event
"""
BaseCore.__init__(self, setup)
- #: The pipe to which client hostnames are added for ChildCore
- #: objects to build configurations, and to which client
- #: configurations are added after having been built by
- #: ChildCore objects.
- self.render_pipe = render_pipe
-
- #: The queue from which other commands are received
- self.command_queue = command_queue
+ #: The name of this child
+ self.name = name
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
- #: The :class:`threading.Thread` used to process commands
- #: received via the :class:`multiprocessing.Queue` RPC
- #: interface
- self.command_thread = \
- threading.Thread(name="CommandThread",
- target=self._command_queue_thread)
+ #: The queue used for RPC communication
+ self.rpc_q = rpc_q
- def _daemonize(self):
- return True
+ # override this setting so that the child doesn't try to write
+ # the pidfile
+ self.setup['daemon'] = False
+
+ # ensure that the child doesn't start a perflog thread
+ self.perflog_thread = None
+
+ self._rmi = dict()
def _run(self):
- try:
- self.command_thread.start()
- except:
- self.shutdown()
- raise
return True
- def render(self):
- """ Process client configuration render requests """
- if self.render_pipe.poll(self.poll_wait):
- if not self.metadata.use_database:
- # handle FAM events, in case (for instance) the
- # client has just been added to clients.xml, or a
- # profile has just been asserted. but really, you
- # should be using the metadata database if you're
- # using this core.
- self.fam.handle_events_in_interval(0.1)
- client = self.render_pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- self.render_pipe.send(
- lxml.etree.tostring(self.BuildConfiguration(client)))
+ def _daemonize(self):
+ return True
+
+ def _dispatch(self, address, data):
+ """ Method dispatcher used for commands received from
+ the RPC queue. """
+ if address is not None:
+ # if the key is None, then no response is expected. we
+ # make the return connection before dispatching the actual
+ # RPC call so that the parent is blocking for a connection
+ # as briefly as possible
+ self.logger.debug("Connecting to parent via %s" % address)
+ client = Client(address)
+ method, args, kwargs = data
+ func = None
+ rv = None
+ if "." in method:
+ if method in self._rmi:
+ func = self._rmi[method]
+ else:
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ elif not hasattr(self, method):
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ else: # method is not a plugin RMI, and exists
+ func = getattr(self, method)
+ if not func.exposed:
+ self.logger.error("%s: Method %s is not exposed" % (self.name,
+ method))
+ func = None
+ if func is not None:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ rv = func(*args, **kwargs)
+ if address is not None:
+ # if the key is None, then no response is expected
+ self.logger.debug("Returning data to parent via %s" % address)
+ client.send(rv)
def _block(self):
- while not self.terminate.isSet():
+ self._rmi = self._get_rmi()
+ while not self.terminate.is_set():
try:
- self.render()
+ address, data = self.rpc_q.get(timeout=self.poll_wait)
+ threadname = "-".join(str(i) for i in data)
+ rpc_thread = threading.Thread(name=threadname,
+ target=self._dispatch,
+ args=[address, data])
+ rpc_thread.start()
+ except Empty:
+ pass
except KeyboardInterrupt:
break
self.shutdown()
- def _command_queue_thread(self):
- """ Process commands received on the command queue thread """
- while not self.terminate.isSet():
- method, args, kwargs = self.command_queue.get()
- if hasattr(self, method):
- func = getattr(self, method)
- if func.exposed:
- self.logger.debug("Child calling RPC method %s" % method)
- func(*args, **kwargs)
+ def shutdown(self):
+ BaseCore.shutdown(self)
+ self.logger.info("%s: Closing RPC command queue" % self.name)
+ self.rpc_q.close()
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("%s: Waiting for %d thread(s): %s" %
+ (self.name, len(threads),
+ [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("%s: All threads stopped" % self.name)
+
+ def _get_rmi(self):
+ rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ mname = crmi[1]
else:
- self.logger.error("Method %s is not exposed" % method)
- else:
- self.logger.error("Method %s does not exist" % method)
+ mname = crmi
+ rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
+ return rmi
@exposed
- def expire_cache(self, client=None):
+ def expire_metadata_cache(self, client=None):
""" Expire the metadata cache for a client """
self.metadata_cache.expire(client)
+ @exposed
+ def RecvProbeData(self, address, _):
+ """ Expire the probe cache for a client """
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
+ key=self.resolve_client(address,
+ metadata=False)[0])
+
+ @exposed
+ def GetConfig(self, client):
+ """ Render the configuration for a client """
+ self.logger.debug("%s: Building configuration for %s" %
+ (self.name, client))
+ return lxml.etree.tostring(self.BuildConfiguration(client))
+
class Core(BuiltinCore):
""" A multiprocessing core that delegates building the actual
@@ -224,84 +345,162 @@ class Core(BuiltinCore):
if setup['children'] is None:
setup['children'] = multiprocessing.cpu_count()
- #: A dict of child name -> one end of the
- #: :class:`multiprocessing.Pipe` object used to submit render
- #: requests to that child. (The child is given the other end
- #: of the Pipe.)
- self.render_pipes = dict()
-
- #: A dict of child name -> :class:`multiprocessing.Queue`
- #: object used to pass commands to that child.
- self.command_queues = dict()
-
- #: A queue that keeps track of which children are available to
- #: render a configuration. A child is popped from the queue
- #: when it starts to render a config, then it's pushed back on
- #: when it's done. This lets us use a blocking call to
- #: :func:`Queue.Queue.get` when waiting for an available
- #: child.
- self.available_children = Queue(maxsize=self.setup['children'])
-
- # sigh. multiprocessing was added in py2.6, which is when the
- # camelCase methods for threading objects were deprecated in
- # favor of the Pythonic under_score methods. So
- # multiprocessing.Event *only* has is_set(), while
- # threading.Event has *both* isSet() and is_set(). In order
- # to make the core work with Python 2.4+, and with both
- # multiprocessing and threading Event objects, we just
- # monkeypatch self.terminate to have isSet().
+ #: The flag that indicates when to stop child threads and
+ #: processes
self.terminate = DualEvent(threading_event=self.terminate)
- self.metadata_cache = DispatchingCache()
+ #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object
+ #: used to send or publish commands to children.
+ self.rpc_q = RPCQueue()
+
+ self.metadata_cache = DispatchingCache(queue=self.rpc_q)
+
+ #: A list of children that will be cycled through
+ self._all_children = []
+
+ #: An iterator that each child will be taken from in sequence,
+ #: to provide a round-robin distribution of render requests
+ self.children = None
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
- # create Pipe for render requests and results
- (mainpipe, childpipe) = multiprocessing.Pipe()
- self.render_pipes[name] = mainpipe
-
- # create Queue for other commands
- cmd_q = multiprocessing.Queue()
- self.command_queues[name] = cmd_q
- self.metadata_cache.command_queues[name] = cmd_q
-
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate)
+ child_q = self.rpc_q.add_subscriber(name)
+ childcore = ChildCore(name, self.setup, child_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
child.pid))
- self.available_children.put(name)
+ self._all_children.append(name)
+ self.logger.debug("Started %s children: %s" % (len(self._all_children),
+ self._all_children))
+ self.children = cycle(self._all_children)
return BuiltinCore._run(self)
def shutdown(self):
BuiltinCore.shutdown(self)
- for child in multiprocessing.active_children():
- self.logger.debug("Shutting down child %s" % child.name)
- child.join(self.shutdown_timeout)
- if child.is_alive():
+ self.logger.info("Closing RPC command queues")
+ self.rpc_q.close()
+
+ def term_children():
+ """ Terminate all remaining multiprocessing children. """
+ for child in multiprocessing.active_children():
self.logger.error("Waited %s seconds to shut down %s, "
"terminating" % (self.shutdown_timeout,
child.name))
child.terminate()
- else:
- self.logger.debug("Child %s shut down" % child.name)
- self.logger.debug("All children shut down")
+
+ timer = threading.Timer(self.shutdown_timeout, term_children)
+ timer.start()
+ while len(multiprocessing.active_children()):
+ self.logger.info("Waiting for %s child(ren): %s" %
+ (len(multiprocessing.active_children()),
+ [c.name
+ for c in multiprocessing.active_children()]))
+ time.sleep(1)
+ timer.cancel()
+ self.logger.info("All children shut down")
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("Waiting for %s thread(s): %s" %
+ (len(threads), [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("Shutdown complete")
+
+ def _get_rmi(self):
+ child_rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ parentname, childname = crmi
+ else:
+ parentname = childname = crmi
+ child_rmi["%s.%s" % (pname, parentname)] = \
+ "%s.%s" % (pname, childname)
+
+ rmi = BuiltinCore._get_rmi(self)
+ for method in rmi.keys():
+ if method in child_rmi:
+ rmi[method] = self._child_rmi_wrapper(method,
+ rmi[method],
+ child_rmi[method])
+ return rmi
+
+ def _child_rmi_wrapper(self, method, parent_rmi, child_rmi):
+ """ Returns a callable that dispatches a call to the given
+ child RMI to child processes, and calls the parent RMI locally
+ (i.e., in the parent process). """
+ @wraps(parent_rmi)
+ def inner(*args, **kwargs):
+ self.logger.debug("Dispatching RMI call to %s to children: %s" %
+ (method, child_rmi))
+ self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs)
+ return parent_rmi(*args, **kwargs)
+
+ return inner
@exposed
def set_debug(self, address, debug):
+ self.rpc_q.set_debug(debug)
+ self.rpc_q.publish("set_debug", args=[address, debug])
self.metadata_cache.set_debug(debug)
return BuiltinCore.set_debug(self, address, debug)
@exposed
+ def RecvProbeData(self, address, probedata):
+ rv = BuiltinCore.RecvProbeData(self, address, probedata)
+ # we don't want the children to actually process probe data,
+ # so we don't send the data, just the fact that we got some.
+ self.rpc_q.publish("RecvProbeData", args=[address, None])
+ return rv
+
+ @exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
- childname = self.available_children.get()
- self.logger.debug("Building configuration on child %s" % childname)
- pipe = self.render_pipes[childname]
- pipe.send(client)
- config = pipe.recv()
- self.available_children.put_nowait(childname)
- return config
+ childname = self.children.next()
+ self.logger.debug("Building configuration for %s on %s" % (client,
+ childname))
+ return self.rpc_q.rpc(childname, "GetConfig", args=[client])
+
+ @exposed
+ def get_statistics(self, address):
+ stats = dict()
+
+ def _aggregate_statistics(newstats, prefix=None):
+ """ Aggregate a set of statistics from a child or parent
+ server core. This adds the statistics to the overall
+ statistics dict (optionally prepending a prefix, such as
+ "Child-1", to uniquely identify this set of statistics),
+ and aggregates it with the set of running totals that are
+ kept from all cores. """
+ for statname, vals in newstats.items():
+ if statname.startswith("ChildCore:"):
+ statname = statname[5:]
+ if prefix:
+ prettyname = "%s:%s" % (prefix, statname)
+ else:
+ prettyname = statname
+ stats[prettyname] = vals
+ totalname = "Total:%s" % statname
+ if totalname not in stats:
+ stats[totalname] = vals
+ else:
+ newmin = min(stats[totalname][0], vals[0])
+ newmax = max(stats[totalname][1], vals[1])
+ newcount = stats[totalname][3] + vals[3]
+ newmean = ((stats[totalname][2] * stats[totalname][3]) +
+ (vals[2] * vals[3])) / newcount
+ stats[totalname] = (newmin, newmax, newmean, newcount)
+
+ stats = dict()
+ for childname in self._all_children:
+ _aggregate_statistics(
+ self.rpc_q.rpc(childname, "get_statistics", args=[address]),
+ prefix=childname)
+ _aggregate_statistics(BuiltinCore.get_statistics(self, address))
+ return stats