#!/usr/bin/env python
""" helper for encrypting/decrypting Cfg and Properties files """

import os
import sys
import copy
import select
import logging
import lxml.etree
import Bcfg2.Logger
import Bcfg2.Options
from Bcfg2.Server import XMLParser
from Bcfg2.Compat import input  # pylint: disable=W0622
try:
    import Bcfg2.Encryption
except ImportError:
    print("Could not import %s. Is M2Crypto installed?" % sys.exc_info()[1])
    raise SystemExit(1)


def print_xml(element, keep_text=False):
    """ Render an XML element for error output.  This appends the
    line number and removes children for nicer display.

    :param element: The element to render
    :type element: lxml.etree._Element
    :param keep_text: Do not discard text content from the element for
                      display
    :type keep_text: boolean
    :returns: string - the serialized element followed by its source
              line number
    """
    if len(element) or element.text:
        # work on a copy so the caller's tree is never modified
        shown = copy.copy(element)
        if shown.text and not keep_text:
            shown.text = '...'
        # iterate over a snapshot; we remove children as we go
        for child in list(shown):
            shown.remove(child)
    else:
        shown = element
    xml = lxml.etree.tostring(
        shown, xml_declaration=False).decode("UTF-8").strip()
    return "%s (line %s)" % (xml, element.sourceline)


class PassphraseError(Exception):
    """ Exception raised when there's a problem determining the
    passphrase to encrypt or decrypt with """


class DecryptError(Exception):
    """ Exception raised when decryption fails. """


class EncryptError(Exception):
    """ Exception raised when encryption fails. """


class CryptoTool(object):
    """ Generic decryption/encryption interface base object """

    def __init__(self, filename, setup):
        """ Read ``filename`` and determine the passphrase to use.

        :param filename: Path of the file to operate on
        :param setup: The parsed option set
        :raises: IOError - if the file cannot be read (the error is
                 logged before it is re-raised)
        :raises: PassphraseError - if no usable passphrase is configured
        """
        self.setup = setup
        self.logger = logging.getLogger(self.__class__.__name__)
        self.passphrases = Bcfg2.Encryption.get_passphrases(self.setup)

        self.filename = filename
        try:
            datafile = open(self.filename)
            try:
                self.data = datafile.read()
            finally:
                datafile.close()
        except IOError:
            err = sys.exc_info()[1]
            self.logger.error("Error reading %s, skipping: %s" %
                              (filename, err))
            # __init__ must not return a value; propagate the error so
            # the caller can skip this file
            raise

        self.pname, self.passphrase = self._get_passphrase()

    def _get_passphrase(self):
        """ get the passphrase for the current file

        :returns: tuple - ``(name, passphrase)``, or ``(None, None)``
                  if several passphrases are configured and none was
                  selected on the command line
        :raises: PassphraseError
        """
        if (not self.setup.cfp.has_section(Bcfg2.Encryption.CFG_SECTION) or
                len(Bcfg2.Encryption.get_passphrases(self.setup)) == 0):
            raise PassphraseError("No passphrases available in %s" %
                                  self.setup['configfile'])

        pname = self.setup['passphrase']
        if pname:
            if not self.setup.cfp.has_option(Bcfg2.Encryption.CFG_SECTION,
                                             pname):
                raise PassphraseError("Could not find passphrase %s in %s" %
                                      (pname, self.setup['configfile']))
            passphrase = self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION,
                                            pname)
            self.logger.debug("Using passphrase %s specified on command "
                              "line" % pname)
            return (pname, passphrase)

        pnames = Bcfg2.Encryption.get_passphrases(self.setup)
        if len(pnames) == 1:
            # dict views are not indexable on Python 3, so go through
            # a list
            pname, passphrase = list(pnames.items())[0]
            self.logger.info("Using passphrase %s" % pname)
            return (pname, passphrase)
        elif len(pnames) > 1:
            return (None, None)
        raise PassphraseError("No passphrase could be determined")

    def get_destination_filename(self, original_filename):
        """ Get the filename where data should be written """
        return original_filename

    def write(self, data):
        """ write data to disk

        :returns: bool - True if the data was written, False if an
                  IOError occurred (the error is logged)
        """
        new_fname = self.get_destination_filename(self.filename)
        try:
            self._write(new_fname, data)
            self.logger.info("Wrote data to %s" % new_fname)
            return True
        except IOError:
            err = sys.exc_info()[1]
            self.logger.error("Error writing data from %s to %s: %s" %
                              (self.filename, new_fname, err))
            return False

    def _write(self, filename, data):
        """ Perform the actual write of data.  This is separate from
        :func:`CryptoTool.write` so it can be easily
        overridden. """
        outfile = open(filename, "wb")
        try:
            outfile.write(data)
        finally:
            outfile.close()


