summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-11-11 15:46:09 -0500
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-11-11 15:46:09 -0500
commite30c785c94c5aa399c44fff386fa2279f64f1acc (patch)
treee72c636dcae00a6ea0c668cfbee7d3e6b4e6a3fe /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent7aa15c4c5507e311ff66264bc31e6758a80eb337 (diff)
parent103b1b5198828876fa0684296900769018075f1b (diff)
downloadbcfg2-e30c785c94c5aa399c44fff386fa2279f64f1acc.tar.gz
bcfg2-e30c785c94c5aa399c44fff386fa2279f64f1acc.tar.bz2
bcfg2-e30c785c94c5aa399c44fff386fa2279f64f1acc.zip
Merge branch 'maint'
Conflicts: src/lib/Bcfg2/Server/Admin/Compare.py src/lib/Bcfg2/Server/Admin/Snapshots.py src/lib/Bcfg2/Server/MultiprocessingCore.py src/lib/Bcfg2/Server/Plugins/Probes.py src/sbin/bcfg2-crypt src/sbin/bcfg2-reports tools/upgrade/1.3/migrate_configs.py tools/upgrade/1.3/migrate_perms_to_mode.py
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py35
1 files changed, 8 insertions, 27 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 58a05c85d..f58d53c42 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -19,7 +19,7 @@ import Bcfg2.Options
import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
from itertools import cycle
-from Bcfg2.Compat import Queue, Empty, wraps
+from Bcfg2.Compat import Empty, wraps
from Bcfg2.Server.Core import Core, exposed
from Bcfg2.Server.BuiltinCore import BuiltinCore
from multiprocessing.connection import Listener, Client
@@ -41,8 +41,7 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
Bcfg2.Server.Plugin.Debuggable.__init__(self)
self._terminate = threading.Event()
self._queues = dict()
- self._available_listeners = Queue()
- self._blocking_listeners = []
+ self._listeners = []
def add_subscriber(self, name):
""" Add a subscriber to the queue. This returns the
@@ -63,23 +62,14 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
:class:`multiprocessing.connection.Listener` and passes the
Listener address to the child as part of the RPC call, so that
the child can connect to the Listener to submit its results.
-
- Listeners are reused when possible to minimize overhead.
"""
- try:
- listener = self._available_listeners.get_nowait()
- self.logger.debug("Reusing existing RPC listener at %s" %
- listener.address)
- except Empty:
- listener = Listener()
- self.logger.debug("Created new RPC listener at %s" %
- listener.address)
- self._blocking_listeners.append(listener)
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" % listener.address)
+ self._listeners.append(listener)
try:
self._queues[dest].put((listener.address,
(method, args or [], kwargs or dict())))
conn = listener.accept()
- self._blocking_listeners.remove(listener)
try:
while not self._terminate.is_set():
if conn.poll(self.poll_wait):
@@ -87,7 +77,8 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
finally:
conn.close()
finally:
- self._available_listeners.put(listener)
+ listener.close()
+ self._listeners.remove(listener)
def close(self):
""" Close queues and connections. """
@@ -99,21 +90,11 @@ class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
# close any listeners that are waiting for connections
self.logger.debug("Closing RPC connections")
- for listener in self._blocking_listeners:
+ for listener in self._listeners:
self.logger.debug("Closing RPC connection at %s" %
listener.address)
listener.close()
- self.logger.debug("Closing RPC listeners")
- try:
- while True:
- listener = self._available_listeners.get_nowait()
- self.logger.debug("Closing RPC listener at %s" %
- listener.address)
- listener.close()
- except Empty:
- pass
-
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that