#!/usr/bin/env python

"""This tool examines your Bcfg2 specifications for errors."""
__revision__ = '$Revision$'

import sys
import inspect
import logging
import ConfigParser
import Bcfg2.Logger
import Bcfg2.Options
import Bcfg2.Server.Core
import Bcfg2.Server.Lint

logger = logging.getLogger('bcfg2-lint')


class Parser(ConfigParser.ConfigParser):
    """ConfigParser whose get() falls back to a caller-supplied default."""

    def get(self, section, option, default):
        """ Override ConfigParser.get: If the requested option (or the
        section that should contain it) is not in the config file,
        return the value of default rather than raise an exception. """
        try:
            return ConfigParser.ConfigParser.get(self, section, option)
        except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
            return default


def _merge_counts(totals, new_counts):
    """Add new_counts to totals position by position.

    Both arguments are (errors, warnings) sequences.  The result is a
    new list with the same length as totals.
    """
    return [totals[n] + new_counts[n] for n in range(0, len(totals))]


def run_serverless_plugins(plugins, config=None, setup=None, files=None):
    """Run every plugin that does not need a server core.

    plugins -- dict mapping plugin name to plugin class
    config  -- lint configuration parser (or None)
    setup   -- parsed command-line/config options
    files   -- optional list of files handed to each plugin

    Returns the summed (errors, warnings) counts of all plugins.
    """
    logger.debug("Running serverless plugins")
    errors = (0, 0)
    for plugin_name, plugin in plugins.items():
        plugin_errors = run_plugin(plugin, plugin_name,
                                   setup=setup, config=config, files=files)
        errors = _merge_counts(errors, plugin_errors)
    return errors


def run_server_plugins(plugins, config=None, setup=None, files=None):
    """Run every plugin that needs a server core.

    A single core is created with load_server() and passed to each
    plugin as its first positional argument.

    plugins -- dict mapping plugin name to plugin class
    config  -- lint configuration parser (or None)
    setup   -- parsed command-line/config options
    files   -- optional list of files handed to each plugin

    Returns the summed (errors, warnings) counts of all plugins.
    """
    core = load_server(setup)
    logger.debug("Running server plugins")
    errors = (0, 0)
    for plugin_name, plugin in plugins.items():
        plugin_errors = run_plugin(plugin, plugin_name, args=[core],
                                   setup=setup, config=config, files=files)
        errors = _merge_counts(errors, plugin_errors)
    return errors


def run_plugin(plugin, plugin_name, setup=None, args=None, config=None,
               files=None):
    """Instantiate one lint plugin and return the result of its Run().

    plugin      -- the plugin class
    plugin_name -- its name; also the config section consulted for
                   plugin-specific settings
    setup       -- parsed options; appended as the last positional
                   constructor argument, merged over the plugin's config
                   section when that section exists
    args        -- leading positional constructor arguments (not modified)
    config      -- lint configuration parser (or None)
    files       -- passed to the constructor as the 'files' keyword
    """
    logger.debug(" Running %s" % plugin_name)
    # work on a copy so the caller's list is never modified
    if args is None:
        call_args = []
    else:
        call_args = list(args)

    if config is not None and config.has_section(plugin_name):
        # values from setup take precedence over the config section
        call_args.append(dict(config.items(plugin_name), **setup))
    else:
        call_args.append(setup)

    # older versions of python do not support mixing *-magic and
    # non-*-magic (e.g., "plugin(*args, files=files)"), so we do this
    # all with *-magic
    kwargs = dict(files=files)

    return plugin(*call_args, **kwargs).Run()


def load_server(setup):
    """Create the server core described by setup and return it.

    Reads the 'repo', 'plugins', 'password', 'encoding' and
    'event debug' options from setup.
    """
    core = Bcfg2.Server.Core.Core(setup['repo'], setup['plugins'],
                                  setup['password'], setup['encoding'])
    if setup['event debug']:
        core.fam.debug = True
    # NOTE(review): presumably gives the file monitor 4 seconds to
    # process pending events before linting starts -- confirm
    core.fam.handle_events_in_interval(4)
    return core


