From 67fda2597efe7cec04b037138cef86f1e328cc4c Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 27 Jun 2013 10:39:46 -0400 Subject: Options: migrated server core to new option parser --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 36 +++++++++++++++++------------ 1 file changed, 21 insertions(+), 15 deletions(-) (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 81fba7092..7e04b1eae 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -7,9 +7,10 @@ processes. As such, it requires Python 2.6+. import threading import lxml.etree import multiprocessing +import Bcfg2.Options from Bcfg2.Compat import Queue -from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from Bcfg2.Server.Core import Core, exposed +from Bcfg2.Server.BuiltinCore import BuiltinCore class DualEvent(object): @@ -48,7 +49,7 @@ class DualEvent(object): return self._threading_event.wait(timeout=timeout) -class ChildCore(BaseCore): +class ChildCore(Core): """ A child process for :class:`Bcfg2.MultiprocessingCore.Core`. This core builds configurations from a given :class:`multiprocessing.Pipe`. Note that this is a full-fledged @@ -67,10 +68,8 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 5.0 - def __init__(self, setup, pipe, terminate): + def __init__(self, pipe, terminate): """ - :param setup: A Bcfg2 options dict - :type setup: Bcfg2.Options.OptionParser :param pipe: The pipe to which client hostnames are added for ChildCore objects to build configurations, and to which client configurations are added after @@ -80,7 +79,7 @@ class ChildCore(BaseCore): themselves down. :type terminate: multiprocessing.Event """ - BaseCore.__init__(self, setup) + Core.__init__(self) #: The pipe to which client hostnames are added for ChildCore #: objects to build configurations, and to which client @@ -123,7 +122,7 @@ class ChildCore(BaseCore): self.shutdown() -class Core(BuiltinCore): +class MultiprocessingCore(BuiltinCore): """ A multiprocessing core that delegates building the actual client configurations to :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The @@ -131,14 +130,20 @@ class Core(BuiltinCore): :func:`GetConfig` are delegated to children. All other calls are handled by the parent process. """ + options = BuiltinCore.options + [ + Bcfg2.Options.Option( + '--children', dest="core_children", + cf=('server', 'children'), type=int, + default=multiprocessing.cpu_count(), + help='Spawn this number of children for the multiprocessing core')] + + #: How long to wait for a child process to shut down cleanly #: before it is terminated. shutdown_timeout = 10.0 - def __init__(self, setup): - BuiltinCore.__init__(self, setup) - if setup['children'] is None: - setup['children'] = multiprocessing.cpu_count() + def __init__(self): + BuiltinCore.__init__(self) #: A dict of child name -> one end of the #: :class:`multiprocessing.Pipe` object used to communicate @@ -152,7 +157,8 @@ class Core(BuiltinCore): #: when it's done. This lets us use a blocking call to #: :func:`Queue.Queue.get` when waiting for an available #: child. - self.available_children = Queue(maxsize=self.setup['children']) + self.available_children = \ + Queue(maxsize=Bcfg2.Options.setup.core_children) # sigh. multiprocessing was added in py2.6, which is when the # camelCase methods for threading objects were deprecated in @@ -165,12 +171,12 @@ class Core(BuiltinCore): self.terminate = DualEvent(threading_event=self.terminate) def _run(self): - for cnum in range(self.setup['children']): + for cnum in range(Bcfg2.Options.setup.core_children): name = "Child-%s" % cnum (mainpipe, childpipe) = multiprocessing.Pipe() self.pipes[name] = mainpipe self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, self.terminate) + childcore = ChildCore(childpipe, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, -- cgit v1.2.3-1-g7c22