author     Chris St. Pierre <chris.a.st.pierre@gmail.com>  2013-08-05 13:53:20 -0400
committer  Chris St. Pierre <chris.a.st.pierre@gmail.com>  2013-08-05 14:51:45 -0400
commit     c8d71e18c16039593b309bc35e4ceffc50a0107d (patch)
tree       85601d1f0e0c3f61342d5c2787c67873e835a385  /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent     e1f045ff3c56b09ff06e11e6d4f9677bf63d051f (diff)
MultiprocessingCore: greatly simplified parent-child RPC, removed non-thread-safe bits
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--  src/lib/Bcfg2/Server/MultiprocessingCore.py  407
1 file changed, 145 insertions(+), 262 deletions(-)
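
The commit swaps the Pipe-based ThreadSafePipeDispatcher for an RPCQueue: each call is pushed onto a per-child multiprocessing.Queue, and when a response is needed the parent opens a multiprocessing.connection.Listener and ships its address with the call so the child can connect back with a Client. The following is a minimal, self-contained sketch of that handshake, not Bcfg2 code; child_loop and the single hard-coded "GetConfig" call are illustrative only.

import multiprocessing
from multiprocessing.connection import Listener, Client


def child_loop(queue):
    """Child side: pop one request off the queue and send the result back."""
    address, (method, args) = queue.get()
    # Stand-in for ChildCore._dispatch(); a real child would look the
    # named method up and call it.
    result = "%s(%s) handled in child" % (method, args)
    conn = Client(address)  # connect back to the parent's Listener
    conn.send(result)
    conn.close()


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=child_loop, args=(queue,))
    child.start()

    listener = Listener()  # parent-side return channel
    # Ship the Listener address along with the call, as RPCQueue.rpc() does.
    queue.put((listener.address, ("GetConfig", ["client1"])))
    conn = listener.accept()  # blocks until the child connects back
    print(conn.recv())
    conn.close()
    listener.close()
    child.join()
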
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 775131188..4c304d28c 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -4,33 +4,24 @@
processes. As such, it requires Python 2.6+.
The parent communicates with the children over
-:class:`multiprocessing.Pipe` objects that are wrapped in a
-:class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` to
-make them thread-safe. Each command passed over the Pipe should be in
-the following format::
+:class:`multiprocessing.Queue` objects via a
+:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object.
- (<method>, <args>, <kwargs>)
-
-The parent can also communicate with children over a one-way
-:class:`multiprocessing.Queue` object that is used for
-publish-subscribe communications, i.e., most XML-RPC commands.
-(Setting debug, e.g., doesn't require a response from the children.)
-
-The method must be exposed by the child by decorating it with
-:func:`Bcfg2.Server.Core.exposed`.
+A method being called via the RPCQueue must be exposed by the child by
+decorating it with :func:`Bcfg2.Server.Core.exposed`.
"""
import time
import threading
import lxml.etree
import multiprocessing
-from uuid import uuid4
from itertools import cycle
from Bcfg2.Cache import Cache
from Bcfg2.Compat import Queue, Empty
+from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.Core import BaseCore, exposed
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
-from Bcfg2.Server.Plugin import Debuggable, track_statistics
+from multiprocessing.connection import Listener, Client
class DispatchingCache(Cache, Debuggable):
@@ -41,166 +32,103 @@ class DispatchingCache(Cache, Debuggable):
method = "expire_cache"
def __init__(self, *args, **kwargs):
- self.cmd_q = kwargs.pop("queue")
+ self.rpc_q = kwargs.pop("queue")
Debuggable.__init__(self)
Cache.__init__(self, *args, **kwargs)
def expire(self, key=None):
- self.cmd_q.put((self.method, [key], dict()))
+ self.rpc_q.publish(self.method, args=[key])
Cache.expire(self, key=key)
-class PublishSubscribeQueue(object):
+class RPCQueue(Debuggable):
""" An implementation of a :class:`multiprocessing.Queue` designed
- for publish-subscribe use patterns. I.e., a single node adds items
- to the queue, and every other node retrieves the item. This is
- the 'publish' end; the subscribers can deal with this as a normal
- Queue with no special handling.
+ for several additional use patterns:
- Note that, since this is the publishing end, there's no support
- for getting.
+ * Random-access reads, based on a key that identifies the data;
+ * Publish-subscribe, where a datum is sent to all hosts.
+
+ The subscribers can deal with this as a normal Queue with no
+ special handling.
"""
+ poll_wait = 3.0
def __init__(self):
- self._queues = []
-
- def add_subscriber(self):
- """ Add a subscriber to the queue. This returns a
- :class:`multiprocessing.Queue` object that is used as the
- subscription end of the queue. """
- new_q = multiprocessing.Queue()
- self._queues.append(new_q)
- return new_q
-
- def put(self, obj, block=True, timeout=None):
- """ Put ``obj`` into the queue. See
- :func:`multiprocessing.Queue.put` for more details."""
- for queue in self._queues:
- queue.put(obj, block=block, timeout=timeout)
-
- def put_nowait(self, obj):
- """ Equivalent to ``put(obj, False)``. """
- self.put(obj, block=False)
+ Debuggable.__init__(self)
+ self._terminate = threading.Event()
+ self._queues = dict()
+ self._available_listeners = Queue()
+ self._blocking_listeners = []
+
+ def add_subscriber(self, name):
+ """ Add a subscriber to the queue. This returns the
+ :class:`multiprocessing.Queue` object that the subscriber
+ should read from. """
+ self._queues[name] = multiprocessing.Queue()
+ return self._queues[name]
+
+ def publish(self, method, args=None, kwargs=None):
+ """ Publish an RPC call to the queue for consumption by all
+ subscribers. """
+ for queue in self._queues.values():
+ queue.put((None, (method, args or [], kwargs or dict())))
+
+ def rpc(self, dest, method, args=None, kwargs=None):
+ """ Make an RPC call to the named subscriber, expecting a
+ response. This opens a
+ :class:`multiprocessing.connection.Listener` and passes the
+ Listener address to the child as part of the RPC call, so that
+ the child can connect to the Listener to submit its results.
+
+ Listeners are reused when possible to minimize overhead.
+ """
+ try:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Reusing existing RPC listener at %s" %
+ listener.address)
+ except Empty:
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" %
+ listener.address)
+ self._blocking_listeners.append(listener)
+ try:
+ self._queues[dest].put((listener.address,
+ (method, args or [], kwargs or dict())))
+ conn = listener.accept()
+ self._blocking_listeners.remove(listener)
+ try:
+ while not self._terminate.is_set():
+ if conn.poll(self.poll_wait):
+ return conn.recv()
+ finally:
+ conn.close()
+ finally:
+ self._available_listeners.put(listener)
def close(self):
- """ Close the queue. See :func:`multiprocessing.Queue.close`
- for more details. """
- for queue in self._queues:
+ """ Close queues and connections. """
+ self._terminate.set()
+ self.logger.debug("Closing RPC queues")
+ for name, queue in self._queues.items():
+ self.logger.debug("Closing RPC queue to %s" % name)
queue.close()
+ # close any listeners that are waiting for connections
+ self.logger.debug("Closing RPC connections")
+ for listener in self._blocking_listeners:
+ self.logger.debug("Closing RPC connection at %s" %
+ listener.address)
+ listener.close()
-class ThreadSafePipeDispatcher(Debuggable):
- """ This is a wrapper around :class:`multiprocessing.Pipe` objects
- that allows them to be used in multithreaded applications. When
- performing a ``send()``, a key is included that will be used to
- identify the response. As responses are received from the Pipe,
- they are added to a dict that is used to get the appropriate
- response for a given thread.
-
- The remote end of the Pipe must deal with the key being sent with
- the data in a tuple of ``(key, data)``, and it must include the
- key with its response.
-
- It is the responsibility of the user to ensure that the key is
- unique.
-
- Note that this adds a bottleneck -- all communication over the
- actual Pipe happens in a single thread. But for our purposes,
- Pipe communication is fairly minimal and that's an acceptable
- bottleneck."""
-
- #: How long to wait while polling for new data to send. This
- #: doesn't affect the speed with which data is sent, but
- #: setting it too high will result in longer shutdown times, since
- #: we only check for the termination event from the main process
- #: every ``poll_wait`` seconds.
- poll_wait = 2.0
-
- _sentinel = object()
-
- def __init__(self, terminate):
- Debuggable.__init__(self)
-
- #: The threading flag that is used to determine when the
- #: threads should stop.
- self.terminate = terminate
-
- #: The :class:`multiprocessing.Pipe` tuple used by this object
- self.pipe = multiprocessing.Pipe()
-
- self._mainpipe = self.pipe[0]
- self._recv_dict = dict()
- self._send_queue = Queue()
-
- self.send_thread = threading.Thread(name="PipeSendThread",
- target=self._send_thread)
- self.send_thread.start()
- self.recv_thread = threading.Thread(name="PipeRecvThread",
- target=self._recv_thread)
- self.recv_thread.start()
-
- def _send_thread(self):
- """ Run the single thread through which send requests are passed """
- self.logger.debug("Starting interprocess RPC send thread")
- while not self.terminate.isSet():
- try:
- self._mainpipe.send(self._send_queue.get(True, self.poll_wait))
- except Empty:
- pass
- self.logger.info("Interprocess RPC send thread stopped")
-
- def send(self, key, data):
- """ Send data with the given unique key """
- self._send_queue.put((key, data))
-
- def _recv_thread(self):
- """ Run the single thread through which recv requests are passed """
- self.logger.debug("Starting interprocess RPC receive thread")
- while not self.terminate.isSet():
- if self._mainpipe.poll(self.poll_wait):
- key, data = self._mainpipe.recv()
- if key in self._recv_dict:
- self.logger.error("Duplicate key in received data: %s" %
- key)
- self._mainpipe.close()
- self._recv_dict[key] = data
- self.logger.info("Interprocess RPC receive thread stopped")
-
- def recv(self, key):
- """ Receive data with the given unique key """
- self.poll(key, timeout=None)
- return self._recv_dict.pop(key)
-
- def poll(self, key, timeout=_sentinel):
- """ Poll for data with the given unique key. See
- :func:`multiprocessing.Connection.poll` for the possible
- values of ``timeout``. """
- if timeout is self._sentinel:
- return key in self._recv_dict
-
- abort = threading.Event()
-
- if timeout is not None:
- timer = threading.Timer(float(timeout), abort.set)
- timer.start()
+ self.logger.debug("Closing RPC listeners")
try:
- while not abort.is_set():
- if key in self._recv_dict:
- return True
- return False
- finally:
- if timeout is not None:
- timer.cancel()
-
- @staticmethod
- def genkey(base):
- """ Generate a key suitable for use with
- :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher`
- send() requests, based on the given data. The key is
- constructed from the string given, some information about this
- thread, and some random data. """
- thread = threading.current_thread()
- return "%s-%s-%s-%s" % (base, thread.name, thread.ident, uuid4())
+ while True:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Closing RPC listener at %s" %
+ listener.address)
+ listener.close()
+ except Empty:
+ pass
class DualEvent(object):
@@ -258,18 +186,18 @@ class ChildCore(BaseCore):
#: every ``poll_wait`` seconds.
poll_wait = 3.0
- def __init__(self, name, setup, rpc_pipe, cmd_q, terminate):
+ def __init__(self, name, setup, rpc_q, terminate):
"""
:param name: The name of this child
:type name: string
:param setup: A Bcfg2 options dict
:type setup: Bcfg2.Options.OptionParser
- :param rpc_pipe: The pipe used for RPC communication with the
- parent process
- :type rpc_pipe: multiprocessing.Pipe
- :param cmd_q: The queue used for one-way pub-sub
- communications from the parent process
- :type cmd_q: multiprocessing.Queue
+ :param rpc_q: The queue the child will read from for RPC
+ communications from the parent process.
+ :type rpc_q: multiprocessing.Queue
:param terminate: An event that flags ChildCore objects to shut
themselves down.
:type terminate: multiprocessing.Event
@@ -279,22 +207,12 @@ class ChildCore(BaseCore):
#: The name of this child
self.name = name
- #: The pipe used for RPC communication with the parent
- self.rpc_pipe = rpc_pipe
-
- #: The queue used to receive pub-sub commands
- self.cmd_q = cmd_q
-
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
- # a list of all rendering threads
- self._threads = []
-
- # the thread used to process publish-subscribe commands
- self._command_thread = threading.Thread(name="CommandThread",
- target=self._dispatch_commands)
+ #: The queue used for RPC communication
+ self.rpc_q = rpc_q
# override this setting so that the child doesn't try to write
# the pidfile
@@ -304,75 +222,60 @@ class ChildCore(BaseCore):
self.perflog_thread = None
def _run(self):
- self._command_thread.start()
return True
def _daemonize(self):
return True
- def _dispatch_commands(self):
- """ Dispatch commands received via the pub-sub queue interface
- """
- self.logger.debug("Starting %s RPC subscription thread" % self.name)
- while not self.terminate.is_set():
- try:
- data = self.cmd_q.get(True, self.poll_wait)
- self.logger.debug("%s: Processing asynchronous command: %s" %
- (self.name, data[0]))
- self._dispatch(data)
- except Empty:
- pass
- self.logger.info("%s RPC subscription thread stopped" % self.name)
-
- def _dispatch_render(self):
- """ Dispatch render requests received via the RPC pipe
- interface """
- key, data = self.rpc_pipe.recv()
- self.rpc_pipe.send((key, self._dispatch(data)))
-
- @track_statistics()
- def _reap_threads(self):
- """ Reap rendering threads that have completed """
- for thread in self._threads[:]:
- if not thread.is_alive():
- self._threads.remove(thread)
-
- def _dispatch(self, data):
- """ Generic method dispatcher used for commands received from
- either the pub-sub queue or the RPC pipe. """
+ def _dispatch(self, address, data):
+ """ Method dispatcher used for commands received from
+ the RPC queue. """
+ if address is not None:
+ # if the address is None, then no response is expected. we
+ # make the return connection before dispatching the actual
+ # RPC call so that the parent is blocking for a connection
+ # as briefly as possible
+ self.logger.debug("Connecting to parent via %s" % address)
+ client = Client(address)
method, args, kwargs = data
+ rv = None
if not hasattr(self, method):
self.logger.error("%s: Method %s does not exist" % (self.name,
method))
- return None
-
- func = getattr(self, method)
- if func.exposed:
- self.logger.debug("%s: Calling RPC method %s" % (self.name,
- method))
- return func(*args, **kwargs)
else:
- self.logger.error("%s: Method %s is not exposed" % (self.name,
- method))
- return None
+ func = getattr(self, method)
+ if func.exposed:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ rv = func(*args, **kwargs)
+ else:
+ self.logger.error("%s: Method %s is not exposed" % (self.name,
+ method))
+ if address is not None:
+ # if the address is None, then no response is expected
+ self.logger.debug("Returning data to parent via %s" % address)
+ client.send(rv)
def _block(self):
- while not self.terminate.isSet():
+ while not self.terminate.is_set():
try:
- if self.rpc_pipe.poll(self.poll_wait):
- rpc_thread = threading.Thread(
- name="Renderer%s" % len(self._threads),
- target=self._dispatch_render)
- self._threads.append(rpc_thread)
- rpc_thread.start()
- self._reap_threads()
+ address, data = self.rpc_q.get(timeout=self.poll_wait)
+ threadname = "-".join(str(i) for i in data)
+ rpc_thread = threading.Thread(name=threadname,
+ target=self._dispatch,
+ args=[address, data])
+ rpc_thread.start()
+ except Empty:
+ pass
except KeyboardInterrupt:
break
self.shutdown()
def shutdown(self):
BaseCore.shutdown(self)
- self._reap_threads()
+ self.logger.info("%s: Closing RPC command queue" % self.name)
+ self.rpc_q.close()
+
while len(threading.enumerate()) > 1:
threads = [t for t in threading.enumerate()
if t != threading.current_thread()]
@@ -380,14 +283,9 @@ class ChildCore(BaseCore):
(self.name, len(threads),
[t.name for t in threads]))
time.sleep(1)
- self._reap_threads()
self.logger.info("%s: All threads stopped" % self.name)
@exposed
- def set_debug(self, address, debug):
- BaseCore.set_debug(self, address, debug)
-
- @exposed
def expire_cache(self, client=None):
""" Expire the metadata cache for a client """
self.metadata_cache.expire(client)
@@ -417,22 +315,15 @@ class Core(BuiltinCore):
if setup['children'] is None:
setup['children'] = multiprocessing.cpu_count()
- #: A dict of child name ->
- #: :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher`
- #: objects used to pass render requests to that child. (The
- #: child is given the other end of the Pipe.)
- self.pipes = dict()
-
- #: A
- #: :class:`Bcfg2.Server.MultiprocessingCore.PublishSubscribeQueue`
- #: object used to publish commands to all children.
- self.cmd_q = PublishSubscribeQueue()
-
#: The flag that indicates when to stop child threads and
#: processes
self.terminate = DualEvent(threading_event=self.terminate)
- self.metadata_cache = DispatchingCache(queue=self.cmd_q)
+ #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object
+ #: used to send or publish commands to children.
+ self.rpc_q = RPCQueue()
+
+ self.metadata_cache = DispatchingCache(queue=self.rpc_q)
#: A list of children that will be cycled through
self._all_children = []
@@ -445,13 +336,9 @@ class Core(BuiltinCore):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
- # create Pipe for render requests
- dispatcher = ThreadSafePipeDispatcher(self.terminate)
- self.pipes[name] = dispatcher
-
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(name, self.setup, dispatcher.pipe[1],
- self.cmd_q.add_subscriber(), self.terminate)
+ child_q = self.rpc_q.add_subscriber(name)
+ childcore = ChildCore(name, self.setup, child_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
@@ -464,8 +351,8 @@ class Core(BuiltinCore):
def shutdown(self):
BuiltinCore.shutdown(self)
- self.logger.debug("Closing RPC command queues")
- self.cmd_q.close()
+ self.logger.info("Closing RPC command queues")
+ self.rpc_q.close()
def term_children():
""" Terminate all remaining multiprocessing children. """
@@ -496,10 +383,9 @@ class Core(BuiltinCore):
@exposed
def set_debug(self, address, debug):
- self.cmd_q.put(("set_debug", [address, debug], dict()))
+ self.rpc_q.set_debug(debug)
+ self.rpc_q.publish("set_debug", args=[address, debug])
self.metadata_cache.set_debug(debug)
- for pipe in self.pipes.values():
- pipe.set_debug(debug)
return BuiltinCore.set_debug(self, address, debug)
@exposed
@@ -508,10 +394,7 @@ class Core(BuiltinCore):
childname = self.children.next()
self.logger.debug("Building configuration for %s on %s" % (client,
childname))
- key = ThreadSafePipeDispatcher.genkey(client)
- pipe = self.pipes[childname]
- pipe.send(key, ("GetConfig", [client], dict()))
- return pipe.recv(key)
+ return self.rpc_q.rpc(childname, "GetConfig", args=[client])
@exposed
def get_statistics(self, address):
@@ -543,10 +426,10 @@ class Core(BuiltinCore):
(vals[2] * vals[3])) / newcount
stats[totalname] = (newmin, newmax, newmean, newcount)
- key = ThreadSafePipeDispatcher.genkey("get_statistics")
stats = dict()
- for childname, pipe in self.pipes.items():
- pipe.send(key, ("get_statistics", [address], dict()))
- _aggregate_statistics(pipe.recv(key), prefix=childname)
+ for childname in self._all_children:
+ _aggregate_statistics(
+ self.rpc_q.rpc(childname, "get_statistics", args=[address]),
+ prefix=childname)
_aggregate_statistics(BuiltinCore.get_statistics(self, address))
return stats
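
For comparison, the fire-and-forget side of the new design (used by DispatchingCache.expire() and set_debug) fans each call out to every subscriber queue with None in the address slot, so no child ever connects back. Below is a stripped-down sketch of that pattern under the same assumptions as above; MiniPublisher is a hypothetical name, not part of Bcfg2.

import multiprocessing


class MiniPublisher(object):
    """Hypothetical, minimal analogue of RPCQueue's publish side."""

    def __init__(self):
        self._queues = dict()

    def add_subscriber(self, name):
        """Create and return the Queue a named child should read from."""
        self._queues[name] = multiprocessing.Queue()
        return self._queues[name]

    def publish(self, method, args=None, kwargs=None):
        # A None address means "no response expected": the child dispatches
        # the call but never opens a return connection.
        for queue in self._queues.values():
            queue.put((None, (method, args or [], kwargs or dict())))


if __name__ == "__main__":
    pub = MiniPublisher()
    child_q = pub.add_subscriber("Child-0")
    pub.publish("expire_cache", args=["client1"])
    print(child_q.get())  # -> (None, ('expire_cache', ['client1'], {}))
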