From 919059f0971c0f8bf18ca2cedb3c7e5319632726 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 27 Jun 2013 10:31:07 -0400 Subject: Options: migrated bcfg2-crypt and Encryption libs to new parser --- src/sbin/bcfg2-crypt | 503 +-------------------------------------------------- 1 file changed, 2 insertions(+), 501 deletions(-) (limited to 'src/sbin') diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt index a75c0da9d..26d5eedf1 100755 --- a/src/sbin/bcfg2-crypt +++ b/src/sbin/bcfg2-crypt @@ -1,507 +1,8 @@ #!/usr/bin/env python """ helper for encrypting/decrypting Cfg and Properties files """ -import os import sys -import copy -import select -import logging -import lxml.etree -import Bcfg2.Logger -import Bcfg2.Options -from Bcfg2.Server import XMLParser -from Bcfg2.Compat import input # pylint: disable=W0622 -try: - import Bcfg2.Server.Encryption -except ImportError: - print("Could not import %s. Is M2Crypto installed?" % sys.exc_info()[1]) - raise SystemExit(1) - - -class EncryptionChunkingError(Exception): - """ error raised when Encryptor cannot break a file up into chunks - to be encrypted, or cannot reassemble the chunks """ - pass - - -class Encryptor(object): - """ Generic encryptor for all files """ - - def __init__(self): - self.setup = Bcfg2.Options.get_option_parser() - self.passphrase = None - self.pname = None - self.logger = logging.getLogger(self.__class__.__name__) - - def get_encrypted_filename(self, plaintext_filename): - """ get the name of the file encrypted data should be written to """ - return plaintext_filename - - def get_plaintext_filename(self, encrypted_filename): - """ get the name of the file decrypted data should be written to """ - return encrypted_filename - - def chunk(self, data): - """ generator to break the file up into smaller chunks that - will each be individually encrypted or decrypted """ - yield data - - def unchunk(self, data, original): # pylint: disable=W0613 - """ given chunks of a file, reassemble then into the whole file """ - try: - return data[0] - except IndexError: - raise EncryptionChunkingError("No data to unchunk") - - def set_passphrase(self): - """ set the passphrase for the current file """ - if (not self.setup.cfp.has_section(Bcfg2.Server.Encryption.CFG_SECTION) - or len(Bcfg2.Server.Encryption.get_passphrases()) == 0): - self.logger.error("No passphrases available in %s" % - self.setup['configfile']) - return False - - if self.passphrase: - self.logger.debug("Using previously determined passphrase %s" % - self.pname) - return True - - if self.setup['passphrase']: - self.pname = self.setup['passphrase'] - - if self.pname: - if self.setup.cfp.has_option(Bcfg2.Server.Encryption.CFG_SECTION, - self.pname): - self.passphrase = \ - self.setup.cfp.get(Bcfg2.Server.Encryption.CFG_SECTION, - self.pname) - self.logger.debug("Using passphrase %s specified on command " - "line" % self.pname) - return True - else: - self.logger.error("Could not find passphrase %s in %s" % - (self.pname, self.setup['configfile'])) - return False - else: - pnames = Bcfg2.Server.Encryption.get_passphrases() - if len(pnames) == 1: - self.pname = pnames.keys()[0] - self.passphrase = pnames[self.pname] - self.logger.info("Using passphrase %s" % self.pname) - return True - elif len(pnames) > 1: - self.logger.warning("Multiple passphrases found in %s, " - "specify one on the command line with -p" % - self.setup['configfile']) - self.logger.info("No passphrase could be determined") - return False - - def encrypt(self, fname): - """ encrypt the given file, returning the encrypted data """ - try: - plaintext = open(fname).read() - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error reading %s, skipping: %s" % (fname, err)) - return False - - if not self.set_passphrase(): - return False - - crypted = [] - try: - for chunk in self.chunk(plaintext): - try: - passphrase, pname = self.get_passphrase(chunk) - except TypeError: - return False - - crypted.append(self._encrypt(chunk, passphrase, name=pname)) - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error getting data to encrypt from %s: %s" % - (fname, err)) - return False - return self.unchunk(crypted, plaintext) - - # pylint: disable=W0613 - def _encrypt(self, plaintext, passphrase, name=None): - """ encrypt a single chunk of a file """ - return Bcfg2.Server.Encryption.ssl_encrypt(plaintext, passphrase) - # pylint: enable=W0613 - - def decrypt(self, fname): - """ decrypt the given file, returning the plaintext data """ - try: - crypted = open(fname).read() - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error reading %s, skipping: %s" % (fname, err)) - return False - - self.set_passphrase() - - plaintext = [] - try: - for chunk in self.chunk(crypted): - try: - passphrase, pname = self.get_passphrase(chunk) - try: - plaintext.append(self._decrypt(chunk, passphrase)) - except Bcfg2.Server.Encryption.EVPError: - self.logger.info("Could not decrypt %s with the " - "specified passphrase" % fname) - continue - except: - err = sys.exc_info()[1] - self.logger.error("Error decrypting %s: %s" % - (fname, err)) - continue - except TypeError: - pchunk = None - for pname, passphrase in \ - Bcfg2.Server.Encryption.get_passphrases().items(): - self.logger.debug("Trying passphrase %s" % pname) - try: - pchunk = self._decrypt(chunk, passphrase) - break - except Bcfg2.Server.Encryption.EVPError: - pass - except: - err = sys.exc_info()[1] - self.logger.error("Error decrypting %s: %s" % - (fname, err)) - if pchunk is not None: - plaintext.append(pchunk) - else: - self.logger.error("Could not decrypt %s with any " - "passphrase in %s" % - (fname, self.setup['configfile'])) - continue - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error getting encrypted data from %s: %s" % - (fname, err)) - return False - - try: - return self.unchunk(plaintext, crypted) - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error assembling plaintext data from %s: %s" % - (fname, err)) - return False - - def _decrypt(self, crypted, passphrase): - """ decrypt a single chunk """ - return Bcfg2.Server.Encryption.ssl_decrypt(crypted, passphrase) - - def write_encrypted(self, fname, data=None): - """ write encrypted data to disk """ - if data is None: - data = self.decrypt(fname) - new_fname = self.get_encrypted_filename(fname) - try: - open(new_fname, "wb").write(data) - self.logger.info("Wrote encrypted data to %s" % new_fname) - return True - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error writing encrypted data from %s to %s: %s" - % (fname, new_fname, err)) - return False - except EncryptionChunkingError: - err = sys.exc_info()[1] - self.logger.error("Error assembling encrypted data from %s: %s" % - (fname, err)) - return False - - def write_decrypted(self, fname, data=None): - """ write decrypted data to disk """ - if data is None: - data = self.decrypt(fname) - new_fname = self.get_plaintext_filename(fname) - try: - open(new_fname, "wb").write(data) - self.logger.info("Wrote decrypted data to %s" % new_fname) - return True - except IOError: - err = sys.exc_info()[1] - self.logger.error("Error writing encrypted data from %s to %s: %s" - % (fname, new_fname, err)) - return False - - def get_passphrase(self, chunk): - """ get the passphrase for a chunk of a file """ - pname = self._get_passphrase(chunk) - if not self.pname: - if not pname: - self.logger.info("No passphrase given on command line or " - "found in file") - return False - elif self.setup.cfp.has_option(Bcfg2.Server.Encryption.CFG_SECTION, - pname): - passphrase = self.setup.cfp.get( - Bcfg2.Server.Encryption.CFG_SECTION, - pname) - else: - self.logger.error("Could not find passphrase %s in %s" % - (pname, self.setup['configfile'])) - return False - else: - pname = self.pname - passphrase = self.passphrase - if self.pname != pname: - self.logger.warning("Passphrase given on command line (%s) " - "differs from passphrase embedded in " - "file (%s), using command-line option" % - (self.pname, pname)) - return (passphrase, pname) - - def _get_passphrase(self, chunk): # pylint: disable=W0613 - """ get the passphrase for a chunk of a file """ - return None - - -class CfgEncryptor(Encryptor): - """ encryptor class for Cfg files """ - - def get_encrypted_filename(self, plaintext_filename): - return plaintext_filename + ".crypt" - - def get_plaintext_filename(self, encrypted_filename): - if encrypted_filename.endswith(".crypt"): - return encrypted_filename[:-6] - else: - return Encryptor.get_plaintext_filename(self, encrypted_filename) - - -class PropertiesEncryptor(Encryptor): - """ encryptor class for Properties files """ - - def _encrypt(self, plaintext, passphrase, name=None): - # plaintext is an lxml.etree._Element - if name is None: - name = "true" - if plaintext.text and plaintext.text.strip(): - plaintext.text = \ - Bcfg2.Server.Encryption.ssl_encrypt(plaintext.text, - passphrase).strip() - plaintext.set("encrypted", name) - return plaintext - - def chunk(self, data): - xdata = lxml.etree.XML(data, parser=XMLParser) - if self.setup['xpath']: - elements = xdata.xpath(self.setup['xpath']) - if not elements: - raise EncryptionChunkingError("XPath expression %s matched no " - "elements" % self.setup['xpath']) - else: - elements = xdata.xpath('//*[@encrypted]') - if not elements: - elements = list(xdata.getiterator(tag=lxml.etree.Element)) - - # filter out elements without text data - for el in elements[:]: - if not el.text: - elements.remove(el) - - if self.setup['interactive']: - for element in elements[:]: - if len(element): - elt = copy.copy(element) - for child in elt.iterchildren(): - elt.remove(child) - else: - elt = element - print(lxml.etree.tostring( - elt, - xml_declaration=False).decode("UTF-8").strip()) - # flush input buffer - while len(select.select([sys.stdin.fileno()], [], [], - 0.0)[0]) > 0: - os.read(sys.stdin.fileno(), 4096) - ans = input("Encrypt this element? [y/N] ") - if not ans.lower().startswith("y"): - elements.remove(element) - - # this is not a good use of a generator, but we need to - # generate the full list of elements in order to ensure that - # some exist before we know what to return - for elt in elements: - yield elt - - def unchunk(self, data, original): - # Properties elements are modified in-place, so we don't - # actually need to unchunk anything - xdata = data[0] - # find root element - while xdata.getparent() is not None: - xdata = xdata.getparent() - return lxml.etree.tostring(xdata, - xml_declaration=False, - pretty_print=True).decode('UTF-8') - - def _get_passphrase(self, chunk): - pname = chunk.get("encrypted") - if pname and pname.lower() != "true": - return pname - return None - - def _decrypt(self, crypted, passphrase): - # crypted is in lxml.etree._Element - if not crypted.text or not crypted.text.strip(): - self.logger.warning("Skipping empty element %s" % crypted.tag) - return crypted - decrypted = Bcfg2.Server.Encryption.ssl_decrypt(crypted.text, - passphrase).strip() - try: - crypted.text = decrypted.encode('ascii', 'xmlcharrefreplace') - except UnicodeDecodeError: - # we managed to decrypt the value, but it contains content - # that can't even be encoded into xml entities. what - # probably happened here is that we coincidentally could - # decrypt a value encrypted with a different key, and - # wound up with gibberish. - self.logger.warning("Decrypted %s to gibberish, skipping" % - crypted.tag) - return crypted - - -def main(): # pylint: disable=R0912,R0915 - optinfo = dict(interactive=Bcfg2.Options.INTERACTIVE) - optinfo.update(Bcfg2.Options.CRYPT_OPTIONS) - optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS) - setup = Bcfg2.Options.load_option_parser(optinfo) - setup.hm = " bcfg2-crypt [options] \nOptions:\n%s" % \ - setup.buildHelpMessage() - setup.parse() - - if not setup['args']: - print(setup.hm) - raise SystemExit(1) - - log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING) - if setup['verbose']: - log_args['to_console'] = logging.DEBUG - Bcfg2.Logger.setup_logging('bcfg2-crypt', **log_args) - logger = logging.getLogger('bcfg2-crypt') - - if setup['decrypt']: - if setup['encrypt']: - logger.error("You cannot specify both --encrypt and --decrypt") - raise SystemExit(1) - elif setup['remove']: - logger.error("--remove cannot be used with --decrypt, ignoring") - setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default - elif setup['interactive']: - logger.error("Cannot decrypt interactively") - setup['interactive'] = False - - if setup['cfg']: - if setup['properties']: - logger.error("You cannot specify both --cfg and --properties") - raise SystemExit(1) - if setup['xpath']: - logger.error("Specifying --xpath with --cfg is nonsensical, " - "ignoring --xpath") - setup['xpath'] = Bcfg2.Options.CRYPT_XPATH.default - if setup['interactive']: - logger.error("You cannot use interactive mode with --cfg, " - "ignoring -I") - setup['interactive'] = False - elif setup['properties']: - if setup['remove']: - logger.error("--remove cannot be used with --properties, ignoring") - setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default - - props_crypt = PropertiesEncryptor(setup) - cfg_crypt = CfgEncryptor(setup) - - for fname in setup['args']: - if not os.path.exists(fname): - logger.error("%s does not exist, skipping" % fname) - continue - - # figure out if we need to encrypt this as a Properties file - # or as a Cfg file - props = False - if setup['properties']: - props = True - elif setup['cfg']: - props = False - elif fname.endswith(".xml"): - try: - xroot = lxml.etree.parse(fname).getroot() - if xroot.tag == "Properties": - props = True - else: - props = False - except IOError: - err = sys.exc_info()[1] - logger.error("Error reading %s, skipping: %s" % (fname, err)) - continue - except lxml.etree.XMLSyntaxError: - props = False - else: - props = False - - if props: - encryptor = props_crypt - if setup['remove']: - logger.info("Cannot use --remove with Properties file %s, " - "ignoring for this file" % fname) - else: - if setup['xpath']: - logger.info("Cannot use xpath with Cfg file %s, ignoring " - "xpath for this file" % fname) - if setup['interactive']: - logger.info("Cannot use interactive mode with Cfg file %s, " - "ignoring -I for this file" % fname) - encryptor = cfg_crypt - - data = None - if setup['encrypt']: - xform = encryptor.encrypt - write = encryptor.write_encrypted - elif setup['decrypt']: - xform = encryptor.decrypt - write = encryptor.write_decrypted - else: - logger.info("Neither --encrypt nor --decrypt specified, " - "determining mode") - data = encryptor.decrypt(fname) - if data: - write = encryptor.write_decrypted - else: - logger.info("Failed to decrypt %s, trying encryption" % fname) - data = None - xform = encryptor.encrypt - write = encryptor.write_encrypted - - if data is None: - data = xform(fname) - if not data: - logger.error("Failed to %s %s, skipping" % (xform.__name__, fname)) - continue - if setup['crypt_stdout']: - if len(setup['args']) > 1: - print("----- %s -----" % fname) - print(data) - if len(setup['args']) > 1: - print("") - else: - write(fname, data=data) - - if (setup['remove'] and - encryptor.get_encrypted_filename(fname) != fname): - try: - os.unlink(fname) - except IOError: - err = sys.exc_info()[1] - logger.error("Error removing %s: %s" % (fname, err)) - continue +from Bcfg2.Server.Encryption import CLI if __name__ == '__main__': - sys.exit(main()) + sys.exit(CLI().run()) -- cgit v1.2.3-1-g7c22