summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py35
1 files changed, 8 insertions, 27 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 58a05c85d..f58d53c42 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -19,7 +19,7 @@ import Bcfg2.Options
import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
from itertools import cycle
-from Bcfg2.Compat import Queue, Empty, wraps
+from Bcfg2.Compat import Empty, wraps
from Bcfg2.Server.Core import Core, exposed
from Bcfg2.Server.BuiltinCore import BuiltinCore
from multiprocessing.connection import Listener, Client
@@ -41,8 +41,7 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
Bcfg2.Server.Plugin.Debuggable.__init__(self)
self._terminate = threading.Event()
self._queues = dict()
- self._available_listeners = Queue()
- self._blocking_listeners = []
+ self._listeners = []
def add_subscriber(self, name):
""" Add a subscriber to the queue. This returns the
@@ -63,23 +62,14 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
:class:`multiprocessing.connection.Listener` and passes the
Listener address to the child as part of the RPC call, so that
the child can connect to the Listener to submit its results.
-
- Listeners are reused when possible to minimize overhead.
"""
- try:
- listener = self._available_listeners.get_nowait()
- self.logger.debug("Reusing existing RPC listener at %s" %
- listener.address)
- except Empty:
- listener = Listener()
- self.logger.debug("Created new RPC listener at %s" %
- listener.address)
- self._blocking_listeners.append(listener)
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" % listener.address)
+ self._listeners.append(listener)
try:
self._queues[dest].put((listener.address,
(method, args or [], kwargs or dict())))
conn = listener.accept()
- self._blocking_listeners.remove(listener)
try:
while not self._terminate.is_set():
if conn.poll(self.poll_wait):
@@ -87,7 +77,8 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
finally:
conn.close()
finally:
- self._available_listeners.put(listener)
+ listener.close()
+ self._listeners.remove(listener)
def close(self):
""" Close queues and connections. """
@@ -99,21 +90,11 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
# close any listeners that are waiting for connections
self.logger.debug("Closing RPC connections")
- for listener in self._blocking_listeners:
+ for listener in self._listeners:
self.logger.debug("Closing RPC connection at %s" %
listener.address)
listener.close()
- self.logger.debug("Closing RPC listeners")
- try:
- while True:
- listener = self._available_listeners.get_nowait()
- self.logger.debug("Closing RPC listener at %s" %
- listener.address)
- listener.close()
- except Empty:
- pass
-
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that