#!/usr/bin/env python
"""This tool verifies that all clients known to the server build
without failures"""

import os
import sys
import signal
import fnmatch
import logging
import Bcfg2.Logger
import Bcfg2.Options
import Bcfg2.Server.Core
from math import ceil
from nose.core import TestProgram
from nose.suite import LazySuite
from unittest import TestCase

try:
    from multiprocessing import Process, Queue, active_children
    HAS_MULTIPROC = True
except ImportError:
    HAS_MULTIPROC = False
    active_children = lambda: []  # pylint: disable=C0103


class CapturingLogger(object):
    """ Fake logger that captures logging output so that errors are
    only displayed for clients that fail tests.

    Messages of every level are appended, in call order, to
    :attr:`output`.  The logging methods accept (and ignore) the extra
    keyword arguments that real loggers take, e.g. ``exc_info``, and
    apply ``%``-style positional arguments to the message. """

    def __init__(self, *args, **kwargs):  # pylint: disable=W0613
        # constructor arguments are accepted only so this class can be
        # instantiated like a real logger; they are not used
        self.output = []

    def _capture(self, msg, args):
        """ Record a message, interpolating ``args`` into it when any
        were given.  If interpolation fails the raw message is kept so
        that a bad format string never breaks a test run. """
        if args:
            try:
                msg = msg % args
            except (TypeError, ValueError):
                pass
        self.output.append(msg)

    def error(self, msg, *args, **kwargs):  # pylint: disable=W0613
        """ capture an error message """
        self._capture(msg, args)

    def warning(self, msg, *args, **kwargs):  # pylint: disable=W0613
        """ capture a warning message """
        self._capture(msg, args)

    def info(self, msg, *args, **kwargs):  # pylint: disable=W0613
        """ capture an info message """
        self._capture(msg, args)

    def debug(self, msg, *args, **kwargs):  # pylint: disable=W0613
        """ capture a debug message """
        self._capture(msg, args)

    # the remaining standard logger entry points are captured the same
    # way as errors
    critical = error
    exception = error

    def reset_output(self):
        """ Reset the captured output """
        self.output = []


class ClientTestFromQueue(TestCase):
    """ A test case that tests a value that has been enqueued by a
    child test process.  ``client`` is the name of the client that has
    been tested; ``result`` is the result from the :class:`ClientTest`
    test.  ``None`` indicates a successful test; a string value
    indicates a failed test; and an exception indicates an error while
    running the test. """
    __test__ = False  # Do not collect

    def __init__(self, client, result):
        TestCase.__init__(self)
        self.client = client
        self.result = result

    def shortDescription(self):
        return "Building configuration for %s" % self.client

    def runTest(self):
        """ parse the result from this test """
        # an exception from the child is re-raised so that it is
        # reported as an error rather than a failure
        if isinstance(self.result, Exception):
            raise self.result
        assert self.result is None, self.result