class Decryptor(CryptoTool):
    """ Decryptor interface """

    def decrypt(self):
        """ decrypt the file, returning the decrypted data """
        raise NotImplementedError


class Encryptor(CryptoTool):
    """ encryptor interface """

    def encrypt(self):
        """ encrypt the file, returning the encrypted data """
        raise NotImplementedError


class CfgEncryptor(Encryptor):
    """ encryptor class for Cfg files """

    def __init__(self, filename, setup):
        Encryptor.__init__(self, filename, setup)
        # a whole-file encryption needs exactly one passphrase
        if self.passphrase is None:
            raise PassphraseError("Multiple passphrases found in %s, "
                                  "specify one on the command line with -p" %
                                  self.setup['configfile'])

    def encrypt(self):
        """ encrypt the file contents, returning the encrypted data """
        return Bcfg2.Encryption.ssl_encrypt(
            self.data, self.passphrase,
            Bcfg2.Encryption.get_algorithm(self.setup))

    def get_destination_filename(self, original_filename):
        """ Encrypted Cfg files get a ``.crypt`` suffix """
        return original_filename + ".crypt"


class CfgDecryptor(Decryptor):
    """ Decrypt Cfg files """

    def decrypt(self):
        """ decrypt the given file, returning the plaintext data

        :raises: DecryptError
        """
        if self.passphrase:
            try:
                return Bcfg2.Encryption.ssl_decrypt(
                    self.data, self.passphrase,
                    Bcfg2.Encryption.get_algorithm(self.setup))
            except Bcfg2.Encryption.EVPError:
                raise DecryptError("Could not decrypt %s with the "
                                   "specified passphrase" % self.filename)
            except Exception:
                # deliberately broad, but must not swallow
                # KeyboardInterrupt/SystemExit like a bare except would
                raise DecryptError("Error decrypting %s: %s" %
                                   (self.filename, sys.exc_info()[1]))
        else:
            # no passphrase given, brute force
            try:
                return Bcfg2.Encryption.bruteforce_decrypt(
                    self.data,
                    passphrases=self.passphrases.values(),
                    algorithm=Bcfg2.Encryption.get_algorithm(self.setup))
            except Bcfg2.Encryption.EVPError:
                raise DecryptError("Could not decrypt %s with any passphrase"
                                   % self.filename)

    def get_destination_filename(self, original_filename):
        """ Strip a trailing ``.crypt``; otherwise defer to the base
        class """
        if original_filename.endswith(".crypt"):
            return original_filename[:-6]
        else:
            return Decryptor.get_destination_filename(self,
                                                      original_filename)


class PropertiesCryptoMixin(object):
    """ Mixin to provide some common methods for Properties crypto """
    default_xpath = '//*[@encrypted]'

    def _get_elements(self, xdata):
        """ Get the list of elements to encrypt or decrypt """
        if self.setup['xpath']:
            elements = xdata.xpath(self.setup['xpath'])
            if not elements:
                self.logger.warning("XPath expression %s matched no "
                                    "elements" % self.setup['xpath'])
        else:
            elements = xdata.xpath(self.default_xpath)
            if not elements:
                elements = list(xdata.getiterator(tag=lxml.etree.Element))

        # filter out elements without text data
        elements = [el for el in elements if el.text]

        if self.setup['interactive']:
            for element in elements[:]:
                if len(element):
                    # display a childless copy; leave the real tree alone
                    elt = copy.copy(element)
                    for child in list(elt):
                        elt.remove(child)
                else:
                    elt = element
                print(lxml.etree.tostring(
                    elt,
                    xml_declaration=False).decode("UTF-8").strip())
                # flush input buffer
                while len(select.select([sys.stdin.fileno()], [], [],
                                        0.0)[0]) > 0:
                    if not os.read(sys.stdin.fileno(), 4096):
                        # EOF: select() would report stdin readable
                        # forever, so stop draining
                        break
                ans = input("Encrypt this element? [y/N] ")
                if not ans.lower().startswith("y"):
                    elements.remove(element)

        return elements

    def _get_element_passphrase(self, element):
        """ Get the passphrase to use to encrypt or decrypt a given
        element

        :returns: tuple - ``(name, passphrase)``
        :raises: PassphraseError
        """
        pname = element.get("encrypted")
        if pname in self.passphrases:
            passphrase = self.passphrases[pname]
        elif self.passphrase:
            if pname:
                self.logger.warning("Passphrase %s not found in %s, "
                                    "using passphrase given on command line"
                                    % (pname, self.setup['configfile']))
            passphrase = self.passphrase
            pname = self.pname
        else:
            raise PassphraseError("Multiple passphrases found in %s, "
                                  "specify one on the command line with -p" %
                                  self.setup['configfile'])
        return (pname, passphrase)

    def _write(self, filename, data):
        """ Write the data """
        data.getroottree().write(filename,
                                 xml_declaration=False,
                                 pretty_print=True)


