summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/Encryption.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Server/Encryption.py')
-rwxr-xr-xsrc/lib/Bcfg2/Server/Encryption.py550
1 files changed, 228 insertions, 322 deletions
diff --git a/src/lib/Bcfg2/Server/Encryption.py b/src/lib/Bcfg2/Server/Encryption.py
index 5c200410e..7e1294587 100755
--- a/src/lib/Bcfg2/Server/Encryption.py
+++ b/src/lib/Bcfg2/Server/Encryption.py
@@ -195,278 +195,148 @@ def bruteforce_decrypt(crypted, passphrases=None, algorithm=None):
raise EVPError("Failed to decrypt")
-class EncryptionChunkingError(Exception):
- """ error raised when Encryptor cannot break a file up into chunks
- to be encrypted, or cannot reassemble the chunks """
- pass
+class PassphraseError(Exception):
+ """ Exception raised when there's a problem determining the
+ passphrase to encrypt or decrypt with """
-class Encryptor(object):
- """ Generic encryptor for all files """
+class CryptoTool(object):
+ """ Generic decryption/encryption interface base object """
- def __init__(self):
- self.passphrase = None
- self.pname = None
+ def __init__(self, filename):
self.logger = logging.getLogger(self.__class__.__name__)
+ self.filename = filename
+ self.data = open(self.filename).read()
+ self.pname, self.passphrase = self._get_passphrase()
- def get_encrypted_filename(self, plaintext_filename):
- """ get the name of the file encrypted data should be written to """
- return plaintext_filename
-
- def get_plaintext_filename(self, encrypted_filename):
- """ get the name of the file decrypted data should be written to """
- return encrypted_filename
-
- def chunk(self, data):
- """ generator to break the file up into smaller chunks that
- will each be individually encrypted or decrypted """
- yield data
-
- def unchunk(self, data, original): # pylint: disable=W0613
- """ given chunks of a file, reassemble then into the whole file """
- try:
- return data[0]
- except IndexError:
- raise EncryptionChunkingError("No data to unchunk")
-
- def set_passphrase(self):
- """ set the passphrase for the current file """
+ def _get_passphrase(self):
+ """ get the passphrase for the current file """
if not Bcfg2.Options.setup.passphrases:
- self.logger.error("No passphrases available in %s" %
- Bcfg2.Options.setup.config)
- return False
-
- if self.passphrase:
- self.logger.debug("Using previously determined passphrase %s" %
- self.pname)
- return True
+ raise PassphraseError("No passphrases available in %s" %
+ Bcfg2.Options.setup.configfile)
+ pname = None
if Bcfg2.Options.setup.passphrase:
- self.pname = Bcfg2.Options.setup.passphrase
+ pname = Bcfg2.Options.setup.passphrase
- if self.pname:
+ if pname:
try:
- self.passphrase = Bcfg2.Options.setup.passphrases[self.pname]
+ passphrase = Bcfg2.Options.setup.passphrases[pname]
self.logger.debug("Using passphrase %s specified on command "
- "line" % self.pname)
- return True
+ "line" % pname)
+ return (pname, passphrase)
except KeyError:
- self.logger.error("Could not find passphrase %s in %s" %
- (self.pname, Bcfg2.Options.setup.config))
- return False
+ raise PassphraseError("Could not find passphrase %s in %s" %
+ (pname, Bcfg2.Options.setup.configfile))
else:
- pnames = Bcfg2.Options.setup.passphrases
- if len(pnames) == 1:
- self.pname = pnames.keys()[0]
- self.passphrase = pnames[self.pname]
- self.logger.info("Using passphrase %s" % self.pname)
- return True
- elif len(pnames) > 1:
- self.logger.warning("Multiple passphrases found in %s, "
- "specify one on the command line with -p" %
- Bcfg2.Options.setup.config)
- self.logger.info("No passphrase could be determined")
- return False
-
- def encrypt(self, fname):
- """ encrypt the given file, returning the encrypted data """
+ if len(Bcfg2.Options.setup.passphrases) == 1:
+ pname, passphrase = Bcfg2.Options.setup.passphrases.items()[0]
+ self.logger.info("Using passphrase %s" % pname)
+ return (pname, passphrase)
+ elif len(Bcfg2.Options.setup.passphrases) > 1:
+ return (None, None)
+ raise PassphraseError("No passphrase could be determined")
+
+ def get_destination_filename(self, original_filename):
+ """ Get the filename where data should be written """
+ return original_filename
+
+ def write(self, data):
+ """ write data to disk """
+ new_fname = self.get_destination_filename(self.filename)
try:
- plaintext = open(fname).read()
+ self._write(new_fname, data)
+ self.logger.info("Wrote data to %s" % new_fname)
+ return True
except IOError:
err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
+ self.logger.error("Error writing data from %s to %s: %s" %
+ (self.filename, new_fname, err))
return False
- if not self.set_passphrase():
- return False
+ def _write(self, filename, data):
+ """ Perform the actual write of data. This is separate from
+ :func:`CryptoTool.write` so it can be easily
+ overridden. """
+ open(filename, "wb").write(data)
- crypted = []
- try:
- for chunk in self.chunk(plaintext):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- except TypeError:
- return False
- crypted.append(self._encrypt(chunk, passphrase, name=pname))
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting data to encrypt from %s: %s" %
- (fname, err))
- return False
- return self.unchunk(crypted, plaintext)
+class Decryptor(CryptoTool):
+ """ Decryptor interface """
+ def decrypt(self):
+ """ decrypt the file, returning the encrypted data """
+ raise NotImplementedError
- # pylint: disable=W0613
- def _encrypt(self, plaintext, passphrase, name=None):
- """ encrypt a single chunk of a file """
- return ssl_encrypt(plaintext, passphrase)
- # pylint: enable=W0613
- def decrypt(self, fname):
- """ decrypt the given file, returning the plaintext data """
- try:
- crypted = open(fname).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
- return False
+class Encryptor(CryptoTool):
+ """ encryptor interface """
+ def encrypt(self):
+ """ encrypt the file, returning the encrypted data """
+ raise NotImplementedError
- self.set_passphrase()
- plaintext = []
- try:
- for chunk in self.chunk(crypted):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- try:
- plaintext.append(self._decrypt(chunk, passphrase))
- except EVPError:
- self.logger.info("Could not decrypt %s with the "
- "specified passphrase" % fname)
- continue
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- continue
- except TypeError:
- pchunk = None
- for pname, passphrase in \
- Bcfg2.Options.setup.passphrases.items():
- self.logger.debug("Trying passphrase %s" % pname)
- try:
- pchunk = self._decrypt(chunk, passphrase)
- break
- except EVPError:
- pass
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- if pchunk is not None:
- plaintext.append(pchunk)
- else:
- self.logger.error("Could not decrypt %s with any "
- "passphrase in %s" %
- (fname, Bcfg2.Options.setup.config))
- continue
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting encrypted data from %s: %s" %
- (fname, err))
- return False
+class CfgEncryptor(Encryptor):
+ """ encryptor class for Cfg files """
- try:
- return self.unchunk(plaintext, crypted)
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling plaintext data from %s: %s" %
- (fname, err))
- return False
+ def __init__(self, filename):
+ Encryptor.__init__(self, filename)
+ if self.passphrase is None:
+ raise PassphraseError("Multiple passphrases found in %s, "
+ "specify one on the command line with -p" %
+ Bcfg2.Options.setup.configfile)
- def _decrypt(self, crypted, passphrase):
- """ decrypt a single chunk """
- return ssl_decrypt(crypted, passphrase)
+ def encrypt(self):
+ return ssl_encrypt(self.data, self.passphrase)
- def write_encrypted(self, fname, data=None):
- """ write encrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_encrypted_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote encrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling encrypted data from %s: %s" %
- (fname, err))
- return False
+ def get_destination_filename(self, original_filename):
+ return original_filename + ".crypt"
- def write_decrypted(self, fname, data=None):
- """ write decrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_plaintext_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote decrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
- def get_passphrase(self, chunk):
- """ get the passphrase for a chunk of a file """
- pname = self._get_passphrase(chunk)
- if not self.pname:
- if not pname:
- self.logger.info("No passphrase given on command line or "
- "found in file")
+class CfgDecryptor(Decryptor):
+ """ Decrypt Cfg files """
+
+ def decrypt(self):
+ """ decrypt the given file, returning the plaintext data """
+ if self.passphrase:
+ try:
+ return ssl_decrypt(self.data, self.passphrase)
+ except EVPError:
+ self.logger.info("Could not decrypt %s with the "
+ "specified passphrase" % self.filename)
return False
- elif pname in Bcfg2.Options.setup.passphrases:
- passphrase = Bcfg2.Options.setup.passphrases[pname]
- else:
- self.logger.error("Could not find passphrase %s in %s" %
- (pname, Bcfg2.Options.setup.config))
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("Error decrypting %s: %s" %
+ (self.filename, err))
+ return False
+ else: # no passphrase given, brute force
+ try:
+ return bruteforce_decrypt(self.data)
+ except EVPError:
+ self.logger.info("Could not decrypt %s with any passphrase" %
+ self.filename)
return False
- else:
- pname = self.pname
- passphrase = self.passphrase
- if self.pname != pname:
- self.logger.warning("Passphrase given on command line (%s) "
- "differs from passphrase embedded in "
- "file (%s), using command-line option" %
- (self.pname, pname))
- return (passphrase, pname)
-
- def _get_passphrase(self, chunk): # pylint: disable=W0613
- """ get the passphrase for a chunk of a file """
- return None
-
-
-class CfgEncryptor(Encryptor):
- """ encryptor class for Cfg files """
-
- def get_encrypted_filename(self, plaintext_filename):
- return plaintext_filename + ".crypt"
- def get_plaintext_filename(self, encrypted_filename):
- if encrypted_filename.endswith(".crypt"):
- return encrypted_filename[:-6]
+ def get_destination_filename(self, original_filename):
+ if original_filename.endswith(".crypt"):
+ return original_filename[:-6]
else:
- return Encryptor.get_plaintext_filename(self, encrypted_filename)
+ return Decryptor.get_destination_filename(self, original_filename)
-class PropertiesEncryptor(Encryptor):
- """ encryptor class for Properties files """
+class PropertiesCryptoMixin(object):
+ """ Mixin to provide some common methods for Properties crypto """
+ default_xpath = '//*'
- def _encrypt(self, plaintext, passphrase, name=None):
- # plaintext is an lxml.etree._Element
- if name is None:
- name = "true"
- if plaintext.text and plaintext.text.strip():
- plaintext.text = ssl_encrypt(plaintext.text, passphrase).strip()
- plaintext.set("encrypted", name)
- return plaintext
-
- def chunk(self, data):
- xdata = lxml.etree.XML(data, parser=XMLParser)
+ def _get_elements(self, xdata):
+ """ Get the list of elements to encrypt or decrypt """
if Bcfg2.Options.setup.xpath:
elements = xdata.xpath(Bcfg2.Options.setup.xpath)
if not elements:
- raise EncryptionChunkingError(
- "XPath expression %s matched no elements" %
- Bcfg2.Options.setup.xpath)
+ self.logger.warning("XPath expression %s matched no elements" %
+ Bcfg2.Options.setup.xpath)
else:
- elements = xdata.xpath('//*[@encrypted]')
+ elements = xdata.xpath(self.default_xpath)
if not elements:
elements = list(xdata.getiterator(tag=lxml.etree.Element))
@@ -489,47 +359,81 @@ class PropertiesEncryptor(Encryptor):
ans = safe_input("Encrypt this element? [y/N] ")
if not ans.lower().startswith("y"):
elements.remove(element)
+ return elements
+
+ def _get_element_passphrase(self, element):
+ """ Get the passphrase to use to encrypt or decrypt a given
+ element """
+ pname = element.get("encrypted")
+ if pname in self.passphrases:
+ passphrase = self.passphrases[pname]
+ elif self.passphrase:
+ if pname:
+ self.logger.warning("Passphrase %s not found in %s, "
+ "using passphrase given on command line" %
+ (pname, Bcfg2.Option.setup.configfile))
+ passphrase = self.passphrase
+ pname = self.pname
+ else:
+ raise PassphraseError("Multiple passphrases found in %s, "
+ "specify one on the command line with -p" %
+ Bcfg2.Options.setup.configfile)
+ return (pname, passphrase)
- # this is not a good use of a generator, but we need to
- # generate the full list of elements in order to ensure that
- # some exist before we know what to return
- for elt in elements:
- yield elt
-
- def unchunk(self, data, original):
- # Properties elements are modified in-place, so we don't
- # actually need to unchunk anything
- xdata = Encryptor.unchunk(self, data, original)
- # find root element
- while xdata.getparent() is not None:
- xdata = xdata.getparent()
- return lxml.etree.tostring(xdata,
- xml_declaration=False,
- pretty_print=True).decode('UTF-8')
-
- def _get_passphrase(self, chunk):
- pname = chunk.get("encrypted")
- if pname and pname.lower() != "true":
- return pname
- return None
-
- def _decrypt(self, crypted, passphrase):
- # crypted is in lxml.etree._Element
- if not crypted.text or not crypted.text.strip():
- self.logger.warning("Skipping empty element %s" % crypted.tag)
- return crypted
- decrypted = ssl_decrypt(crypted.text, passphrase).strip()
- try:
- crypted.text = decrypted.encode('ascii', 'xmlcharrefreplace')
- except UnicodeDecodeError:
- # we managed to decrypt the value, but it contains content
- # that can't even be encoded into xml entities. what
- # probably happened here is that we coincidentally could
- # decrypt a value encrypted with a different key, and
- # wound up with gibberish.
- self.logger.warning("Decrypted %s to gibberish, skipping" %
- crypted.tag)
- return crypted
+ def _write(self, filename, data):
+ """ Write the data """
+ data.getroottree().write(filename,
+ xml_declaration=False,
+ pretty_print=True)
+
+
+class PropertiesEncryptor(Encryptor, PropertiesCryptoMixin):
+ """ encryptor class for Properties files """
+
+ def encrypt(self):
+ xdata = lxml.etree.XML(self.data, parser=XMLParser)
+ for elt in self._get_elements(xdata):
+ try:
+ pname, passphrase = self._get_element_passphrase(elt)
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ return False
+ elt.text = ssl_encrypt(elt.text, passphrase).strip()
+ elt.set("encrypted", pname)
+ return xdata
+
+ def _write(self, filename, data):
+ PropertiesCryptoMixin._write(self, filename, data)
+
+
+class PropertiesDecryptor(Decryptor, PropertiesCryptoMixin):
+ """ decryptor class for Properties files """
+ default_xpath = '//*[@encrypted]'
+
+ def decrypt(self):
+ xdata = lxml.etree.XML(self.data, parser=XMLParser)
+ for elt in self._get_elements(xdata):
+ try:
+ pname, passphrase = self._get_element_passphrase(elt)
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ return False
+ decrypted = ssl_decrypt(elt.text, passphrase).strip()
+ try:
+ elt.text = decrypted.encode('ascii', 'xmlcharrefreplace')
+ elt.set("encrypted", pname)
+ except UnicodeDecodeError:
+ # we managed to decrypt the value, but it contains
+ # content that can't even be encoded into xml
+ # entities. what probably happened here is that we
+ # coincidentally could decrypt a value encrypted with
+ # a different key, and wound up with gibberish.
+ self.logger.warning("Decrypted %s to gibberish, skipping" %
+ elt.tag)
+ return xdata
+
+ def _write(self, filename, data):
+ PropertiesCryptoMixin._write(self, filename, data)
class CLI(object):
@@ -569,7 +473,7 @@ class CLI(object):
def __init__(self):
parser = Bcfg2.Options.get_parser(
description="Encrypt and decrypt Bcfg2 data",
- components=[self, OptionContainer])
+ components=[self, _OptionContainer])
parser.parse()
self.logger = logging.getLogger(parser.prog)
@@ -582,33 +486,15 @@ class CLI(object):
self.logger.error("Cannot decrypt interactively")
Bcfg2.Options.setup.interactive = False
- if Bcfg2.Options.setup.cfg:
- if Bcfg2.Options.setup.xpath:
- self.logger.error("Specifying --xpath with --cfg is "
- "nonsensical, ignoring --xpath")
- Bcfg2.Options.setup.xpath = None
- if Bcfg2.Options.setup.interactive:
- self.logger.error("Cannot use interactive mode with --cfg, "
- "ignoring --interactive")
- Bcfg2.Options.setup.interactive = False
- elif Bcfg2.Options.setup.properties:
- if Bcfg2.Options.setup.remove:
- self.logger.error("--remove cannot be used with --properties, "
- "ignoring --remove")
- Bcfg2.Options.setup.remove = False
-
- self.props_crypt = PropertiesEncryptor()
- self.cfg_crypt = CfgEncryptor()
-
def _is_properties(self, filename):
""" Determine if a given file is a Properties file or not """
if Bcfg2.Options.setup.properties:
return True
elif Bcfg2.Options.setup.cfg:
return False
- elif fname.endswith(".xml"):
+ elif filename.endswith(".xml"):
try:
- xroot = lxml.etree.parse(fname).getroot()
+ xroot = lxml.etree.parse(filename).getroot()
return xroot.tag == "Properties"
except lxml.etree.XMLSyntaxError:
return False
@@ -632,46 +518,66 @@ class CLI(object):
continue
if props:
- encryptor = self.props_crypt
if Bcfg2.Options.setup.remove:
- self.logger.warning("Cannot use --remove with Properties "
- "file %s, ignoring for this file" %
- fname)
+ self.logger.info("Cannot use --remove with Properties "
+ "file %s, ignoring for this file" % fname)
+ try:
+ tools = (PropertiesEncryptor(fname),
+ PropertiesDecryptor(fname))
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ continue
+ except IOError:
+ self.logger.error("Error reading %s, skipping: %s" %
+ (fname, err))
+ continue
else:
if Bcfg2.Options.setup.xpath:
- self.logger.warning("Cannot use xpath with Cfg file %s, "
- "ignoring xpath for this file" % fname)
+ self.logger.error("Specifying --xpath with --cfg is "
+ "nonsensical, ignoring --xpath")
+ Bcfg2.Options.setup.xpath = None
if Bcfg2.Options.setup.interactive:
- self.logger.warning("Cannot use interactive mode with Cfg "
- "file %s, ignoring --interactive for "
- "this file" % fname)
- encryptor = self.cfg_crypt
+ self.logger.error("Cannot use interactive mode with "
+ "--cfg, ignoring --interactive")
+ Bcfg2.Options.setup.interactive = False
+ try:
+ tools = (CfgEncryptor(fname), CfgDecryptor(fname))
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ continue
+ except IOError:
+ self.logger.error("Error reading %s, skipping: %s" %
+ (fname, err))
+ continue
data = None
+ mode = None
if Bcfg2.Options.setup.encrypt:
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
+ tool = tools[0]
+ mode = "encrypt"
elif Bcfg2.Options.setup.decrypt:
- xform = encryptor.decrypt
- write = encryptor.write_decrypted
+ tool = tools[1]
+ mode = "decrypt"
else:
- self.logger.warning("Neither --encrypt nor --decrypt "
- "specified, determining mode")
- data = encryptor.decrypt(fname)
- if data:
- write = encryptor.write_decrypted
- else:
- self.logger.warning("Failed to decrypt %s, trying "
- "encryption" % fname)
+ self.logger.info("Neither --encrypt nor --decrypt specified, "
+ "determining mode")
+ tool = tools[1]
+ try:
+ data = tool.decrypt()
+ mode = "decrypt"
+ except: # pylint: disable=W0702
+ pass
+ if data is False:
data = None
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
+ self.logger.info("Failed to decrypt %s, trying encryption"
+ % fname)
+ tool = tools[0]
+ mode = "encrypt"
if data is None:
- data = xform(fname)
+ data = getattr(tool, mode)()
if not data:
- self.logger.error("Failed to %s %s, skipping" %
- (xform.__name__, fname))
+ self.logger.error("Failed to %s %s, skipping" % (mode, fname))
continue
if Bcfg2.Options.setup.stdout:
if len(Bcfg2.Options.setup.files) > 1:
@@ -680,10 +586,10 @@ class CLI(object):
if len(Bcfg2.Options.setup.files) > 1:
print("")
else:
- write(fname, data=data)
+ tool.write(data)
if (Bcfg2.Options.setup.remove and
- encryptor.get_encrypted_filename(fname) != fname):
+ tool.get_destination_filename(fname) != fname):
try:
os.unlink(fname)
except IOError: