summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py33
1 files changed, 7 insertions, 26 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 3cc308b1c..e74bf492e 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -57,8 +57,7 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
Bcfg2.Server.Plugin.Debuggable.__init__(self)
self._terminate = threading.Event()
self._queues = dict()
- self._available_listeners = Queue()
- self._blocking_listeners = []
+ self._listeners = []
def add_subscriber(self, name):
""" Add a subscriber to the queue. This returns the
@@ -79,23 +78,14 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
:class:`multiprocessing.connection.Listener` and passes the
Listener address to the child as part of the RPC call, so that
the child can connect to the Listener to submit its results.
-
- Listeners are reused when possible to minimize overhead.
"""
- try:
- listener = self._available_listeners.get_nowait()
- self.logger.debug("Reusing existing RPC listener at %s" %
- listener.address)
- except Empty:
- listener = Listener()
- self.logger.debug("Created new RPC listener at %s" %
- listener.address)
- self._blocking_listeners.append(listener)
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" % listener.address)
+ self._listeners.append(listener)
try:
self._queues[dest].put((listener.address,
(method, args or [], kwargs or dict())))
conn = listener.accept()
- self._blocking_listeners.remove(listener)
try:
while not self._terminate.is_set():
if conn.poll(self.poll_wait):
@@ -103,7 +93,8 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
finally:
conn.close()
finally:
- self._available_listeners.put(listener)
+ listener.close()
+ self._listeners.remove(listener)
def close(self):
""" Close queues and connections. """
@@ -115,21 +106,11 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
# close any listeners that are waiting for connections
self.logger.debug("Closing RPC connections")
- for listener in self._blocking_listeners:
+ for listener in self._listeners:
self.logger.debug("Closing RPC connection at %s" %
listener.address)
listener.close()
- self.logger.debug("Closing RPC listeners")
- try:
- while True:
- listener = self._available_listeners.get_nowait()
- self.logger.debug("Closing RPC listener at %s" %
- listener.address)
- listener.close()
- except Empty:
- pass
-
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that