class ClientTest(TestCase):
    """ A test case representing the build of all of the configuration
    for a single host.  Checks that none of the build config entities
    has had a failure when it is building.  Optionally ignores some
    config files that we know will cause errors (because they are
    private files we don't have access to, for instance) """
    __test__ = False  # Do not collect
    divider = "-" * 70

    def __init__(self, core, client, ignore=None):
        TestCase.__init__(self)
        self.core = core
        # replace the core's logger so that log output produced while
        # building this client can be attached to failure messages
        self.core.logger = CapturingLogger()
        self.client = client
        if ignore is None:
            self.ignore = dict()
        else:
            self.ignore = ignore

    def ignore_entry(self, tag, name):
        """ return True if an error on a given entry should be
        ignored.  ``tag`` selects the list of ignore patterns; ``name``
        is matched against them first literally and then as an
        :mod:`fnmatch` wildcard. """
        patterns = self.ignore.get(tag)
        if not patterns or name is None:
            # nothing to ignore for this tag, or an unnamed entry that
            # cannot match any pattern
            return False
        if name in patterns:
            return True
        # try wildcard matching
        for pattern in patterns:
            if fnmatch.fnmatch(name, pattern):
                return True
        return False

    def shortDescription(self):
        return "Building configuration for %s" % self.client

    def runTest(self):
        """ run this individual test """
        config = self.core.BuildConfiguration(self.client)
        # snapshot whatever was logged during the build; it is
        # prepended to every failure message below
        output = self.core.logger.output[:]
        if output:
            output.append(self.divider)
        self.core.logger.reset_output()

        # check for empty client configuration
        assert len(config.findall("Bundle")) > 0, \
            "\n".join(output + ["%s has no content" % self.client])

        # check for missing bundles
        metadata = self.core.build_metadata(self.client)
        sbundles = [el.get('name') for el in config.findall("Bundle")]
        missing = [b for b in metadata.bundles if b not in sbundles]
        assert len(missing) == 0, \
            "\n".join(output + ["Configuration is missing bundle(s): %s" %
                                ':'.join(missing)])

        # check for unknown packages
        unknown_pkgs = [el.get("name")
                        for el in config.xpath('//Package[@type="unknown"]')
                        if not self.ignore_entry(el.tag, el.get("name"))]
        assert len(unknown_pkgs) == 0, \
            "\n".join(output +
                      ["Configuration contains unknown packages: %s" %
                       ", ".join(unknown_pkgs)])

        # check for entries that were flagged as failed during the build
        failures = []
        msg = output + ["Failures:"]
        for failure in config.xpath('//*[@failure]'):
            if not self.ignore_entry(failure.tag, failure.get('name')):
                failures.append(failure)
                msg.append("%s:%s: %s" % (failure.tag, failure.get("name"),
                                          failure.get("failure")))
        assert len(failures) == 0, "\n".join(msg)

    def __str__(self):
        return "ClientTest(%s)" % self.client

    # TestCase.id() is used by test runners to name the test
    id = __str__


def get_core(setup):
    """ Get a server core, with events handled """
    core = Bcfg2.Server.Core.BaseCore(setup)
    core.load_plugins()
    core.block_for_fam_events(handle_events=True)
    return core


def get_ignore(setup):
    """ Given an options dict, get a dict of entry tags and names to
    ignore errors from.

    Each item of ``setup['test_ignore']`` has the form ``tag:name``;
    only the first colon separates the two, so the name itself may
    contain colons.  An item without any colon raises ValueError. """
    ignore = dict()
    for entry in setup['test_ignore']:
        tag, name = entry.split(":", 1)
        ignore.setdefault(tag, []).append(name)
    return ignore


def run_child(setup, clients, queue):
    """ Run tests for the given clients in a child process, returning
    results via the given Queue.

    One ``(client, result)`` tuple is enqueued per client: ``result``
    is None on success, the assertion message (a string) on failure,
    and the exception object on any other error. """
    core = get_core(setup)
    try:
        ignore = get_ignore(setup)
        for client in clients:
            try:
                ClientTest(core, client, ignore).runTest()
                queue.put((client, None))
            except AssertionError:
                queue.put((client, str(sys.exc_info()[1])))
            except:  # pylint: disable=W0702
                # deliberately catch everything: the parent waits for
                # exactly one result per client, so every client must
                # produce a queue entry
                queue.put((client, sys.exc_info()[1]))
    finally:
        core.shutdown()


def get_sigint_handler(core):
    """ Get a function that handles SIGINT/Ctrl-C by shutting down the
    core and exiting properly."""

    def hdlr(sig, frame):  # pylint: disable=W0613
        """ Handle SIGINT/Ctrl-C by shutting down the core and exiting
        properly. """
        core.shutdown()
        os._exit(1)  # pylint: disable=W0212

    return hdlr


