#!/usr/bin/env python """ helper for encrypting/decrypting Cfg and Properties files """ import os import sys import logging import lxml.etree import Bcfg2.Logger import Bcfg2.Options try: import Bcfg2.Encryption except ImportError: err = sys.exc_info()[1] print("Import failed '%s'. Is M2Crypto installed?" % err) raise SystemExit(1) LOGGER = None def get_logger(verbose=0): """ set up logging according to the verbose level given on the command line """ global LOGGER if LOGGER is None: LOGGER = logging.getLogger(sys.argv[0]) stderr = logging.StreamHandler() if verbose: level = logging.DEBUG else: level = logging.WARNING LOGGER.setLevel(level) LOGGER.addHandler(stderr) syslog = logging.handlers.SysLogHandler("/dev/log") syslog.setFormatter(logging.Formatter("%(name)s: %(message)s")) LOGGER.addHandler(syslog) return LOGGER class Encryptor(object): def __init__(self, setup): self.setup = setup self.logger = get_logger() self.passphrase = None self.pname = None def get_encrypted_filename(self, plaintext_filename): return plaintext_filename def get_plaintext_filename(self, encrypted_filename): return encrypted_filename def chunk(self, data): yield data def unchunk(self, data, original): return data[0] def set_passphrase(self): if (not self.setup.cfp.has_section("encryption") or self.setup.cfp.options("encryption") == 0): self.logger.error("No passphrases available in %s" % self.setup['configfile']) return False if self.passphrase: self.logger.debug("Using previously determined passphrase %s" % self.pname) return True if self.setup['passphrase']: self.pname = self.setup['passphrase'] if self.pname: if self.setup.cfp.has_option("encryption", self.pname): self.passphrase = self.setup.cfp.get("encryption", self.pname) self.logger.debug("Using passphrase %s specified on command " "line" % self.pname) return True else: self.logger.error("Could not find passphrase %s in %s" % (self.pname, self.setup['configfile'])) return False else: pnames = self.setup.cfp.options("encryption") if len(pnames) == 1: self.passphrase = self.setup.cfp.get(pnames[0]) self.pname = pnames[0] self.logger.info("Using passphrase %s" % pnames[0]) return True self.logger.info("No passphrase could be determined") return False def encrypt(self, fname): try: plaintext = open(fname).read() except IOError: err = sys.exc_info()[1] self.logger.error("Error reading %s, skipping: %s" % (fname, err)) return False self.set_passphrase() crypted = [] for chunk in self.chunk(plaintext): try: passphrase, pname = self.get_passphrase(chunk) except TypeError: return False crypted.append(self._encrypt(chunk, passphrase, name=pname)) new_fname = self.get_encrypted_filename(fname) try: open(new_fname, "wb").write(self.unchunk(crypted, plaintext)) self.logger.info("Wrote encrypted data to %s" % new_fname) return True except IOError: err = sys.exc_info()[1] self.logger.error("Error writing encrypted data from %s to %s: %s" % (fname, new_fname, err)) return False def _encrypt(self, plaintext, passphrase, name=None): return Bcfg2.Encryption.ssl_encrypt(plaintext, passphrase) def decrypt(self, fname): try: crypted = open(fname).read() except IOError: err = sys.exc_info()[1] self.logger.error("Error reading %s, skipping: %s" % (fname, err)) return False self.set_passphrase() plaintext = [] for chunk in self.chunk(crypted): try: passphrase, pname = self.get_passphrase(chunk) try: plaintext.append(self._decrypt(chunk, passphrase)) except Bcfg2.Encryption.EVPError: self.logger.info("Could not decrypt %s with the specified " "passphrase" % fname) return False except: err = sys.exc_info()[1] self.logger.error("Error decrypting %s: %s" % (fname, err)) return False except TypeError: pchunk = None for pname in self.setup.cfp.options('encryption'): self.logger.debug("Trying passphrase %s" % pname) passphrase = self.setup.cfp.get('encryption', pname) try: pchunk = self._decrypt(chunk, passphrase) break except Bcfg2.Encryption.EVPError: pass except: err = sys.exc_info()[1] self.logger.error("Error decrypting %s: %s" % (fname, err)) if pchunk is not None: plaintext.append(pchunk) else: self.logger.error("Could not decrypt %s with any " "passphrase in %s" % (fname, self.setup['configfile'])) return False new_fname = self.get_plaintext_filename(fname) try: open(new_fname, "wb").write(self.unchunk(plaintext, crypted)) self.logger.info("Wrote decrypted data to %s" % new_fname) return True except IOError: err = sys.exc_info()[1] self.logger.error("Error writing encrypted data from %s to %s: %s" % (fname, new_fname, err)) return False def get_passphrase(self, chunk): pname = self._get_passphrase(chunk) if not self.pname: if not pname: self.logger.info("No passphrase given on command line or " "found in file") return False elif self.setup.cfp.has_option("encryption", pname): passphrase = self.setup.cfp.get("encryption", pname) else: self.logger.error("Could not find passphrase %s in %s" % (pname, self.setup['configfile'])) return False else: pname = self.pname passphrase = self.passphrase if self.pname != pname: self.logger.warning("Passphrase given on command line (%s) " "differs from passphrase embedded in " "file (%s), using command-line option" % (self.pname, pname)) return (passphrase, pname) def _get_passphrase(self, chunk): return None def _decrypt(self, crypted, passphrase): return Bcfg2.Encryption.ssl_decrypt(crypted, passphrase) class CfgEncryptor(Encryptor): def get_encrypted_filename(self, plaintext_filename): return plaintext_filename + ".crypt" def get_plaintext_filename(self, encrypted_filename): if encrypted_filename.endswith(".crypt"): return encrypted_filename[:-6] else: return Encryptor.get_plaintext_filename(self, encrypted_filename) class PropertiesEncryptor(Encryptor): def _encrypt(self, plaintext, passphrase, name=None): # plaintext is an lxml.etree._Element if name is None: name = "true" if plaintext.text and plaintext.text.strip(): plaintext.text = Bcfg2.Encryption.ssl_encrypt(plaintext.text, passphrase) plaintext.set("encrypted", name) return plaintext def chunk(self, data): xdata = lxml.etree.XML(data) if self.setup['xpath']: elements = xdata.xpath(self.setup['xpath']) else: elements = xdata.xpath('//*[@encrypted]') if not elements: elements = list(xdata.getiterator()) # this is not a good use of a generator, but we need to # generate the full list of elements in order to ensure that # some exist before we know what to return for elt in elements: yield elt def unchunk(self, data, original): # Properties elements are modified in-place, so we don't # actually need to unchunk anything xdata = data[0] # find root element while xdata.getparent() != None: xdata = xdata.getparent() xdata.set("encryption", "true") return lxml.etree.tostring(xdata) def _get_passphrase(self, chunk): pname = chunk.get("encrypted") or chunk.get("encryption") if pname and pname.lower() != "true": return pname return None def _decrypt(self, crypted, passphrase): # crypted is in lxml.etree._Element if not crypted.text or not crypted.text.strip(): self.logger.warning("Skipping empty element %s" % crypted.tag) return crypted rv = Bcfg2.Encryption.ssl_decrypt(crypted.text, passphrase) crypted.text = rv return crypted def main(): optinfo = dict() optinfo.update(Bcfg2.Options.CRYPT_OPTIONS) optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS) setup = Bcfg2.Options.OptionParser(optinfo) setup.hm = " bcfg2-crypt [options] \nOptions:\n%s" % \ setup.buildHelpMessage() setup.parse(sys.argv[1:]) if not setup['args']: print(setup.hm) raise SystemExit(1) elif setup['encrypt'] and setup['decrypt']: print("You cannot specify both --encrypt) and --decrypt") raise SystemExit(1) elif setup['cfg'] and setup['properties']: print("You cannot specify both --cfg and --properties") raise SystemExit(1) elif setup['cfg'] and setup['properties']: print("Specifying --xpath with --cfg is nonsensical, ignoring --xpath") setup['xpath'] = Bcfg2.Options.CRYPT_XPATH.default elif setup['decrypt'] and setup['remove']: print("--remove cannot be used with --decrypt, ignoring") setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default logger = get_logger(setup['verbose']) props_crypt = PropertiesEncryptor(setup) cfg_crypt = CfgEncryptor(setup) for fname in setup['args']: if not os.path.exists(fname): logger.error("%s does not exist, skipping" % fname) continue # figure out if we need to encrypt this as a Properties file # or as a Cfg file props = False if setup['properties']: props = True elif setup['cfg']: props = False elif fname.endswith(".xml"): try: xroot = lxml.etree.parse(fname).getroot() if xroot.tag == "Properties": props = True else: props = False except IOError: err = sys.exc_info()[1] logger.error("Error reading %s, skipping: %s" % (fname, err)) continue except lxml.etree.XMLSyntaxError: props = False else: props = False if props: encryptor = props_crypt else: encryptor = cfg_crypt if setup['encrypt']: if not encryptor.encrypt(fname): print("Failed to encrypt %s, skipping" % fname) elif setup['decrypt']: if not encryptor.decrypt(fname): print("Failed to decrypt %s, skipping" % fname) else: logger.info("Neither --encrypt nor --decrypt specified, " "determining mode") if not encryptor.decrypt(fname): logger.info("Failed to decrypt %s, trying encryption" % fname) if not encryptor.encrypt(fname): print("Failed to encrypt %s, skipping" % fname) if setup['remove'] and encryptor.get_encrypted_filename(fname) != fname: try: os.unlink(fname) except IOError: err = sys.exc_info()[1] logger.error("Error removing %s: %s" % (fname, err)) continue if __name__ == '__main__': sys.exit(main())