diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-08 13:35:20 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-08 13:35:20 -0400 |
commit | 73c61dd6c0c464f06637db750484417bc8bbb6de (patch) | |
tree | 01381b324b6e0a44ebe49660752f31a4abca648a /src/lib/Bcfg2/Server/MultiprocessingCore.py | |
parent | e1e194a573b3803fa7f45a646bbb36b2f164a3e1 (diff) | |
parent | cd14868d4db8eaa7e9421e1d5fe8653294ac1e38 (diff) | |
download | bcfg2-73c61dd6c0c464f06637db750484417bc8bbb6de.tar.gz bcfg2-73c61dd6c0c464f06637db750484417bc8bbb6de.tar.bz2 bcfg2-73c61dd6c0c464f06637db750484417bc8bbb6de.zip |
Merge branch 'options-rewrite'
Conflicts:
src/lib/Bcfg2/Client/Frame.py
src/lib/Bcfg2/Options.py
src/lib/Bcfg2/Server/Admin/Init.py
src/lib/Bcfg2/Server/Admin/Xcmd.py
src/lib/Bcfg2/Server/BuiltinCore.py
src/lib/Bcfg2/Server/Core.py
src/lib/Bcfg2/Server/MultiprocessingCore.py
src/lib/Bcfg2/Server/Plugin/base.py
src/lib/Bcfg2/Server/Plugin/helpers.py
src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
src/lib/Bcfg2/Server/SSLServer.py
src/lib/Bcfg2/Utils.py
src/lib/Bcfg2/settings.py
src/sbin/bcfg2-crypt
src/sbin/bcfg2-info
src/sbin/bcfg2-lint
src/sbin/bcfg2-test
src/sbin/bcfg2-yum-helper
tools/bcfg2-profile-templates.py
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 51 |
1 files changed, 35 insertions, 16 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index e79207291..678a1c95d 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -15,12 +15,13 @@ import time import threading import lxml.etree import multiprocessing +import Bcfg2.Options import Bcfg2.Server.Plugin from itertools import cycle from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue, Empty, wraps -from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from Bcfg2.Server.Core import Core, exposed +from Bcfg2.Server.BuiltinCore import BuiltinCore from multiprocessing.connection import Listener, Client @@ -167,7 +168,7 @@ class DualEvent(object): return self._threading_event.wait(timeout=timeout) -class ChildCore(BaseCore): +class ChildCore(Core): """ A child process for :class:`Bcfg2.MultiprocessingCore.Core`. This core builds configurations from a given :class:`multiprocessing.Pipe`. Note that this is a full-fledged @@ -186,12 +187,10 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 3.0 - def __init__(self, name, setup, rpc_q, terminate): + def __init__(self, name, rpc_q, terminate): """ :param name: The name of this child :type name: string - :param setup: A Bcfg2 options dict - :type setup: Bcfg2.Options.OptionParser :param read_q: The queue the child will read from for RPC communications from the parent process. :type read_q: multiprocessing.Queue @@ -202,7 +201,7 @@ class ChildCore(BaseCore): themselves down. :type terminate: multiprocessing.Event """ - BaseCore.__init__(self, setup) + Core.__init__(self) #: The name of this child self.name = name @@ -216,7 +215,7 @@ class ChildCore(BaseCore): # override this setting so that the child doesn't try to write # the pidfile - self.setup['daemon'] = False + Bcfg2.Options.setup.daemon = False # ensure that the child doesn't start a perflog thread self.perflog_thread = None @@ -283,7 +282,7 @@ class ChildCore(BaseCore): self.shutdown() def shutdown(self): - BaseCore.shutdown(self) + Core.shutdown(self) self.logger.info("%s: Closing RPC command queue" % self.name) self.rpc_q.close() @@ -328,7 +327,7 @@ class ChildCore(BaseCore): return lxml.etree.tostring(self.BuildConfiguration(client)) -class Core(BuiltinCore): +class MultiprocessingCore(BuiltinCore): """ A multiprocessing core that delegates building the actual client configurations to :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The @@ -336,14 +335,34 @@ class Core(BuiltinCore): :func:`GetConfig` are delegated to children. All other calls are handled by the parent process. """ + options = BuiltinCore.options + [ + Bcfg2.Options.Option( + '--children', dest="core_children", + cf=('server', 'children'), type=int, + default=multiprocessing.cpu_count(), + help='Spawn this number of children for the multiprocessing core')] + #: How long to wait for a child process to shut down cleanly #: before it is terminated. shutdown_timeout = 10.0 - def __init__(self, setup): - BuiltinCore.__init__(self, setup) - if setup['children'] is None: - setup['children'] = multiprocessing.cpu_count() + def __init__(self): + BuiltinCore.__init__(self) + + #: A dict of child name -> one end of the + #: :class:`multiprocessing.Pipe` object used to communicate + #: with that child. (The child is given the other end of the + #: Pipe.) + self.pipes = dict() + + #: A queue that keeps track of which children are available to + #: render a configuration. A child is popped from the queue + #: when it starts to render a config, then it's pushed back on + #: when it's done. This lets us use a blocking call to + #: :func:`Queue.Queue.get` when waiting for an available + #: child. + self.available_children = \ + Queue(maxsize=Bcfg2.Options.setup.core_children) #: The flag that indicates when to stop child threads and #: processes @@ -363,12 +382,12 @@ class Core(BuiltinCore): self.children = None def _run(self): - for cnum in range(self.setup['children']): + for cnum in range(Bcfg2.Options.setup.core_children): name = "Child-%s" % cnum self.logger.debug("Starting child %s" % name) child_q = self.rpc_q.add_subscriber(name) - childcore = ChildCore(name, self.setup, child_q, self.terminate) + childcore = ChildCore(name, child_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, |