From eac71fc1109f2edc6b71e62a6cff38d762bebe63 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 25 Sep 2012 11:49:15 -0400 Subject: expanded pylint coverage --- src/sbin/bcfg2-crypt | 100 ++++++++++++++++++++------------------ src/sbin/bcfg2-lint | 134 +++++++++++++++++++++++++++------------------------ src/sbin/bcfg2-test | 4 ++ 3 files changed, 130 insertions(+), 108 deletions(-) (limited to 'src/sbin') diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt index bae4ad8ef..904f77611 100755 --- a/src/sbin/bcfg2-crypt +++ b/src/sbin/bcfg2-crypt @@ -9,59 +9,48 @@ import lxml.etree import Bcfg2.Logger import Bcfg2.Options from Bcfg2.Server import XMLParser -from Bcfg2.Compat import input +from Bcfg2.Compat import input # pylint: disable=W0622 try: import Bcfg2.Encryption except ImportError: - err = sys.exc_info()[1] - print("Could not import %s. Is M2Crypto installed?" % err) + print("Could not import %s. Is M2Crypto installed?" % sys.exc_info()[1]) raise SystemExit(1) -LOGGER = None - -def get_logger(verbose=0): - """ set up logging according to the verbose level given on the - command line """ - global LOGGER - if LOGGER is None: - LOGGER = logging.getLogger(sys.argv[0]) - stderr = logging.StreamHandler() - if verbose: - level = logging.DEBUG - else: - level = logging.WARNING - LOGGER.setLevel(level) - LOGGER.addHandler(stderr) - syslog = logging.handlers.SysLogHandler("/dev/log") - syslog.setFormatter(logging.Formatter("%(name)s: %(message)s")) - LOGGER.addHandler(syslog) - return LOGGER - class EncryptionChunkingError(Exception): + """ error raised when Encryptor cannot break a file up into chunks + to be encrypted, or cannot reassemble the chunks """ pass class Encryptor(object): + """ Generic encryptor for all files """ + def __init__(self, setup): self.setup = setup - self.logger = get_logger() self.passphrase = None self.pname = None + self.logger = logging.getLogger(self.__class__.__name__) def get_encrypted_filename(self, plaintext_filename): + """ get the name of the file encrypted data should be written to """ return plaintext_filename def get_plaintext_filename(self, encrypted_filename): + """ get the name of the file decrypted data should be written to """ return encrypted_filename def chunk(self, data): + """ generator to break the file up into smaller chunks that + will each be individually encrypted or decrypted """ yield data - def unchunk(self, data, original): + def unchunk(self, data, original): # pylint: disable=W0613 + """ given chunks of a file, reassemble then into the whole file """ return data[0] def set_passphrase(self): + """ set the passphrase for the current file """ if (not self.setup.cfp.has_section("encryption") or self.setup.cfp.options("encryption") == 0): self.logger.error("No passphrases available in %s" % @@ -98,6 +87,7 @@ class Encryptor(object): return False def encrypt(self, fname): + """ encrypt the given file, returning the encrypted data """ try: plaintext = open(fname).read() except IOError: @@ -124,12 +114,16 @@ class Encryptor(object): return False return self.unchunk(crypted, plaintext) + # pylint: disable=W0613 def _encrypt(self, plaintext, passphrase, name=None): + """ encrypt a single chunk of a file """ return Bcfg2.Encryption.ssl_encrypt( plaintext, passphrase, Bcfg2.Encryption.get_algorithm(self.setup)) + # pylint: enable=W0613 def decrypt(self, fname): + """ decrypt the given file, returning the plaintext data """ try: crypted = open(fname).read() except IOError: @@ -149,12 +143,12 @@ class Encryptor(object): except Bcfg2.Encryption.EVPError: self.logger.info("Could not decrypt %s with the " "specified passphrase" % fname) - return False + continue except: err = sys.exc_info()[1] self.logger.error("Error decrypting %s: %s" % (fname, err)) - return False + continue except TypeError: pchunk = None for pname in self.setup.cfp.options('encryption'): @@ -175,7 +169,7 @@ class Encryptor(object): self.logger.error("Could not decrypt %s with any " "passphrase in %s" % (fname, self.setup['configfile'])) - return False + continue except EncryptionChunkingError: err = sys.exc_info()[1] self.logger.error("Error getting encrypted data from %s: %s" % @@ -190,7 +184,14 @@ class Encryptor(object): (fname, err)) return False + def _decrypt(self, crypted, passphrase): + """ decrypt a single chunk """ + return Bcfg2.Encryption.ssl_decrypt( + crypted, passphrase, + Bcfg2.Encryption.get_algorithm(self.setup)) + def write_encrypted(self, fname, data=None): + """ write encrypted data to disk """ if data is None: data = self.decrypt(fname) new_fname = self.get_encrypted_filename(fname) @@ -210,6 +211,7 @@ class Encryptor(object): return False def write_decrypted(self, fname, data=None): + """ write decrypted data to disk """ if data is None: data = self.decrypt(fname) new_fname = self.get_plaintext_filename(fname) @@ -224,6 +226,7 @@ class Encryptor(object): return False def get_passphrase(self, chunk): + """ get the passphrase for a chunk of a file """ pname = self._get_passphrase(chunk) if not self.pname: if not pname: @@ -246,16 +249,14 @@ class Encryptor(object): (self.pname, pname)) return (passphrase, pname) - def _get_passphrase(self, chunk): + def _get_passphrase(self, chunk): # pylint: disable=W0613 + """ get the passphrase for a chunk of a file """ return None - def _decrypt(self, crypted, passphrase): - return Bcfg2.Encryption.ssl_decrypt( - crypted, passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)) - class CfgEncryptor(Encryptor): + """ encryptor class for Cfg files """ + def get_encrypted_filename(self, plaintext_filename): return plaintext_filename + ".crypt" @@ -267,6 +268,8 @@ class CfgEncryptor(Encryptor): class PropertiesEncryptor(Encryptor): + """ encryptor class for Properties files """ + def _encrypt(self, plaintext, passphrase, name=None): # plaintext is an lxml.etree._Element if name is None: @@ -347,7 +350,7 @@ class PropertiesEncryptor(Encryptor): return crypted -def main(): +def main(): # pylint: disable=R0912,R0915 optinfo = dict(interactive=Bcfg2.Options.INTERACTIVE) optinfo.update(Bcfg2.Options.CRYPT_OPTIONS) optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS) @@ -360,35 +363,40 @@ def main(): print(setup.hm) raise SystemExit(1) + log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING) + if setup['verbose']: + log_args['to_console'] = logging.DEBUG + Bcfg2.Logger.setup_logging('bcfg2-crypt', **log_args) + logger = logging.getLogger('bcfg2-crypt') + if setup['decrypt']: if setup['encrypt']: - print("You cannot specify both --encrypt and --decrypt") + logger.error("You cannot specify both --encrypt and --decrypt") raise SystemExit(1) elif setup['remove']: - print("--remove cannot be used with --decrypt, ignoring") + logger.error("--remove cannot be used with --decrypt, ignoring") setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default elif setup['interactive']: - print("Cannot decrypt interactively") + logger.error("Cannot decrypt interactively") setup['interactive'] = False if setup['cfg']: if setup['properties']: - print("You cannot specify both --cfg and --properties") + logger.error("You cannot specify both --cfg and --properties") raise SystemExit(1) if setup['xpath']: - print("Specifying --xpath with --cfg is nonsensical, ignoring " - "--xpath") + logger.error("Specifying --xpath with --cfg is nonsensical, " + "ignoring --xpath") setup['xpath'] = Bcfg2.Options.CRYPT_XPATH.default if setup['interactive']: - print("You cannot use interactive mode with --cfg, ignoring -I") + logger.error("You cannot use interactive mode with --cfg, " + "ignoring -I") setup['interactive'] = False elif setup['properties']: if setup['remove']: - print("--remove cannot be used with --properties, ignoring") + logger.error("--remove cannot be used with --properties, ignoring") setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default - logger = get_logger(setup['verbose']) - props_crypt = PropertiesEncryptor(setup) cfg_crypt = CfgEncryptor(setup) @@ -456,7 +464,7 @@ def main(): if data is None: data = xform(fname) if not data: - print("Failed to %s %s, skipping" % (xform.__name__, fname)) + logger.error("Failed to %s %s, skipping" % (xform.__name__, fname)) continue if setup['crypt_stdout']: if len(setup['args']) > 1: diff --git a/src/sbin/bcfg2-lint b/src/sbin/bcfg2-lint index f1f91b7f4..0321d3045 100755 --- a/src/sbin/bcfg2-lint +++ b/src/sbin/bcfg2-lint @@ -9,108 +9,89 @@ import Bcfg2.Logger import Bcfg2.Options import Bcfg2.Server.Core import Bcfg2.Server.Lint -# Compatibility imports from Bcfg2.Compat import ConfigParser -logger = logging.getLogger('bcfg2-lint') +LOGGER = logging.getLogger('bcfg2-lint') -def run_serverless_plugins(plugins, config=None, setup=None, errorhandler=None): - logger.debug("Running serverless plugins") + +def run_serverless_plugins(plugins, setup=None, errorhandler=None, files=None): + """ Run serverless plugins """ + LOGGER.debug("Running serverless plugins") for plugin_name, plugin in list(plugins.items()): run_plugin(plugin, plugin_name, errorhandler=errorhandler, - setup=setup, config=config, files=files) + setup=setup, files=files) + -def run_server_plugins(plugins, config=None, setup=None, errorhandler=None): +def run_server_plugins(plugins, setup=None, errorhandler=None, files=None): + """ run plugins that require a running server to run """ core = load_server(setup) - logger.debug("Running server plugins") + LOGGER.debug("Running server plugins") for plugin_name, plugin in list(plugins.items()): run_plugin(plugin, plugin_name, args=[core], errorhandler=errorhandler, - setup=setup, config=config, files=files) + setup=setup, files=files) + def run_plugin(plugin, plugin_name, setup=None, errorhandler=None, - args=None, config=None, files=None): - logger.debug(" Running %s" % plugin_name) + args=None, files=None): + """ run a single plugin, server-ful or serverless. """ + LOGGER.debug(" Running %s" % plugin_name) if args is None: args = [] if errorhandler is None: - errorhandler = get_errorhandler(config) + errorhandler = get_errorhandler(setup) - if config is not None and config.has_section(plugin_name): + if setup is not None and setup.cfp.has_section(plugin_name): arg = setup - for key, val in config.items(plugin_name): + for key, val in setup.cfp.items(plugin_name): arg[key] = val args.append(arg) else: args.append(setup) - - # older versions of python do not support mixing *-magic and - # non-*-magic (e.g., "plugin(*args, files=files)", so we do this - # all with *-magic - kwargs = dict(files=files, errorhandler=errorhandler) - - return plugin(*args, **kwargs).Run() - -def get_errorhandler(config): + + return plugin(*args, files=files, errorhandler=errorhandler).Run() + + +def get_errorhandler(setup): """ get a Bcfg2.Server.Lint.ErrorHandler object """ - if config.has_section("errors"): - conf = dict(config.items("errors")) + if setup.cfp.has_section("errors"): + conf = dict(setup.cfp.items("errors")) else: conf = None return Bcfg2.Server.Lint.ErrorHandler(config=conf) + def load_server(setup): """ load server """ core = Bcfg2.Server.Core.BaseCore(setup) core.fam.handle_events_in_interval(4) return core + def load_plugin(module, obj_name=None): + """ load a single plugin """ parts = module.split(".") if obj_name is None: obj_name = parts[-1] mod = __import__(module) - for p in parts[1:]: - mod = getattr(mod, p) + for part in parts[1:]: + mod = getattr(mod, part) return getattr(mod, obj_name) -if __name__ == '__main__': - optinfo = dict(config=Bcfg2.Options.LINT_CONFIG, - showerrors=Bcfg2.Options.LINT_SHOW_ERRORS, - stdin=Bcfg2.Options.LINT_FILES_ON_STDIN, - schema=Bcfg2.Options.SCHEMA_PATH, - plugins=Bcfg2.Options.SERVER_PLUGINS) - optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS) - optinfo.update(Bcfg2.Options.SERVER_COMMON_OPTIONS) - setup = Bcfg2.Options.OptionParser(optinfo) - setup.parse(sys.argv[1:]) - - log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING) - if setup['verbose']: - log_args['to_console'] = logging.DEBUG - Bcfg2.Logger.setup_logging('bcfg2-info', **log_args) - - config = ConfigParser.SafeConfigParser() - config.read(setup['configfile']) - config.read(setup['config']) - # get list of plugins to run +def load_plugins(setup): + """ get list of plugins to run """ if setup['args']: plugin_list = setup['args'] elif "bcfg2-repo-validate" in sys.argv[0]: plugin_list = 'Duplicates,RequiredAttrs,Validate'.split(',') else: try: - plugin_list = config.get('lint', 'plugins').split(',') + plugin_list = setup.cfp.get('lint', 'plugins').split(',') except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): plugin_list = Bcfg2.Server.Lint.__all__ - if setup['stdin']: - files = [s.strip() for s in sys.stdin.readlines()] - else: - files = None - allplugins = dict() for plugin in plugin_list: try: @@ -121,12 +102,12 @@ if __name__ == '__main__': load_plugin("Bcfg2.Server.Plugins." + plugin, obj_name=plugin + "Lint") except (ImportError, AttributeError): - err = sys.exc_info()[1] - logger.error("Failed to load plugin %s: %s" % - (plugin + "Lint", err)) + err = sys.exc_info()[1] + LOGGER.error("Failed to load plugin %s: %s" % + (plugin + "Lint", err)) except AttributeError: err = sys.exc_info()[1] - logger.error("Failed to load plugin %s: %s" % (plugin, err)) + LOGGER.error("Failed to load plugin %s: %s" % (plugin, err)) serverplugins = dict() serverlessplugins = dict() @@ -136,21 +117,47 @@ if __name__ == '__main__': serverplugins[plugin_name] = plugin else: serverlessplugins[plugin_name] = plugin + return (serverlessplugins, serverplugins) + + +def main(): + optinfo = dict(lint_config=Bcfg2.Options.LINT_CONFIG, + showerrors=Bcfg2.Options.LINT_SHOW_ERRORS, + stdin=Bcfg2.Options.LINT_FILES_ON_STDIN, + schema=Bcfg2.Options.SCHEMA_PATH, + plugins=Bcfg2.Options.SERVER_PLUGINS) + optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS) + optinfo.update(Bcfg2.Options.SERVER_COMMON_OPTIONS) + setup = Bcfg2.Options.OptionParser(optinfo) + setup.parse(sys.argv[1:]) + + log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING) + if setup['verbose']: + log_args['to_console'] = logging.DEBUG + Bcfg2.Logger.setup_logging('bcfg2-info', **log_args) - errorhandler = get_errorhandler(config) + setup.cfp.read(setup['lint_config']) + + if setup['stdin']: + files = [s.strip() for s in sys.stdin.readlines()] + else: + files = None + + (serverlessplugins, serverplugins) = load_plugins(setup) + + errorhandler = get_errorhandler(setup) if setup['showerrors']: for plugin in serverplugins.values() + serverlessplugins.values(): errorhandler.RegisterErrors(getattr(plugin, 'Errors')()) print("%-35s %-35s" % ("Error name", "Handler")) - for err, handler in errorhandler._handlers.items(): + for err, handler in errorhandler.errors.items(): print("%-35s %-35s" % (err, handler.__name__)) raise SystemExit(0) - run_serverless_plugins(serverlessplugins, - errorhandler=errorhandler, - config=config, setup=setup) + run_serverless_plugins(serverlessplugins, errorhandler=errorhandler, + setup=setup, files=files) if serverplugins: if errorhandler.errors: @@ -166,7 +173,7 @@ if __name__ == '__main__': "plugins") else: run_server_plugins(serverplugins, errorhandler=errorhandler, - config=config, setup=setup) + setup=setup, files=files) if errorhandler.errors or errorhandler.warnings or setup['verbose']: print("%d errors" % errorhandler.errors) @@ -176,3 +183,6 @@ if __name__ == '__main__': raise SystemExit(2) elif errorhandler.warnings: raise SystemExit(3) + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/sbin/bcfg2-test b/src/sbin/bcfg2-test index 8323eeb22..815d2740c 100755 --- a/src/sbin/bcfg2-test +++ b/src/sbin/bcfg2-test @@ -32,6 +32,8 @@ class ClientTest(TestCase): self.ignore = ignore def ignore_entry(self, tag, name): + """ return True if an error on a given entry should be ignored + """ if tag in self.ignore: if name in self.ignore[tag]: return True @@ -43,6 +45,7 @@ class ClientTest(TestCase): return False def runTest(self): + """ run this individual test """ config = self.bcfg2_core.BuildConfiguration(self.client) failures = [] @@ -86,6 +89,7 @@ def main(): ignore[tag] = [name] def run_tests(): + """ Run the test suite """ core.fam.handle_events_in_interval(0.1) if setup['args']: -- cgit v1.2.3-1-g7c22