class PropertiesEncryptor(Encryptor, PropertiesCryptoMixin):
    """ encryptor class for Properties files """

    def encrypt(self):
        """ encrypt the selected elements, returning the XML document

        :raises: EncryptError
        """
        xdata = lxml.etree.XML(self.data, parser=XMLParser)
        for elt in self._get_elements(xdata):
            try:
                pname, passphrase = self._get_element_passphrase(elt)
            except PassphraseError:
                raise EncryptError(str(sys.exc_info()[1]))
            self.logger.debug("Encrypting %s" % print_xml(elt))
            elt.text = Bcfg2.Encryption.ssl_encrypt(
                elt.text, passphrase,
                Bcfg2.Encryption.get_algorithm(self.setup)).strip()
            elt.set("encrypted", pname)
        return xdata

    def _write(self, filename, data):
        # CryptoTool._write precedes the mixin in the MRO, so select
        # the XML writer explicitly
        PropertiesCryptoMixin._write(self, filename, data)


class PropertiesDecryptor(Decryptor, PropertiesCryptoMixin):
    """ decryptor class for Properties files """

    def decrypt(self):
        """ decrypt the selected elements, returning the XML document

        :raises: DecryptError - if a passphrase cannot be determined or
                 nothing at all could be decrypted
        """
        decrypted_any = False
        xdata = lxml.etree.XML(self.data, parser=XMLParser)
        for elt in self._get_elements(xdata):
            try:
                pname, passphrase = self._get_element_passphrase(elt)
            except PassphraseError:
                raise DecryptError(str(sys.exc_info()[1]))
            self.logger.debug("Decrypting %s" % print_xml(elt))
            try:
                decrypted = Bcfg2.Encryption.ssl_decrypt(
                    elt.text, passphrase,
                    Bcfg2.Encryption.get_algorithm(self.setup)).strip()
                decrypted_any = True
            except (Bcfg2.Encryption.EVPError, TypeError):
                self.logger.error("Could not decrypt %s, skipping" %
                                  print_xml(elt))
                continue
            try:
                elt.text = decrypted.encode('ascii', 'xmlcharrefreplace')
                elt.set("encrypted", pname)
            except UnicodeDecodeError:
                # we managed to decrypt the value, but it contains
                # content that can't even be encoded into xml
                # entities.  what probably happened here is that we
                # coincidentally could decrypt a value encrypted with
                # a different key, and wound up with gibberish.
                self.logger.warning("Decrypted %s to gibberish, skipping" %
                                    elt.tag)
        if decrypted_any:
            return xdata
        else:
            raise DecryptError("Failed to decrypt any data in %s" %
                               self.filename)

    def _write(self, filename, data):
        # CryptoTool._write precedes the mixin in the MRO, so select
        # the XML writer explicitly
        PropertiesCryptoMixin._write(self, filename, data)


def _make_tool(tool_cls, fname, setup):
    """ Instantiate a crypto tool for fname.  Returns None if the file
    could not be read; PassphraseError is left to propagate. """
    try:
        return tool_cls(fname, setup)
    except IOError:
        # CryptoTool.__init__ has already logged the read error
        return None


def _select_tool(tools, fname, setup, logger):
    """ Pick the tool and mode for fname.  Returns a ``(tool, mode,
    data)`` tuple; ``tool`` is None if the file could not be read, and
    ``data`` is only set when mode detection already decrypted it. """
    encryptor, decryptor = tools
    if setup['encrypt']:
        return (_make_tool(encryptor, fname, setup), "encrypt", None)
    if setup['decrypt']:
        return (_make_tool(decryptor, fname, setup), "decrypt", None)

    logger.info("Neither --encrypt nor --decrypt specified, "
                "determining mode")
    tool = _make_tool(decryptor, fname, setup)
    if tool is None:
        return (None, None, None)
    try:
        return (tool, "decrypt", tool.decrypt())
    except DecryptError:
        logger.info("Failed to decrypt %s, trying encryption" % fname)
        return (_make_tool(encryptor, fname, setup), "encrypt", None)