def parse_args():
    """ Parse command line arguments.

    Besides parsing, this sets up logging, propagates verbosity to
    nose, disables child processes when multiprocessing is not
    available, and strips nose's own xunit options when children are
    in use.  Returns the parsed option parser object. """
    optinfo = dict(Bcfg2.Options.TEST_COMMON_OPTIONS)
    optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS)
    optinfo.update(Bcfg2.Options.SERVER_COMMON_OPTIONS)
    setup = Bcfg2.Options.OptionParser(optinfo)
    setup.hm = \
        "bcfg2-test [options] [client] [client] [...]\nOptions:\n %s" % \
        setup.buildHelpMessage()
    setup.parse(sys.argv[1:])

    if setup['debug']:
        level = logging.DEBUG
    elif setup['verbose']:
        level = logging.INFO
    else:
        level = logging.WARNING
    Bcfg2.Logger.setup_logging("bcfg2-test",
                               to_console=setup['verbose'] or setup['debug'],
                               to_syslog=False,
                               to_file=setup['logging'],
                               level=level)
    logger = logging.getLogger(sys.argv[0])

    # make nose as chatty as we are
    if (setup['debug'] or setup['verbose']) and "-v" not in setup['noseopts']:
        setup['noseopts'].append("-v")

    if setup['children'] and not HAS_MULTIPROC:
        logger.warning("Python multiprocessing library not found, running "
                       "with no children")
        setup['children'] = 0

    if (setup['children'] and ('--with-xunit' in setup['noseopts'] or
                               '--xunit-file' in setup['noseopts'])):
        logger.warning("Use the --xunit option to bcfg2-test instead of the "
                       "--with-xunit or --xunit-file options to nosetest")
        xunitfile = None
        if '--with-xunit' in setup['noseopts']:
            setup['noseopts'].remove('--with-xunit')
            xunitfile = "nosetests.xml"
        if '--xunit-file' in setup['noseopts']:
            idx = setup['noseopts'].index('--xunit-file')
            try:
                setup['noseopts'].pop(idx)  # remove --xunit-file
                # remove the argument to it
                xunitfile = setup['noseopts'].pop(idx)
            except IndexError:
                pass
        # NOTE(review): setup['xunit'] is not read anywhere else in
        # this file as visible here -- confirm where it is consumed
        if xunitfile and not setup['xunit']:
            setup['xunit'] = xunitfile
    return setup


def main():
    """ Build the configuration of every requested client (or of all
    clients known to the server) as a nose test, optionally spreading
    the work over child processes.  Never returns: the process exits
    with status 0 if all tests passed and 1 otherwise. """
    setup = parse_args()
    logger = logging.getLogger(sys.argv[0])
    core = get_core(setup)
    signal.signal(signal.SIGINT, get_sigint_handler(core))

    if setup['args']:
        clients = setup['args']
    else:
        clients = core.metadata.clients

    ignore = get_ignore(setup)

    if setup['children']:
        if setup['children'] > len(clients):
            logger.info("Refusing to spawn more children than clients to test,"
                        " setting children=%s" % len(clients))
            setup['children'] = len(clients)
        # the parent tests a share of the clients too, hence the +1
        perchild = int(ceil(len(clients) / float(setup['children'] + 1)))
        queue = Queue()
        # number of clients actually handed to children; exactly this
        # many results will arrive on the queue
        dispatched = 0
        for index in range(setup['children']):
            chunk = clients[index * perchild:(index + 1) * perchild]
            if not chunk:
                # rounding up perchild can exhaust the client list
                # before every child has been given work
                break
            Process(target=run_child, args=(setup, chunk, queue)).start()
            dispatched += len(chunk)

        def generate_tests():
            """ Read test results for the clients """
            # run the leftover clients in this process while the
            # children work, then collect the children's results
            for client in clients[dispatched:]:
                yield ClientTest(core, client, ignore)

            for _ in range(dispatched):
                yield ClientTestFromQueue(*queue.get())
    else:
        def generate_tests():
            """ Run tests for the clients """
            for client in clients:
                yield ClientTest(core, client, ignore)

    result = TestProgram(argv=sys.argv[:1] + core.setup['noseopts'],
                         suite=LazySuite(generate_tests), exit=False)

    # block until all children have completed -- should be
    # immediate since we've already gotten all the results we
    # expect
    for child in active_children():
        child.join()

    core.shutdown()
    if result.success:
        os._exit(0)  # pylint: disable=W0212
    else:
        os._exit(1)  # pylint: disable=W0212


if __name__ == "__main__":
    sys.exit(main())