From c92acd93d23c27914c63295dc3396382b9db756d Mon Sep 17 00:00:00 2001 From: Tim Laszlo Date: Wed, 15 May 2013 10:20:36 -0500 Subject: exclude failures type from Interaction.bad/modified/extra (cherry picked from commit 06bc603bbfb7615a2840c7bef0ef37013c585adf) --- src/lib/Bcfg2/Reporting/models.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Reporting/models.py b/src/lib/Bcfg2/Reporting/models.py index 2e75c1d1a..28c38e741 100644 --- a/src/lib/Bcfg2/Reporting/models.py +++ b/src/lib/Bcfg2/Reporting/models.py @@ -229,18 +229,24 @@ class Interaction(models.Model): def bad(self): rv = [] for entry in self.entry_types: + if entry == 'failures': + continue rv.extend(getattr(self, entry).filter(state=TYPE_BAD)) return rv def modified(self): rv = [] for entry in self.entry_types: + if entry == 'failures': + continue rv.extend(getattr(self, entry).filter(state=TYPE_MODIFIED)) return rv def extra(self): rv = [] for entry in self.entry_types: + if entry == 'failures': + continue rv.extend(getattr(self, entry).filter(state=TYPE_EXTRA)) return rv -- cgit v1.2.3-1-g7c22 From fad4f0016f072365605d58a29f1a0b07ad18fe5c Mon Sep 17 00:00:00 2001 From: Sol Jerome Date: Fri, 17 May 2013 13:17:59 -0500 Subject: SSLServer: Handle socket timeouts gracefully Signed-off-by: Sol Jerome --- src/lib/Bcfg2/SSLServer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/SSLServer.py b/src/lib/Bcfg2/SSLServer.py index 141bd1282..84cd9edcb 100644 --- a/src/lib/Bcfg2/SSLServer.py +++ b/src/lib/Bcfg2/SSLServer.py @@ -281,7 +281,10 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): raise except socket.error: err = sys.exc_info()[1] - if err[0] == 32: + if isinstance(err, socket.timeout): + self.logger.warning("Connection timed out for %s" % + self.client_address[0]) + elif err[0] == 32: self.logger.warning("Connection dropped from %s" % self.client_address[0]) elif err[0] == 104: -- cgit v1.2.3-1-g7c22 From b5615eeb855d756be8aa3f1e995d9f0f3a6b410e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 20 May 2013 12:54:39 -0400 Subject: added multiprocessing server core --- src/lib/Bcfg2/Options.py | 13 +- src/lib/Bcfg2/Server/MultiprocessingCore.py | 203 ++++++++++++++++++++++++++++ src/lib/Bcfg2/Server/Plugins/Metadata.py | 6 + src/sbin/bcfg2-server | 31 +++-- 4 files changed, 242 insertions(+), 11 deletions(-) create mode 100644 src/lib/Bcfg2/Server/MultiprocessingCore.py (limited to 'src') diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index c7604c5c4..fd7c4421a 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -609,6 +609,16 @@ SERVER_AUTHENTICATION = \ default='cert+password', odesc='{cert|bootstrap|cert+password}', cf=('communication', 'authentication')) +SERVER_CHILDREN = \ + Option('Spawn this number of children for the multiprocessing core. ' + 'By default spawns children equivalent to the number of processors ' + 'in the machine.', + default=None, + cmd='--children', + odesc='', + cf=('server', 'children'), + cook=get_int, + long_arg=True) # database options DB_ENGINE = \ @@ -1182,7 +1192,8 @@ SERVER_COMMON_OPTIONS = dict(repo=SERVER_REPOSITORY, vcs_root=SERVER_VCS_ROOT, authentication=SERVER_AUTHENTICATION, perflog=LOG_PERFORMANCE, - perflog_interval=PERFLOG_INTERVAL) + perflog_interval=PERFLOG_INTERVAL, + children=SERVER_CHILDREN) CRYPT_OPTIONS = dict(encrypt=ENCRYPT, decrypt=DECRYPT, diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py new file mode 100644 index 000000000..2e378341e --- /dev/null +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -0,0 +1,203 @@ +""" The multiprocessing server core is a reimplementation of the +:mod:`Bcfg2.Server.BuiltinCore` that uses the Python +:mod:`multiprocessing` library to offload work to multiple child +processes. As such, it requires Python 2.6+. +""" + +import threading +import lxml.etree +import multiprocessing +from Bcfg2.Compat import Queue +from Bcfg2.Server.Core import BaseCore, exposed +from Bcfg2.Server.BuiltinCore import Core as BuiltinCore + + +class DualEvent(object): + """ DualEvent is a clone of :class:`threading.Event` that + internally implements both :class:`threading.Event` and + :class:`multiprocessing.Event`. """ + + def __init__(self, threading_event=None, multiprocessing_event=None): + self._threading_event = threading_event or threading.Event() + self._multiproc_event = multiprocessing_event or \ + multiprocessing.Event() + if threading_event or multiprocessing_event: + # initialize internal flag to false, regardless of the + # state of either object passed in + self.clear() + + def is_set(self): + """ Return true if and only if the internal flag is true. """ + return self._threading_event.is_set() + + isSet = is_set + + def set(self): + """ Set the internal flag to true. """ + self._threading_event.set() + self._multiproc_event.set() + + def clear(self): + """ Reset the internal flag to false. """ + self._threading_event.clear() + self._multiproc_event.clear() + + def wait(self, timeout=None): + """ Block until the internal flag is true, or until the + optional timeout occurs. """ + return self._threading_event.wait(timeout=timeout) + + +class ChildCore(BaseCore): + """ A child process for :class:`Bcfg2.MultiprocessingCore.Core`. + This core builds configurations from a given + :class:`multiprocessing.Pipe`. Note that this is a full-fledged + server core; the only input it gets from the parent process is the + hostnames of clients to render. All other state comes from the + FAM. However, this core only is used to render configs; it doesn't + handle anything else (authentication, probes, etc.) because those + are all much faster. There's no reason that it couldn't handle + those, though, if the pipe communication "protocol" were made more + robust. """ + + #: How long to wait while polling for new clients to build. This + #: doesn't affect the speed with which a client is built, but + #: setting it too high will result in longer shutdown times, since + #: we only check for the termination event from the main process + #: every ``poll_wait`` seconds. + poll_wait = 5.0 + + def __init__(self, setup, pipe, terminate): + """ + :param setup: A Bcfg2 options dict + :type setup: Bcfg2.Options.OptionParser + :param pipe: The pipe to which client hostnames are added for + ChildCore objects to build configurations, and to + which client configurations are added after + having been built by ChildCore objects. + :type pipe: multiprocessing.Pipe + :param terminate: An event that flags ChildCore objects to shut + themselves down. + :type terminate: multiprocessing.Event + """ + BaseCore.__init__(self, setup) + + #: The pipe to which client hostnames are added for ChildCore + #: objects to build configurations, and to which client + #: configurations are added after having been built by + #: ChildCore objects. + self.pipe = pipe + + #: The :class:`multiprocessing.Event` that will be monitored + #: to determine when this child should shut down. + self.terminate = terminate + + def _daemonize(self): + return True + + def _run(self): + return True + + def _block(self): + while not self.terminate.isSet(): + try: + if self.pipe.poll(self.poll_wait): + if not self.metadata.use_database: + # handle FAM events, in case (for instance) the + # client has just been added to clients.xml, or a + # profile has just been asserted. but really, you + # should be using the metadata database if you're + # using this core. + self.fam.handle_events_in_interval(0.1) + client = self.pipe.recv() + self.logger.debug("Building configuration for %s" % client) + config = \ + lxml.etree.tostring(self.BuildConfiguration(client)) + self.logger.debug("Returning configuration for %s to main " + "process" % client) + self.pipe.send(config) + self.logger.debug("Returned configuration for %s to main " + "process" % client) + except KeyboardInterrupt: + break + self.shutdown() + + +class Core(BuiltinCore): + """ A multiprocessing core that delegates building the actual + client configurations to + :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The + parent process doesn't build any children itself; all calls to + :func:`GetConfig` are delegated to children. All other calls are + handled by the parent process. """ + + #: How long to wait for a child process to shut down cleanly + #: before it is terminated. + shutdown_timeout = 10.0 + + def __init__(self, setup): + BuiltinCore.__init__(self, setup) + if setup['children'] is None: + setup['children'] = multiprocessing.cpu_count() + + #: A dict of child name -> one end of the + #: :class:`multiprocessing.Pipe` object used to communicate + #: with that child. (The child is given the other end of the + #: Pipe.) + self.pipes = dict() + + #: A queue that keeps track of which children are available to + #: render a configuration. A child is popped from the queue + #: when it starts to render a config, then it's pushed back on + #: when it's done. This lets us use a blocking call to + #: :func:`Queue.Queue.get` when waiting for an available + #: child. + self.available_children = Queue(maxsize=self.setup['children']) + + # sigh. multiprocessing was added in py2.6, which is when the + # camelCase methods for threading objects were deprecated in + # favor of the Pythonic under_score methods. So + # multiprocessing.Event *only* has is_set(), while + # threading.Event has *both* isSet() and is_set(). In order + # to make the core work with Python 2.4+, and with both + # multiprocessing and threading Event objects, we just + # monkeypatch self.terminate to have isSet(). + self.terminate = DualEvent(threading_event=self.terminate) + + def _run(self): + for cnum in range(self.setup['children']): + name = "Child-%s" % cnum + (mainpipe, childpipe) = multiprocessing.Pipe() + self.pipes[name] = mainpipe + self.logger.debug("Starting child %s" % name) + childcore = ChildCore(self.setup, childpipe, self.terminate) + child = multiprocessing.Process(target=childcore.run, name=name) + child.start() + self.logger.debug("Child %s started with PID %s" % (name, + child.pid)) + self.available_children.put(name) + return BuiltinCore._run(self) + + def shutdown(self): + BuiltinCore.shutdown(self) + for child in multiprocessing.active_children(): + self.logger.debug("Shutting down child %s" % child.name) + child.join(self.shutdown_timeout) + if child.is_alive(): + self.logger.error("Waited %s seconds to shut down %s, " + "terminating" % (self.shutdown_timeout, + child.name)) + child.terminate() + else: + self.logger.debug("Child %s shut down" % child.name) + + @exposed + def GetConfig(self, address): + client = self.resolve_client(address)[0] + childname = self.available_children.get() + self.logger.debug("Building configuration on child %s" % childname) + pipe = self.pipes[childname] + pipe.send(client) + config = pipe.recv() + self.available_children.put_nowait(childname) + return config diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index ceb1d9080..3b8361c76 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -557,6 +557,12 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, open(os.path.join(repo, cls.name, fname), "w").write(kwargs[aname]) + @property + def use_database(self): + """ Expose self._use_db publicly for use in + :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` """ + return self._use_db + def _handle_file(self, fname): """ set up the necessary magic for handling a metadata file (clients.xml or groups.xml, e.g.) """ diff --git a/src/sbin/bcfg2-server b/src/sbin/bcfg2-server index cdca71e74..21f7842ec 100755 --- a/src/sbin/bcfg2-server +++ b/src/sbin/bcfg2-server @@ -24,18 +24,29 @@ def main(): print("Could not read %s" % setup['configfile']) sys.exit(1) - if setup['backend'] not in ['best', 'cherrypy', 'builtin']: + # TODO: normalize case of various core modules so we can add a new + # core without modifying this script + backends = dict(cherrypy='CherryPyCore', + builtin='BuiltinCore', + best='BuiltinCore', + multiprocessing='MultiprocessingCore') + + if setup['backend'] not in backends: print("Unknown server backend %s, using 'best'" % setup['backend']) setup['backend'] = 'best' - if setup['backend'] == 'cherrypy': - try: - from Bcfg2.Server.CherryPyCore import Core - except ImportError: - err = sys.exc_info()[1] - print("Unable to import CherryPy server core: %s" % err) - raise - elif setup['backend'] == 'builtin' or setup['backend'] == 'best': - from Bcfg2.Server.BuiltinCore import Core + + coremodule = backends[setup['backend']] + try: + Core = getattr(__import__("Bcfg2.Server.%s" % coremodule).Server, + coremodule).Core + except ImportError: + err = sys.exc_info()[1] + print("Unable to import %s server core: %s" % (setup['backend'], err)) + raise + except AttributeError: + err = sys.exc_info()[1] + print("Unable to load %s server core: %s" % (setup['backend'], err)) + raise try: core = Core(setup) -- cgit v1.2.3-1-g7c22 From 1ba77f3dc93181beb26a3abc8cd352e07f1af33e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 21 Mar 2013 10:03:05 -0400 Subject: SSLServer: name XMLRPC thread for easier debugging --- src/lib/Bcfg2/SSLServer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/SSLServer.py b/src/lib/Bcfg2/SSLServer.py index 84cd9edcb..316c2f86c 100644 --- a/src/lib/Bcfg2/SSLServer.py +++ b/src/lib/Bcfg2/SSLServer.py @@ -417,7 +417,9 @@ class XMLRPCServer(SocketServer.ThreadingMixIn, SSLServer, def serve_forever(self): """Serve single requests until (self.serve == False).""" self.serve = True - self.task_thread = threading.Thread(target=self._tasks_thread) + self.task_thread = \ + threading.Thread(name="%sThread" % self.__class__.__name__, + target=self._tasks_thread) self.task_thread.start() self.logger.info("serve_forever() [start]") signal.signal(signal.SIGINT, self._handle_shutdown_signal) -- cgit v1.2.3-1-g7c22 From 90c1b197b91e7c6ce4425568082eaff90c96a6dc Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 20 May 2013 11:50:05 -0400 Subject: Core: added more debugging on server shutdown --- src/lib/Bcfg2/Server/Core.py | 4 ++++ src/lib/Bcfg2/Server/MultiprocessingCore.py | 1 + 2 files changed, 5 insertions(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index ab8cda3da..c246860c1 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -301,6 +301,7 @@ class BaseCore(object): self.logger.info("Performance statistics: " "%s min=%.06f, max=%.06f, average=%.06f, " "count=%d" % ((name, ) + stats)) + self.logger.debug("Performance logging thread terminated") def _file_monitor_thread(self): """ The thread that runs the @@ -321,6 +322,7 @@ class BaseCore(object): except: continue self._update_vcs_revision() + self.logger.debug("File monitor thread terminated") @track_statistics() def _update_vcs_revision(self): @@ -440,8 +442,10 @@ class BaseCore(object): if not self.terminate.isSet(): self.terminate.set() self.fam.shutdown() + self.logger.debug("FAM shut down") for plugin in list(self.plugins.values()): plugin.shutdown() + self.logger.debug("All plugins shut down") @property def metadata_cache_mode(self): diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 2e378341e..81fba7092 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -190,6 +190,7 @@ class Core(BuiltinCore): child.terminate() else: self.logger.debug("Child %s shut down" % child.name) + self.logger.debug("All children shut down") @exposed def GetConfig(self, address): -- cgit v1.2.3-1-g7c22 From 28fb5e8695ce4638ab15757760bd943f5a49773a Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 20 May 2013 14:10:47 -0400 Subject: bcfg2-admin client: Only load Metadata plugin --- src/lib/Bcfg2/Server/Admin/Client.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Admin/Client.py b/src/lib/Bcfg2/Server/Admin/Client.py index b7916fab9..570e993ed 100644 --- a/src/lib/Bcfg2/Server/Admin/Client.py +++ b/src/lib/Bcfg2/Server/Admin/Client.py @@ -8,6 +8,7 @@ from Bcfg2.Server.Plugin import MetadataConsistencyError class Client(Bcfg2.Server.Admin.MetadataCore): """ Create, delete, or list client entries """ __usage__ = "[options] [add|del|list] [attr=val]" + __plugin_whitelist__ = ["Metadata"] def __call__(self, args): if len(args) == 0: -- cgit v1.2.3-1-g7c22 From 1690cf04ca2b63f8312670dc6f9067fae5c8cd73 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 21 May 2013 09:43:42 -0400 Subject: Packages: don't cache package collections with no sources --- src/lib/Bcfg2/Server/Plugins/Packages/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py index d5773de97..f82b8a392 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py @@ -527,8 +527,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin, collection = cclass(metadata, relevant, self.cachepath, self.data, self.core.fam, debug=self.debug_flag) ckey = collection.cachekey - self.clients[metadata.hostname] = ckey - self.collections[ckey] = collection + if cclass != Collection: + self.clients[metadata.hostname] = ckey + self.collections[ckey] = collection return collection def get_additional_data(self, metadata): -- cgit v1.2.3-1-g7c22