def _stdout_text(data):
    """ Return data in printable form, serializing XML elements """
    if lxml.etree.iselement(data):
        return lxml.etree.tostring(data, xml_declaration=False,
                                   pretty_print=True).decode("UTF-8")
    return data


def main():  # pylint: disable=R0912,R0915
    """ Command-line entry point: encrypt or decrypt each file named
    on the command line.

    :returns: 2 if no usable passphrase could be determined, otherwise
              None
    """
    optinfo = dict(interactive=Bcfg2.Options.INTERACTIVE)
    optinfo.update(Bcfg2.Options.CRYPT_OPTIONS)
    optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS)
    setup = Bcfg2.Options.OptionParser(optinfo)
    setup.hm = " bcfg2-crypt [options] \nOptions:\n%s" % \
        setup.buildHelpMessage()
    setup.parse(sys.argv[1:])

    if not setup['args']:
        print(setup.hm)
        raise SystemExit(1)

    log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING)
    if setup['verbose']:
        log_args['to_console'] = logging.DEBUG
    Bcfg2.Logger.setup_logging('bcfg2-crypt', **log_args)
    logger = logging.getLogger('bcfg2-crypt')

    if setup['decrypt']:
        if setup['encrypt']:
            logger.error("You cannot specify both --encrypt and --decrypt")
            raise SystemExit(1)
        # these two checks are independent: both options may have been
        # given, and each must be reset
        if setup['remove']:
            logger.error("--remove cannot be used with --decrypt, ignoring")
            setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default
        if setup['interactive']:
            logger.error("Cannot decrypt interactively")
            setup['interactive'] = False

    if setup['cfg']:
        if setup['properties']:
            logger.error("You cannot specify both --cfg and --properties")
            raise SystemExit(1)
        if setup['xpath']:
            logger.error("Specifying --xpath with --cfg is nonsensical, "
                         "ignoring --xpath")
            setup['xpath'] = Bcfg2.Options.CRYPT_XPATH.default
        if setup['interactive']:
            logger.error("You cannot use interactive mode with --cfg, "
                         "ignoring -I")
            setup['interactive'] = False
    elif setup['properties']:
        if setup['remove']:
            logger.error("--remove cannot be used with --properties, ignoring")
            setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default

    for fname in setup['args']:
        if not os.path.exists(fname):
            logger.error("%s does not exist, skipping" % fname)
            continue

        # figure out if we need to encrypt this as a Properties file
        # or as a Cfg file
        props = False
        if setup['properties']:
            props = True
        elif setup['cfg']:
            props = False
        elif fname.endswith(".xml"):
            try:
                xroot = lxml.etree.parse(fname).getroot()
                props = xroot.tag == "Properties"
            except IOError:
                err = sys.exc_info()[1]
                logger.error("Error reading %s, skipping: %s" % (fname, err))
                continue
            except lxml.etree.XMLSyntaxError:
                # not well-formed XML, so treat it as a plain Cfg file
                props = False

        if props:
            if setup['remove']:
                logger.info("Cannot use --remove with Properties file %s, "
                            "ignoring for this file" % fname)
            tools = (PropertiesEncryptor, PropertiesDecryptor)
        else:
            if setup['xpath']:
                logger.info("Cannot use xpath with Cfg file %s, ignoring "
                            "xpath for this file" % fname)
            if setup['interactive']:
                logger.info("Cannot use interactive mode with Cfg file %s, "
                            "ignoring -I for this file" % fname)
            tools = (CfgEncryptor, CfgDecryptor)

        try:
            tool, mode, data = _select_tool(tools, fname, setup, logger)
        except PassphraseError:
            logger.error(str(sys.exc_info()[1]))
            return 2
        if tool is None:
            # unreadable file; the error has already been logged
            continue

        if data is None:
            try:
                data = getattr(tool, mode)()
            except (EncryptError, DecryptError):
                logger.error("Failed to %s %s, skipping: %s" %
                             (mode, fname, sys.exc_info()[1]))
                continue
        if setup['crypt_stdout']:
            if len(setup['args']) > 1:
                print("----- %s -----" % fname)
            print(_stdout_text(data))
            if len(setup['args']) > 1:
                print("")
        elif not tool.write(data):
            # the write failed (and was logged); never remove the
            # original when its replacement was not written
            continue

        if (setup['remove'] and
                tool.get_destination_filename(fname) != fname):
            try:
                os.unlink(fname)
            except (IOError, OSError):
                # os.unlink raises OSError, which is distinct from
                # IOError on Python 2
                err = sys.exc_info()[1]
                logger.error("Error removing %s: %s" % (fname, err))
                continue


if __name__ == '__main__':
    sys.exit(main())