if __name__ == '__main__':
    optinfo = {
        'configfile': Bcfg2.Options.CFILE,
        'help': Bcfg2.Options.HELP,
        'verbose': Bcfg2.Options.VERBOSE,
    }
    optinfo.update({
        'event debug': Bcfg2.Options.DEBUG,
        'encoding': Bcfg2.Options.ENCODING,
        # Server options
        'repo': Bcfg2.Options.SERVER_REPOSITORY,
        'plugins': Bcfg2.Options.SERVER_PLUGINS,
        'mconnect': Bcfg2.Options.SERVER_MCONNECT,
        'filemonitor': Bcfg2.Options.SERVER_FILEMONITOR,
        'location': Bcfg2.Options.SERVER_LOCATION,
        'static': Bcfg2.Options.SERVER_STATIC,
        'key': Bcfg2.Options.SERVER_KEY,
        'cert': Bcfg2.Options.SERVER_CERT,
        'ca': Bcfg2.Options.SERVER_CA,
        'password': Bcfg2.Options.SERVER_PASSWORD,
        'protocol': Bcfg2.Options.SERVER_PROTOCOL,
        # More options
        'logging': Bcfg2.Options.LOGGING_FILE_PATH,
        'stdin': Bcfg2.Options.FILES_ON_STDIN,
        'schema': Bcfg2.Options.SCHEMA_PATH,
        'config': Bcfg2.Options.Option('Specify bcfg2-lint configuration file',
                                       '/etc/bcfg2-lint.conf',
                                       cmd='--lint-config',
                                       odesc='',
                                       long_arg=True),
    })
    setup = Bcfg2.Options.OptionParser(optinfo)
    setup.parse(sys.argv[1:])

    log_args = dict(to_syslog=False, to_console=logging.WARNING)
    if setup['verbose']:
        log_args['to_console'] = logging.DEBUG
    # NOTE(review): logging is set up under the name 'bcfg2-info' even
    # though this tool is bcfg2-lint -- confirm whether that is intended
    Bcfg2.Logger.setup_logging('bcfg2-info', **log_args)

    config = Parser()
    config.read(setup['config'])

    # get list of plugins to run: explicit command-line arguments win,
    # then the fixed set used when invoked as bcfg2-repo-validate, then
    # the [lint] plugins option (defaulting to every known lint plugin)
    if setup['args']:
        allplugins = setup['args']
    elif "bcfg2-repo-validate" in sys.argv[0]:
        allplugins = 'Duplicates,RequiredAttrs,Validate'.split(',')
    else:
        allplugins = config.get('lint', 'plugins',
                                ",".join(Bcfg2.Server.Lint.__all__)).split(',')

    if setup['stdin']:
        files = [s.strip() for s in sys.stdin.readlines()]
    else:
        files = None

    # load plugins
    serverplugins = {}
    serverlessplugins = {}
    for plugin_name in allplugins:
        try:
            mod = getattr(__import__("Bcfg2.Server.Lint.%s" %
                                     (plugin_name)).Server.Lint, plugin_name)
        except ImportError:
            # not a bundled lint plugin; try it as a top-level module
            try:
                mod = __import__(plugin_name)
            except Exception:
                # sys.exc_info() instead of "except Exception, err" so
                # the file parses on both Python 2 and Python 3
                err = sys.exc_info()[1]
                logger.error("Failed to load plugin %s: %s" % (plugin_name,
                                                               err))
                raise SystemExit(1)
        plugin = getattr(mod, plugin_name)
        # plugins derived from ServerPlugin need a server core
        if Bcfg2.Server.Lint.ServerPlugin in inspect.getmro(plugin):
            serverplugins[plugin_name] = plugin
        else:
            serverlessplugins[plugin_name] = plugin

    # errors is a tuple of (errors, warnings)
    errors = run_serverless_plugins(serverlessplugins,
                                    config=config, setup=setup, files=files)

    if serverplugins:
        perrors = run_server_plugins(serverplugins,
                                     config=config, setup=setup, files=files)
        errors = _merge_counts(errors, perrors)

    if errors[0] or errors[1] or setup['verbose']:
        # parenthesised single argument prints identically on Python 2
        print("%d errors" % errors[0])
        print("%d warnings" % errors[1])

    # exit status: 2 when there are errors, 3 when there are only warnings
    if errors[0]:
        raise SystemExit(2)
    elif errors[1]:
        raise SystemExit(3)