From 8a8b10fe738e4cf90f5601b05992f0a4e77502a8 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Nov 2013 22:14:43 -0500 Subject: Multiprocessing: don't reuse child RPC listeners --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 33 ++++++----------------------- 1 file changed, 7 insertions(+), 26 deletions(-) (limited to 'src/lib/Bcfg2/Server') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 3cc308b1c..e74bf492e 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -57,8 +57,7 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable): Bcfg2.Server.Plugin.Debuggable.__init__(self) self._terminate = threading.Event() self._queues = dict() - self._available_listeners = Queue() - self._blocking_listeners = [] + self._listeners = [] def add_subscriber(self, name): """ Add a subscriber to the queue. This returns the @@ -79,23 +78,14 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable): :class:`multiprocessing.connection.Listener` and passes the Listener address to the child as part of the RPC call, so that the child can connect to the Listener to submit its results. - - Listeners are reused when possible to minimize overhead. """ - try: - listener = self._available_listeners.get_nowait() - self.logger.debug("Reusing existing RPC listener at %s" % - listener.address) - except Empty: - listener = Listener() - self.logger.debug("Created new RPC listener at %s" % - listener.address) - self._blocking_listeners.append(listener) + listener = Listener() + self.logger.debug("Created new RPC listener at %s" % listener.address) + self._listeners.append(listener) try: self._queues[dest].put((listener.address, (method, args or [], kwargs or dict()))) conn = listener.accept() - self._blocking_listeners.remove(listener) try: while not self._terminate.is_set(): if conn.poll(self.poll_wait): @@ -103,7 +93,8 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable): finally: conn.close() finally: - self._available_listeners.put(listener) + listener.close() + self._listeners.remove(listener) def close(self): """ Close queues and connections. """ @@ -115,21 +106,11 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable): # close any listeners that are waiting for connections self.logger.debug("Closing RPC connections") - for listener in self._blocking_listeners: + for listener in self._listeners: self.logger.debug("Closing RPC connection at %s" % listener.address) listener.close() - self.logger.debug("Closing RPC listeners") - try: - while True: - listener = self._available_listeners.get_nowait() - self.logger.debug("Closing RPC listener at %s" % - listener.address) - listener.close() - except Empty: - pass - class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that -- cgit v1.2.3-1-g7c22