From 227a293af2420de367fc34b510a4c7337f18d93f Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 2 Jul 2013 12:09:24 -0400 Subject: Rewrote bcfg2-crypt. This adds several features and generally makes it work a lot more smoothly by not trying to overoptimize and share too much code. You can now brute-force decrypt a Cfg file (i.e., without specifying a passphrase, it will try all of them). You can also now write a Properties file with `encrypted="passphrase_name"` attributes in place, and then encrypt (and decrypt) it in one fell swoop. Various other minor improvements. --- src/sbin/bcfg2-crypt | 535 ++++++++++++++++++++++----------------------------- 1 file changed, 232 insertions(+), 303 deletions(-) (limited to 'src') diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt index aad89882f..7102d06a9 100755 --- a/src/sbin/bcfg2-crypt +++ b/src/sbin/bcfg2-crypt @@ -18,291 +18,167 @@ except ImportError: raise SystemExit(1) -class EncryptionChunkingError(Exception): - """ error raised when Encryptor cannot break a file up into chunks - to be encrypted, or cannot reassemble the chunks """ - pass +class PassphraseError(Exception): + """ Exception raised when there's a problem determining the + passphrase to encrypt or decrypt with """ -class Encryptor(object): - """ Generic encryptor for all files """ - - def __init__(self, setup): +class CryptoTool(object): + """ Generic decryption/encryption interface base object """ + def __init__(self, filename, setup): self.setup = setup - self.passphrase = None - self.pname = None self.logger = logging.getLogger(self.__class__.__name__) + self.passphrases = Bcfg2.Encryption.get_passphrases(self.setup) - def get_encrypted_filename(self, plaintext_filename): - """ get the name of the file encrypted data should be written to """ - return plaintext_filename - - def get_plaintext_filename(self, encrypted_filename): - """ get the name of the file decrypted data should be written to """ - return encrypted_filename - - def chunk(self, data): - """ generator to break the file up into smaller chunks that - will each be individually encrypted or decrypted """ - yield data - - def unchunk(self, data, original): # pylint: disable=W0613 - """ given chunks of a file, reassemble then into the whole file """ + self.filename = filename try: - return data[0] - except IndexError: - raise EncryptionChunkingError("No data to unchunk") + self.data = open(self.filename).read() + except IOError: + err = sys.exc_info()[1] + self.logger.error("Error reading %s, skipping: %s" % (filename, + err)) + return False - def set_passphrase(self): - """ set the passphrase for the current file """ + self.pname, self.passphrase = self._get_passphrase() + + def _get_passphrase(self): + """ get the passphrase for the current file """ if (not self.setup.cfp.has_section(Bcfg2.Encryption.CFG_SECTION) or len(Bcfg2.Encryption.get_passphrases(self.setup)) == 0): - self.logger.error("No passphrases available in %s" % - self.setup['configfile']) - return False - - if self.passphrase: - self.logger.debug("Using previously determined passphrase %s" % - self.pname) - return True + raise PassphraseError("No passphrases available in %s" % + self.setup['configfile']) + pname = None if self.setup['passphrase']: - self.pname = self.setup['passphrase'] + pname = self.setup['passphrase'] - if self.pname: + if pname: if self.setup.cfp.has_option(Bcfg2.Encryption.CFG_SECTION, - self.pname): - self.passphrase = \ - self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION, - self.pname) + pname): + passphrase = self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION, + pname) self.logger.debug("Using passphrase %s specified on command " - "line" % self.pname) - return True + "line" % pname) + return (pname, passphrase) else: - self.logger.error("Could not find passphrase %s in %s" % - (self.pname, self.setup['configfile'])) - return False + raise PassphraseError("Could not find passphrase %s in %s" % + (pname, self.setup['configfile'])) else: pnames = Bcfg2.Encryption.get_passphrases(self.setup) if len(pnames) == 1: - self.pname = pnames.keys()[0] - self.passphrase = pnames[self.pname] - self.logger.info("Using passphrase %s" % self.pname) - return True + pname = pnames.keys()[0] + passphrase = pnames[pname] + self.logger.info("Using passphrase %s" % pname) + return (pname, passphrase) elif len(pnames) > 1: - self.logger.warning("Multiple passphrases found in %s, " - "specify one on the command line with -p" % - self.setup['configfile']) - self.logger.info("No passphrase could be determined") - return False - - def encrypt(self, fname): - """ encrypt the given file, returning the encrypted data """ + return (None, None) + raise PassphraseError("No passphrase could be determined") + + def get_destination_filename(self, original_filename): + """ Get the filename where data should be written """ + return original_filename + + def write(self, data): + """ write data to disk """ + new_fname = self.get_destination_filename(self.filename) try: - plaintext = open(fname).read() + self._write(new_fname, data) + self.logger.info("Wrote data to %s" % new_fname) + return True except IOError: err = sys.exc_info()[1] - self.logger.error("Error reading %s, skipping: %s" % (fname, err)) + self.logger.error("Error writing data from %s to %s: %s" % + (self.filename, new_fname, err)) return False - if not self.set_passphrase(): - return False + def _write(self, filename, data): + """ Perform the actual write of data. This is separate from + :func:`CryptoTool.write` so it can be easily + overridden. """ + open(filename, "wb").write(data) - crypted = [] - try: - for chunk in self.chunk(plaintext): - try: - passphrase, pname = self.get_passphrase(chunk) - except TypeError: - return False - crypted.append(self._encrypt(chunk, passphrase, name=pname)) - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error getting data to encrypt from %s: %s" % - (fname, err)) - return False - return self.unchunk(crypted, plaintext) +class Decryptor(CryptoTool): + """ Decryptor interface """ + def decrypt(self): + """ decrypt the file, returning the encrypted data """ + raise NotImplementedError - # pylint: disable=W0613 - def _encrypt(self, plaintext, passphrase, name=None): - """ encrypt a single chunk of a file """ - return Bcfg2.Encryption.ssl_encrypt( - plaintext, passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)) - # pylint: enable=W0613 - def decrypt(self, fname): - """ decrypt the given file, returning the plaintext data """ - try: - crypted = open(fname).read() - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error reading %s, skipping: %s" % (fname, err)) - return False +class Encryptor(CryptoTool): + """ encryptor interface """ + def encrypt(self): + """ encrypt the file, returning the encrypted data """ + raise NotImplementedError - self.set_passphrase() - plaintext = [] - try: - for chunk in self.chunk(crypted): - try: - passphrase, pname = self.get_passphrase(chunk) - try: - plaintext.append(self._decrypt(chunk, passphrase)) - except Bcfg2.Encryption.EVPError: - self.logger.info("Could not decrypt %s with the " - "specified passphrase" % fname) - continue - except: - err = sys.exc_info()[1] - self.logger.error("Error decrypting %s: %s" % - (fname, err)) - continue - except TypeError: - pchunk = None - passphrases = Bcfg2.Encryption.get_passphrases(self.setup) - for pname, passphrase in passphrases.items(): - self.logger.debug("Trying passphrase %s" % pname) - try: - pchunk = self._decrypt(chunk, passphrase) - break - except Bcfg2.Encryption.EVPError: - pass - except: - err = sys.exc_info()[1] - self.logger.error("Error decrypting %s: %s" % - (fname, err)) - if pchunk is not None: - plaintext.append(pchunk) - else: - self.logger.error("Could not decrypt %s with any " - "passphrase in %s" % - (fname, self.setup['configfile'])) - continue - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error getting encrypted data from %s: %s" % - (fname, err)) - return False +class CfgEncryptor(Encryptor): + """ encryptor class for Cfg files """ - try: - return self.unchunk(plaintext, crypted) - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error assembling plaintext data from %s: %s" % - (fname, err)) - return False + def __init__(self, filename, setup): + Encryptor.__init__(self, filename, setup) + if self.passphrase is None: + raise PassphraseError("Multiple passphrases found in %s, " + "specify one on the command line with -p" % + self.setup['configfile']) - def _decrypt(self, crypted, passphrase): - """ decrypt a single chunk """ - return Bcfg2.Encryption.ssl_decrypt( - crypted, passphrase, + def encrypt(self): + return Bcfg2.Encryption.ssl_encrypt( + self.data, self.passphrase, Bcfg2.Encryption.get_algorithm(self.setup)) - def write_encrypted(self, fname, data=None): - """ write encrypted data to disk """ - if data is None: - data = self.decrypt(fname) - new_fname = self.get_encrypted_filename(fname) - try: - open(new_fname, "wb").write(data) - self.logger.info("Wrote encrypted data to %s" % new_fname) - return True - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error writing encrypted data from %s to %s: %s" - % (fname, new_fname, err)) - return False - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error assembling encrypted data from %s: %s" % - (fname, err)) - return False + def get_destination_filename(self, original_filename): + return original_filename + ".crypt" - def write_decrypted(self, fname, data=None): - """ write decrypted data to disk """ - if data is None: - data = self.decrypt(fname) - new_fname = self.get_plaintext_filename(fname) - try: - open(new_fname, "wb").write(data) - self.logger.info("Wrote decrypted data to %s" % new_fname) - return True - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error writing encrypted data from %s to %s: %s" - % (fname, new_fname, err)) - return False - def get_passphrase(self, chunk): - """ get the passphrase for a chunk of a file """ - pname = self._get_passphrase(chunk) - if not self.pname: - if not pname: - self.logger.info("No passphrase given on command line or " - "found in file") +class CfgDecryptor(Decryptor): + """ Decrypt Cfg files """ + + def decrypt(self): + """ decrypt the given file, returning the plaintext data """ + if self.passphrase: + try: + return Bcfg2.Encryption.ssl_decrypt( + self.data, self.passphrase, + Bcfg2.Encryption.get_algorithm(self.setup)) + except Bcfg2.Encryption.EVPError: + self.logger.info("Could not decrypt %s with the " + "specified passphrase" % self.filename) return False - elif self.setup.cfp.has_option(Bcfg2.Encryption.CFG_SECTION, - pname): - passphrase = self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION, - pname) - else: - self.logger.error("Could not find passphrase %s in %s" % - (pname, self.setup['configfile'])) + except: + err = sys.exc_info()[1] + self.logger.error("Error decrypting %s: %s" % + (self.filename, err)) return False + else: # no passphrase given, brute force + try: + return Bcfg2.Encryption.bruteforce_decrypt( + self.data, passphrases=self.passphrases.values(), + algorithm=Bcfg2.Encryption.get_algorithm(self.setup)) + except Bcfg2.Encryption.EVPError: + self.logger.info("Could not decrypt %s with any passphrase" % + self.filename) + + def get_destination_filename(self, original_filename): + if original_filename.endswith(".crypt"): + return original_filename[:-6] else: - pname = self.pname - passphrase = self.passphrase - if self.pname != pname: - self.logger.warning("Passphrase given on command line (%s) " - "differs from passphrase embedded in " - "file (%s), using command-line option" % - (self.pname, pname)) - return (passphrase, pname) - - def _get_passphrase(self, chunk): # pylint: disable=W0613 - """ get the passphrase for a chunk of a file """ - return None - - -class CfgEncryptor(Encryptor): - """ encryptor class for Cfg files """ - - def get_encrypted_filename(self, plaintext_filename): - return plaintext_filename + ".crypt" - - def get_plaintext_filename(self, encrypted_filename): - if encrypted_filename.endswith(".crypt"): - return encrypted_filename[:-6] - else: - return Encryptor.get_plaintext_filename(self, encrypted_filename) + return Decryptor.get_plaintext_filename(self, original_filename) -class PropertiesEncryptor(Encryptor): - """ encryptor class for Properties files """ - - def _encrypt(self, plaintext, passphrase, name=None): - # plaintext is an lxml.etree._Element - if name is None: - name = "true" - if plaintext.text and plaintext.text.strip(): - plaintext.text = Bcfg2.Encryption.ssl_encrypt( - plaintext.text, - passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)).strip() - plaintext.set("encrypted", name) - return plaintext +class PropertiesCryptoMixin(object): + """ Mixin to provide some common methods for Properties crypto """ + default_xpath = '//*' - def chunk(self, data): - xdata = lxml.etree.XML(data, parser=XMLParser) + def _get_elements(self, xdata): + """ Get the list of elements to encrypt or decrypt """ if self.setup['xpath']: elements = xdata.xpath(self.setup['xpath']) if not elements: - raise EncryptionChunkingError("XPath expression %s matched no " - "elements" % self.setup['xpath']) + self.logger.warning("XPath expression %s matched no " + "elements" % self.setup['xpath']) else: - elements = xdata.xpath('//*[@encrypted]') + elements = xdata.xpath(self.default_xpath) if not elements: elements = list(xdata.getiterator(tag=lxml.etree.Element)) @@ -329,50 +205,85 @@ class PropertiesEncryptor(Encryptor): ans = input("Encrypt this element? [y/N] ") if not ans.lower().startswith("y"): elements.remove(element) + return elements + + def _get_element_passphrase(self, element): + """ Get the passphrase to use to encrypt or decrypt a given + element """ + pname = element.get("encrypted") + if pname in self.passphrases: + passphrase = self.passphrases[pname] + elif self.passphrase: + if pname: + self.logger.warning("Passphrase %s not found in %s, " + "using passphrase given on command line" + % (pname, self.setup['configfile'])) + passphrase = self.passphrase + pname = self.pname + else: + raise PassphraseError("Multiple passphrases found in %s, " + "specify one on the command line with -p" % + self.setup['configfile']) + return (pname, passphrase) - # this is not a good use of a generator, but we need to - # generate the full list of elements in order to ensure that - # some exist before we know what to return - for elt in elements: - yield elt - - def unchunk(self, data, original): - # Properties elements are modified in-place, so we don't - # actually need to unchunk anything - xdata = data[0] - # find root element - while xdata.getparent() is not None: - xdata = xdata.getparent() - return lxml.etree.tostring(xdata, - xml_declaration=False, - pretty_print=True).decode('UTF-8') - - def _get_passphrase(self, chunk): - pname = chunk.get("encrypted") - if pname and pname.lower() != "true": - return pname - return None - - def _decrypt(self, crypted, passphrase): - # crypted is in lxml.etree._Element - if not crypted.text or not crypted.text.strip(): - self.logger.warning("Skipping empty element %s" % crypted.tag) - return crypted - decrypted = Bcfg2.Encryption.ssl_decrypt( - crypted.text, - passphrase, - Bcfg2.Encryption.get_algorithm(self.setup)).strip() - try: - crypted.text = decrypted.encode('ascii', 'xmlcharrefreplace') - except UnicodeDecodeError: - # we managed to decrypt the value, but it contains content - # that can't even be encoded into xml entities. what - # probably happened here is that we coincidentally could - # decrypt a value encrypted with a different key, and - # wound up with gibberish. - self.logger.warning("Decrypted %s to gibberish, skipping" % - crypted.tag) - return crypted + def _write(self, filename, data): + """ Write the data """ + data.getroottree().write(filename, + xml_declaration=False, + pretty_print=True) + + +class PropertiesEncryptor(Encryptor, PropertiesCryptoMixin): + """ encryptor class for Properties files """ + + def encrypt(self): + xdata = lxml.etree.XML(self.data, parser=XMLParser) + for elt in self._get_elements(xdata): + try: + pname, passphrase = self._get_element_passphrase(elt) + except PassphraseError: + self.logger.error(str(sys.exc_info()[1])) + return False + elt.text = Bcfg2.Encryption.ssl_encrypt( + elt.text, passphrase, + Bcfg2.Encryption.get_algorithm(self.setup)).strip() + elt.set("encrypted", pname) + return xdata + + def _write(self, filename, data): + PropertiesCryptoMixin._write(self, filename, data) + + +class PropertiesDecryptor(Decryptor, PropertiesCryptoMixin): + """ decryptor class for Properties files """ + default_xpath = '//*[@encrypted]' + + def decrypt(self): + xdata = lxml.etree.XML(self.data, parser=XMLParser) + for elt in self._get_elements(xdata): + try: + pname, passphrase = self._get_element_passphrase(elt) + except PassphraseError: + self.logger.error(str(sys.exc_info()[1])) + return False + decrypted = Bcfg2.Encryption.ssl_decrypt( + elt.text, passphrase, + Bcfg2.Encryption.get_algorithm(self.setup)).strip() + try: + elt.text = decrypted.encode('ascii', 'xmlcharrefreplace') + elt.set("encrypted", pname) + except UnicodeDecodeError: + # we managed to decrypt the value, but it contains + # content that can't even be encoded into xml + # entities. what probably happened here is that we + # coincidentally could decrypt a value encrypted with + # a different key, and wound up with gibberish. + self.logger.warning("Decrypted %s to gibberish, skipping" % + elt.tag) + return xdata + + def _write(self, filename, data): + PropertiesCryptoMixin._write(self, filename, data) def main(): # pylint: disable=R0912,R0915 @@ -422,9 +333,6 @@ def main(): # pylint: disable=R0912,R0915 logger.error("--remove cannot be used with --properties, ignoring") setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default - props_crypt = PropertiesEncryptor(setup) - cfg_crypt = CfgEncryptor(setup) - for fname in setup['args']: if not os.path.exists(fname): logger.error("%s does not exist, skipping" % fname) @@ -454,10 +362,10 @@ def main(): # pylint: disable=R0912,R0915 props = False if props: - encryptor = props_crypt if setup['remove']: logger.info("Cannot use --remove with Properties file %s, " "ignoring for this file" % fname) + tools = (PropertiesEncryptor, PropertiesDecryptor) else: if setup['xpath']: logger.info("Cannot use xpath with Cfg file %s, ignoring " @@ -465,31 +373,52 @@ def main(): # pylint: disable=R0912,R0915 if setup['interactive']: logger.info("Cannot use interactive mode with Cfg file %s, " "ignoring -I for this file" % fname) - encryptor = cfg_crypt + tools = (CfgEncryptor, CfgDecryptor) data = None + mode = None if setup['encrypt']: - xform = encryptor.encrypt - write = encryptor.write_encrypted + try: + tool = tools[0](fname, setup) + except PassphraseError: + logger.error(str(sys.exc_info()[1])) + return 2 + mode = "encrypt" elif setup['decrypt']: - xform = encryptor.decrypt - write = encryptor.write_decrypted + try: + tool = tools[1](fname, setup) + except PassphraseError: + logger.error(str(sys.exc_info()[1])) + return 2 + mode = "decrypt" else: logger.info("Neither --encrypt nor --decrypt specified, " "determining mode") - data = encryptor.decrypt(fname) - if data: - write = encryptor.write_decrypted - else: - logger.info("Failed to decrypt %s, trying encryption" % fname) + try: + tool = tools[1](fname, setup) + except PassphraseError: + logger.error(str(sys.exc_info()[1])) + return 2 + + try: + data = tool.decrypt() + mode = "decrypt" + except: # pylint: disable=W0702 + pass + if data is False: data = None - xform = encryptor.encrypt - write = encryptor.write_encrypted + logger.info("Failed to decrypt %s, trying encryption" % fname) + try: + tool = tools[0](fname, setup) + except PassphraseError: + logger.error(str(sys.exc_info()[1])) + return 2 + mode = "encrypt" if data is None: - data = xform(fname) - if not data: - logger.error("Failed to %s %s, skipping" % (xform.__name__, fname)) + data = getattr(tool, mode)() + if data is False: + logger.error("Failed to %s %s, skipping" % (mode, fname)) continue if setup['crypt_stdout']: if len(setup['args']) > 1: @@ -498,10 +427,10 @@ def main(): # pylint: disable=R0912,R0915 if len(setup['args']) > 1: print("") else: - write(fname, data=data) + tool.write(data) if (setup['remove'] and - encryptor.get_encrypted_filename(fname) != fname): + tool.get_destination_filename(fname) != fname): try: os.unlink(fname) except IOError: -- cgit v1.2.3-1-g7c22