diff options
Diffstat (limited to 'src/sbin/bcfg2-crypt')
-rwxr-xr-x | src/sbin/bcfg2-crypt | 479 |
1 files changed, 2 insertions, 477 deletions
diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt index 2212c2360..26d5eedf1 100755 --- a/src/sbin/bcfg2-crypt +++ b/src/sbin/bcfg2-crypt @@ -1,483 +1,8 @@ #!/usr/bin/env python """ helper for encrypting/decrypting Cfg and Properties files """ -import os import sys -import copy -import select -import logging -import lxml.etree -import Bcfg2.Logger -import Bcfg2.Options -from Bcfg2.Server import XMLParser -from Bcfg2.Compat import input # pylint: disable=W0622 -try: - import Bcfg2.Encryption -except ImportError: - print("Could not import %s. Is M2Crypto installed?" % sys.exc_info()[1]) - raise SystemExit(1) - - -def print_xml(element, keep_text=False): - """ Render an XML element for error output. This prefixes the - line number and removes children for nicer display. - - :param element: The element to render - :type element: lxml.etree._Element - :param keep_text: Do not discard text content from the element for - display - :type keep_text: boolean - """ - xml = None - if len(element) or element.text: - el = copy.copy(element) - if el.text and not keep_text: - el.text = '...' - for child in el.iterchildren(): - el.remove(child) - xml = lxml.etree.tostring( - el, - xml_declaration=False).decode("UTF-8").strip() - else: - xml = lxml.etree.tostring( - element, - xml_declaration=False).decode("UTF-8").strip() - return "%s (line %s)" % (xml, element.sourceline) - - -class PassphraseError(Exception): - """ Exception raised when there's a problem determining the - passphrase to encrypt or decrypt with """ - - -class DecryptError(Exception): - """ Exception raised when decryption fails. """ - - -class EncryptError(Exception): - """ Exception raised when encryption fails. """ - - -class CryptoTool(object): - """ Generic decryption/encryption interface base object """ - def __init__(self, filename, setup): - self.setup = setup - self.logger = logging.getLogger(self.__class__.__name__) - self.passphrases = Bcfg2.Encryption.get_passphrases(self.setup) - - self.filename = filename - try: - self.data = open(self.filename).read() - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error reading %s, skipping: %s" % (filename, - err)) - return False - - self.pname, self.passphrase = self._get_passphrase() - - def _get_passphrase(self): - """ get the passphrase for the current file """ - if (not self.setup.cfp.has_section(Bcfg2.Encryption.CFG_SECTION) or - len(Bcfg2.Encryption.get_passphrases(self.setup)) == 0): - raise PassphraseError("No passphrases available in %s" % - self.setup['configfile']) - - pname = None - if self.setup['passphrase']: - pname = self.setup['passphrase'] - - if pname: - if self.setup.cfp.has_option(Bcfg2.Encryption.CFG_SECTION, - pname): - passphrase = self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION, - pname) - self.logger.debug("Using passphrase %s specified on command " - "line" % pname) - return (pname, passphrase) - else: - raise PassphraseError("Could not find passphrase %s in %s" % - (pname, self.setup['configfile'])) - else: - pnames = Bcfg2.Encryption.get_passphrases(self.setup) - if len(pnames) == 1: - pname = pnames.keys()[0] - passphrase = pnames[pname] - self.logger.info("Using passphrase %s" % pname) - return (pname, passphrase) - elif len(pnames) > 1: - return (None, None) - raise PassphraseError("No passphrase could be determined") - - def get_destination_filename(self, original_filename): - """ Get the filename where data should be written """ - return original_filename - - def write(self, data): - """ write data to disk """ - new_fname = self.get_destination_filename(self.filename) - try: - self._write(new_fname, data) - self.logger.info("Wrote data to %s" % new_fname) - return True - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error writing data from %s to %s: %s" % - (self.filename, new_fname, err)) - return False - - def _write(self, filename, data): - """ Perform the actual write of data. This is separate from - :func:`CryptoTool.write` so it can be easily - overridden. """ - open(filename, "wb").write(data) - - -class Decryptor(CryptoTool): - """ Decryptor interface """ - def decrypt(self): - """ decrypt the file, returning the encrypted data """ - raise NotImplementedError - - -class Encryptor(CryptoTool): - """ encryptor interface """ - def encrypt(self): - """ encrypt the file, returning the encrypted data """ - raise NotImplementedError - - -class CfgEncryptor(Encryptor): - """ encryptor class for Cfg files """ - - def __init__(self, filename, setup): - Encryptor.__init__(self, filename, setup) - if self.passphrase is None: - raise PassphraseError("Multiple passphrases found in %s, " - "specify one on the command line with -p" % - self.setup['configfile']) - - def encrypt(self): - return Bcfg2.Encryption.ssl_encrypt( - self.data, self.passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)) - - def get_destination_filename(self, original_filename): - return original_filename + ".crypt" - - -class CfgDecryptor(Decryptor): - """ Decrypt Cfg files """ - - def decrypt(self): - """ decrypt the given file, returning the plaintext data """ - if self.passphrase: - try: - return Bcfg2.Encryption.ssl_decrypt( - self.data, self.passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)) - except Bcfg2.Encryption.EVPError: - raise DecryptError("Could not decrypt %s with the " - "specified passphrase" % self.filename) - except: - raise DecryptError("Error decrypting %s: %s" % - (self.filename, sys.exc_info()[1])) - else: # no passphrase given, brute force - try: - return Bcfg2.Encryption.bruteforce_decrypt( - self.data, passphrases=self.passphrases.values(), - algorithm=Bcfg2.Encryption.get_algorithm(self.setup)) - except Bcfg2.Encryption.EVPError: - raise DecryptError("Could not decrypt %s with any passphrase" % - self.filename) - - def get_destination_filename(self, original_filename): - if original_filename.endswith(".crypt"): - return original_filename[:-6] - else: - return Decryptor.get_plaintext_filename(self, original_filename) - - -class PropertiesCryptoMixin(object): - """ Mixin to provide some common methods for Properties crypto """ - default_xpath = '//*[@encrypted]' - - def _get_elements(self, xdata): - """ Get the list of elements to encrypt or decrypt """ - if self.setup['xpath']: - elements = xdata.xpath(self.setup['xpath']) - if not elements: - self.logger.warning("XPath expression %s matched no " - "elements" % self.setup['xpath']) - else: - elements = xdata.xpath(self.default_xpath) - if not elements: - elements = list(xdata.getiterator(tag=lxml.etree.Element)) - - # filter out elements without text data - for el in elements[:]: - if not el.text: - elements.remove(el) - - if self.setup['interactive']: - for element in elements[:]: - if len(element): - elt = copy.copy(element) - for child in elt.iterchildren(): - elt.remove(child) - else: - elt = element - print(lxml.etree.tostring( - elt, - xml_declaration=False).decode("UTF-8").strip()) - # flush input buffer - while len(select.select([sys.stdin.fileno()], [], [], - 0.0)[0]) > 0: - os.read(sys.stdin.fileno(), 4096) - ans = input("Encrypt this element? [y/N] ") - if not ans.lower().startswith("y"): - elements.remove(element) - return elements - - def _get_element_passphrase(self, element): - """ Get the passphrase to use to encrypt or decrypt a given - element """ - pname = element.get("encrypted") - if pname in self.passphrases: - passphrase = self.passphrases[pname] - elif self.passphrase: - if pname: - self.logger.warning("Passphrase %s not found in %s, " - "using passphrase given on command line" - % (pname, self.setup['configfile'])) - passphrase = self.passphrase - pname = self.pname - else: - raise PassphraseError("Multiple passphrases found in %s, " - "specify one on the command line with -p" % - self.setup['configfile']) - return (pname, passphrase) - - def _write(self, filename, data): - """ Write the data """ - data.getroottree().write(filename, - xml_declaration=False, - pretty_print=True) - - -class PropertiesEncryptor(Encryptor, PropertiesCryptoMixin): - """ encryptor class for Properties files """ - - def encrypt(self): - xdata = lxml.etree.XML(self.data, parser=XMLParser) - for elt in self._get_elements(xdata): - try: - pname, passphrase = self._get_element_passphrase(elt) - except PassphraseError: - raise EncryptError(str(sys.exc_info()[1])) - self.logger.debug("Encrypting %s" % print_xml(elt)) - elt.text = Bcfg2.Encryption.ssl_encrypt( - elt.text, passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)).strip() - elt.set("encrypted", pname) - return xdata - - def _write(self, filename, data): - PropertiesCryptoMixin._write(self, filename, data) - - -class PropertiesDecryptor(Decryptor, PropertiesCryptoMixin): - """ decryptor class for Properties files """ - - def decrypt(self): - decrypted_any = False - xdata = lxml.etree.XML(self.data, parser=XMLParser) - for elt in self._get_elements(xdata): - try: - pname, passphrase = self._get_element_passphrase(elt) - except PassphraseError: - raise DecryptError(str(sys.exc_info()[1])) - self.logger.debug("Decrypting %s" % print_xml(elt)) - try: - decrypted = Bcfg2.Encryption.ssl_decrypt( - elt.text, passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)).strip() - decrypted_any = True - except (Bcfg2.Encryption.EVPError, TypeError): - self.logger.error("Could not decrypt %s, skipping" % - print_xml(elt)) - continue - try: - elt.text = decrypted.encode('ascii', 'xmlcharrefreplace') - elt.set("encrypted", pname) - except UnicodeDecodeError: - # we managed to decrypt the value, but it contains - # content that can't even be encoded into xml - # entities. what probably happened here is that we - # coincidentally could decrypt a value encrypted with - # a different key, and wound up with gibberish. - self.logger.warning("Decrypted %s to gibberish, skipping" % - elt.tag) - if decrypted_any: - return xdata - else: - raise DecryptError("Failed to decrypt any data in %s" % - self.filename) - - def _write(self, filename, data): - PropertiesCryptoMixin._write(self, filename, data) - - -def main(): # pylint: disable=R0912,R0915 - optinfo = dict(interactive=Bcfg2.Options.INTERACTIVE) - optinfo.update(Bcfg2.Options.CRYPT_OPTIONS) - optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS) - setup = Bcfg2.Options.OptionParser(optinfo) - setup.hm = " bcfg2-crypt [options] <filename>\nOptions:\n%s" % \ - setup.buildHelpMessage() - setup.parse(sys.argv[1:]) - - if not setup['args']: - print(setup.hm) - raise SystemExit(1) - - log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING) - if setup['verbose']: - log_args['to_console'] = logging.DEBUG - Bcfg2.Logger.setup_logging('bcfg2-crypt', **log_args) - logger = logging.getLogger('bcfg2-crypt') - - if setup['decrypt']: - if setup['encrypt']: - logger.error("You cannot specify both --encrypt and --decrypt") - raise SystemExit(1) - elif setup['remove']: - logger.error("--remove cannot be used with --decrypt, ignoring") - setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default - elif setup['interactive']: - logger.error("Cannot decrypt interactively") - setup['interactive'] = False - - if setup['cfg']: - if setup['properties']: - logger.error("You cannot specify both --cfg and --properties") - raise SystemExit(1) - if setup['xpath']: - logger.error("Specifying --xpath with --cfg is nonsensical, " - "ignoring --xpath") - setup['xpath'] = Bcfg2.Options.CRYPT_XPATH.default - if setup['interactive']: - logger.error("You cannot use interactive mode with --cfg, " - "ignoring -I") - setup['interactive'] = False - elif setup['properties']: - if setup['remove']: - logger.error("--remove cannot be used with --properties, ignoring") - setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default - - for fname in setup['args']: - if not os.path.exists(fname): - logger.error("%s does not exist, skipping" % fname) - continue - - # figure out if we need to encrypt this as a Properties file - # or as a Cfg file - props = False - if setup['properties']: - props = True - elif setup['cfg']: - props = False - elif fname.endswith(".xml"): - try: - xroot = lxml.etree.parse(fname).getroot() - if xroot.tag == "Properties": - props = True - else: - props = False - except IOError: - err = sys.exc_info()[1] - logger.error("Error reading %s, skipping: %s" % (fname, err)) - continue - except lxml.etree.XMLSyntaxError: - props = False - else: - props = False - - if props: - if setup['remove']: - logger.info("Cannot use --remove with Properties file %s, " - "ignoring for this file" % fname) - tools = (PropertiesEncryptor, PropertiesDecryptor) - else: - if setup['xpath']: - logger.info("Cannot use xpath with Cfg file %s, ignoring " - "xpath for this file" % fname) - if setup['interactive']: - logger.info("Cannot use interactive mode with Cfg file %s, " - "ignoring -I for this file" % fname) - tools = (CfgEncryptor, CfgDecryptor) - - data = None - mode = None - if setup['encrypt']: - try: - tool = tools[0](fname, setup) - except PassphraseError: - logger.error(str(sys.exc_info()[1])) - return 2 - mode = "encrypt" - elif setup['decrypt']: - try: - tool = tools[1](fname, setup) - except PassphraseError: - logger.error(str(sys.exc_info()[1])) - return 2 - mode = "decrypt" - else: - logger.info("Neither --encrypt nor --decrypt specified, " - "determining mode") - try: - tool = tools[1](fname, setup) - except PassphraseError: - logger.error(str(sys.exc_info()[1])) - return 2 - - try: - data = tool.decrypt() - mode = "decrypt" - except DecryptError: - logger.info("Failed to decrypt %s, trying encryption" % fname) - try: - tool = tools[0](fname, setup) - except PassphraseError: - logger.error(str(sys.exc_info()[1])) - return 2 - mode = "encrypt" - - if data is None: - try: - data = getattr(tool, mode)() - except (EncryptError, DecryptError): - logger.error("Failed to %s %s, skipping: %s" % - (mode, fname, sys.exc_info()[1])) - continue - if setup['crypt_stdout']: - if len(setup['args']) > 1: - print("----- %s -----" % fname) - print(data) - if len(setup['args']) > 1: - print("") - else: - tool.write(data) - - if (setup['remove'] and - tool.get_destination_filename(fname) != fname): - try: - os.unlink(fname) - except IOError: - err = sys.exc_info()[1] - logger.error("Error removing %s: %s" % (fname, err)) - continue +from Bcfg2.Server.Encryption import CLI if __name__ == '__main__': - sys.exit(main()) + sys.exit(CLI().run()) |