summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-06-27 10:31:07 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-06-27 10:41:53 -0400
commit919059f0971c0f8bf18ca2cedb3c7e5319632726 (patch)
treee213c638a9a88e6bbdd15c97668865edf14e0c5f /src
parentc8641d05512d03e7335af0c8ca84cec142616f25 (diff)
downloadbcfg2-919059f0971c0f8bf18ca2cedb3c7e5319632726.tar.gz
bcfg2-919059f0971c0f8bf18ca2cedb3c7e5319632726.tar.bz2
bcfg2-919059f0971c0f8bf18ca2cedb3c7e5319632726.zip
Options: migrated bcfg2-crypt and Encryption libs to new parser
Diffstat (limited to 'src')
-rwxr-xr-xsrc/lib/Bcfg2/Server/Encryption.py571
-rwxr-xr-xsrc/sbin/bcfg2-crypt503
2 files changed, 532 insertions, 542 deletions
diff --git a/src/lib/Bcfg2/Server/Encryption.py b/src/lib/Bcfg2/Server/Encryption.py
index 797b44ab9..5c200410e 100755
--- a/src/lib/Bcfg2/Server/Encryption.py
+++ b/src/lib/Bcfg2/Server/Encryption.py
@@ -3,10 +3,17 @@ for handling encryption in Bcfg2. See :ref:`server-encryption` for
more details. """
import os
+import sys
+import copy
+import logging
+import lxml.etree
+import Bcfg2.Logger
import Bcfg2.Options
from M2Crypto import Rand
from M2Crypto.EVP import Cipher, EVPError
-from Bcfg2.Compat import StringIO, md5, b64encode, b64decode
+from Bcfg2.Utils import safe_input
+from Bcfg2.Server import XMLParser
+from Bcfg2.Compat import md5, b64encode, b64decode, StringIO
#: Constant representing the encryption operation for
#: :class:`M2Crypto.EVP.Cipher`, which uses a simple integer. This
@@ -23,26 +30,22 @@ DECRYPT = 0
#: automated fashion.
IV = r'\0' * 16
-#: The config file section encryption options and passphrases are
-#: stored in
-CFG_SECTION = "encryption"
-#: The config option used to store the algorithm
-CFG_ALGORITHM = "algorithm"
+class _OptionContainer(object):
+ options = [
+ Bcfg2.Options.BooleanOption(
+ cf=("encryption", "lax_decryption"),
+ help="Decryption failures should cause warnings, not errors"),
+ Bcfg2.Options.Option(
+ cf=("encryption", "algorithm"), default="aes_256_cbc",
+ type=lambda v: v.lower().replace("-", "_"),
+ help="The encryption algorithm to use"),
+ Bcfg2.Options.Option(
+ cf=("encryption", "*"), dest='passphrases', default=dict(),
+ help="Encryption passphrases")]
-#: The config option used to store the decryption strictness
-CFG_DECRYPT = "decrypt"
-
-#: Default cipher algorithm. To get a full list of valid algorithms,
-#: you can run::
-#:
-#: openssl list-cipher-algorithms | grep -v ' => ' | \
-#: tr 'A-Z-' 'a-z_' | sort -u
-ALGORITHM = Bcfg2.Options.get_option_parser().cfp.get( # pylint: disable=E1103
- CFG_SECTION,
- CFG_ALGORITHM,
- default="aes_256_cbc").lower().replace("-", "_")
+Bcfg2.Options.get_parser().add_component(_OptionContainer)
Rand.rand_seed(os.urandom(1024))
@@ -64,7 +67,7 @@ def _cipher_filter(cipher, instr):
return rv
-def str_encrypt(plaintext, key, iv=IV, algorithm=ALGORITHM, salt=None):
+def str_encrypt(plaintext, key, iv=IV, algorithm=None, salt=None):
""" Encrypt a string with a key. For a higher-level encryption
interface, see :func:`ssl_encrypt`.
@@ -80,11 +83,13 @@ def str_encrypt(plaintext, key, iv=IV, algorithm=ALGORITHM, salt=None):
:type salt: string
:returns: string - The decrypted data
"""
+ if algorithm is None:
+ algorithm = Bcfg2.Options.setup.algorithm
cipher = Cipher(alg=algorithm, key=key, iv=iv, op=ENCRYPT, salt=salt)
return _cipher_filter(cipher, plaintext)
-def str_decrypt(crypted, key, iv=IV, algorithm=ALGORITHM):
+def str_decrypt(crypted, key, iv=IV, algorithm=None):
""" Decrypt a string with a key. For a higher-level decryption
interface, see :func:`ssl_decrypt`.
@@ -98,11 +103,13 @@ def str_decrypt(crypted, key, iv=IV, algorithm=ALGORITHM):
:type algorithm: string
:returns: string - The decrypted data
"""
+ if algorithm is None:
+ algorithm = Bcfg2.Options.setup.algorithm
cipher = Cipher(alg=algorithm, key=key, iv=iv, op=DECRYPT)
return _cipher_filter(cipher, crypted)
-def ssl_decrypt(data, passwd, algorithm=ALGORITHM):
+def ssl_decrypt(data, passwd, algorithm=None):
""" Decrypt openssl-encrypted data. This can decrypt data
encrypted by :func:`ssl_encrypt`, or ``openssl enc``. It performs
a base64 decode first if the data is base64 encoded, and
@@ -132,7 +139,7 @@ def ssl_decrypt(data, passwd, algorithm=ALGORITHM):
return str_decrypt(data[16:], key=key, iv=iv, algorithm=algorithm)
-def ssl_encrypt(plaintext, passwd, algorithm=ALGORITHM, salt=None):
+def ssl_encrypt(plaintext, passwd, algorithm=None, salt=None):
""" Encrypt data in a format that is openssl compatible.
:param plaintext: The plaintext data to encrypt
@@ -164,25 +171,10 @@ def ssl_encrypt(plaintext, passwd, algorithm=ALGORITHM, salt=None):
return b64encode("Salted__" + salt + crypted) + "\n"
-def get_passphrases():
- """ Get all candidate encryption passphrases from the config file.
-
- :returns: dict - a dict of ``<passphrase name>``: ``<passphrase>``
- """
- setup = Bcfg2.Options.get_option_parser()
- if setup.cfp.has_section(CFG_SECTION):
- return dict([(o, setup.cfp.get(CFG_SECTION, o))
- for o in setup.cfp.options(CFG_SECTION)
- if o not in [CFG_ALGORITHM, CFG_DECRYPT]])
- else:
- return dict()
-
-
-def bruteforce_decrypt(crypted, passphrases=None, algorithm=ALGORITHM):
+def bruteforce_decrypt(crypted, passphrases=None, algorithm=None):
""" Convenience method to decrypt the given encrypted string by
- trying the given passphrases or all passphrases (as returned by
- :func:`get_passphrases`) sequentially until one is found that
- works.
+ trying the given passphrases or all passphrases sequentially until
+ one is found that works.
:param crypted: The data to decrypt
:type crypted: string
@@ -194,10 +186,507 @@ def bruteforce_decrypt(crypted, passphrases=None, algorithm=ALGORITHM):
:raises: :class:`M2Crypto.EVP.EVPError`, if the data cannot be decrypted
"""
if passphrases is None:
- passphrases = get_passphrases().values()
+ passphrases = Bcfg2.Options.setup.passphrases.values()
for passwd in passphrases:
try:
return ssl_decrypt(crypted, passwd, algorithm=algorithm)
except EVPError:
pass
raise EVPError("Failed to decrypt")
+
+
+class EncryptionChunkingError(Exception):
+ """ error raised when Encryptor cannot break a file up into chunks
+ to be encrypted, or cannot reassemble the chunks """
+ pass
+
+
+class Encryptor(object):
+ """ Generic encryptor for all files """
+
+ def __init__(self):
+ self.passphrase = None
+ self.pname = None
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ def get_encrypted_filename(self, plaintext_filename):
+ """ get the name of the file encrypted data should be written to """
+ return plaintext_filename
+
+ def get_plaintext_filename(self, encrypted_filename):
+ """ get the name of the file decrypted data should be written to """
+ return encrypted_filename
+
+ def chunk(self, data):
+ """ generator to break the file up into smaller chunks that
+ will each be individually encrypted or decrypted """
+ yield data
+
+ def unchunk(self, data, original): # pylint: disable=W0613
+ """ given chunks of a file, reassemble then into the whole file """
+ try:
+ return data[0]
+ except IndexError:
+ raise EncryptionChunkingError("No data to unchunk")
+
+ def set_passphrase(self):
+ """ set the passphrase for the current file """
+ if not Bcfg2.Options.setup.passphrases:
+ self.logger.error("No passphrases available in %s" %
+ Bcfg2.Options.setup.config)
+ return False
+
+ if self.passphrase:
+ self.logger.debug("Using previously determined passphrase %s" %
+ self.pname)
+ return True
+
+ if Bcfg2.Options.setup.passphrase:
+ self.pname = Bcfg2.Options.setup.passphrase
+
+ if self.pname:
+ try:
+ self.passphrase = Bcfg2.Options.setup.passphrases[self.pname]
+ self.logger.debug("Using passphrase %s specified on command "
+ "line" % self.pname)
+ return True
+ except KeyError:
+ self.logger.error("Could not find passphrase %s in %s" %
+ (self.pname, Bcfg2.Options.setup.config))
+ return False
+ else:
+ pnames = Bcfg2.Options.setup.passphrases
+ if len(pnames) == 1:
+ self.pname = pnames.keys()[0]
+ self.passphrase = pnames[self.pname]
+ self.logger.info("Using passphrase %s" % self.pname)
+ return True
+ elif len(pnames) > 1:
+ self.logger.warning("Multiple passphrases found in %s, "
+ "specify one on the command line with -p" %
+ Bcfg2.Options.setup.config)
+ self.logger.info("No passphrase could be determined")
+ return False
+
+ def encrypt(self, fname):
+ """ encrypt the given file, returning the encrypted data """
+ try:
+ plaintext = open(fname).read()
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error reading %s, skipping: %s" % (fname, err))
+ return False
+
+ if not self.set_passphrase():
+ return False
+
+ crypted = []
+ try:
+ for chunk in self.chunk(plaintext):
+ try:
+ passphrase, pname = self.get_passphrase(chunk)
+ except TypeError:
+ return False
+
+ crypted.append(self._encrypt(chunk, passphrase, name=pname))
+ except EncryptionChunkingError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error getting data to encrypt from %s: %s" %
+ (fname, err))
+ return False
+ return self.unchunk(crypted, plaintext)
+
+ # pylint: disable=W0613
+ def _encrypt(self, plaintext, passphrase, name=None):
+ """ encrypt a single chunk of a file """
+ return ssl_encrypt(plaintext, passphrase)
+ # pylint: enable=W0613
+
+ def decrypt(self, fname):
+ """ decrypt the given file, returning the plaintext data """
+ try:
+ crypted = open(fname).read()
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error reading %s, skipping: %s" % (fname, err))
+ return False
+
+ self.set_passphrase()
+
+ plaintext = []
+ try:
+ for chunk in self.chunk(crypted):
+ try:
+ passphrase, pname = self.get_passphrase(chunk)
+ try:
+ plaintext.append(self._decrypt(chunk, passphrase))
+ except EVPError:
+ self.logger.info("Could not decrypt %s with the "
+ "specified passphrase" % fname)
+ continue
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("Error decrypting %s: %s" %
+ (fname, err))
+ continue
+ except TypeError:
+ pchunk = None
+ for pname, passphrase in \
+ Bcfg2.Options.setup.passphrases.items():
+ self.logger.debug("Trying passphrase %s" % pname)
+ try:
+ pchunk = self._decrypt(chunk, passphrase)
+ break
+ except EVPError:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("Error decrypting %s: %s" %
+ (fname, err))
+ if pchunk is not None:
+ plaintext.append(pchunk)
+ else:
+ self.logger.error("Could not decrypt %s with any "
+ "passphrase in %s" %
+ (fname, Bcfg2.Options.setup.config))
+ continue
+ except EncryptionChunkingError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error getting encrypted data from %s: %s" %
+ (fname, err))
+ return False
+
+ try:
+ return self.unchunk(plaintext, crypted)
+ except EncryptionChunkingError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error assembling plaintext data from %s: %s" %
+ (fname, err))
+ return False
+
+ def _decrypt(self, crypted, passphrase):
+ """ decrypt a single chunk """
+ return ssl_decrypt(crypted, passphrase)
+
+ def write_encrypted(self, fname, data=None):
+ """ write encrypted data to disk """
+ if data is None:
+ data = self.decrypt(fname)
+ new_fname = self.get_encrypted_filename(fname)
+ try:
+ open(new_fname, "wb").write(data)
+ self.logger.info("Wrote encrypted data to %s" % new_fname)
+ return True
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error writing encrypted data from %s to %s: %s"
+ % (fname, new_fname, err))
+ return False
+ except EncryptionChunkingError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error assembling encrypted data from %s: %s" %
+ (fname, err))
+ return False
+
+ def write_decrypted(self, fname, data=None):
+ """ write decrypted data to disk """
+ if data is None:
+ data = self.decrypt(fname)
+ new_fname = self.get_plaintext_filename(fname)
+ try:
+ open(new_fname, "wb").write(data)
+ self.logger.info("Wrote decrypted data to %s" % new_fname)
+ return True
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error writing encrypted data from %s to %s: %s"
+ % (fname, new_fname, err))
+ return False
+
+ def get_passphrase(self, chunk):
+ """ get the passphrase for a chunk of a file """
+ pname = self._get_passphrase(chunk)
+ if not self.pname:
+ if not pname:
+ self.logger.info("No passphrase given on command line or "
+ "found in file")
+ return False
+ elif pname in Bcfg2.Options.setup.passphrases:
+ passphrase = Bcfg2.Options.setup.passphrases[pname]
+ else:
+ self.logger.error("Could not find passphrase %s in %s" %
+ (pname, Bcfg2.Options.setup.config))
+ return False
+ else:
+ pname = self.pname
+ passphrase = self.passphrase
+ if self.pname != pname:
+ self.logger.warning("Passphrase given on command line (%s) "
+ "differs from passphrase embedded in "
+ "file (%s), using command-line option" %
+ (self.pname, pname))
+ return (passphrase, pname)
+
+ def _get_passphrase(self, chunk): # pylint: disable=W0613
+ """ get the passphrase for a chunk of a file """
+ return None
+
+
+class CfgEncryptor(Encryptor):
+ """ encryptor class for Cfg files """
+
+ def get_encrypted_filename(self, plaintext_filename):
+ return plaintext_filename + ".crypt"
+
+ def get_plaintext_filename(self, encrypted_filename):
+ if encrypted_filename.endswith(".crypt"):
+ return encrypted_filename[:-6]
+ else:
+ return Encryptor.get_plaintext_filename(self, encrypted_filename)
+
+
+class PropertiesEncryptor(Encryptor):
+ """ encryptor class for Properties files """
+
+ def _encrypt(self, plaintext, passphrase, name=None):
+ # plaintext is an lxml.etree._Element
+ if name is None:
+ name = "true"
+ if plaintext.text and plaintext.text.strip():
+ plaintext.text = ssl_encrypt(plaintext.text, passphrase).strip()
+ plaintext.set("encrypted", name)
+ return plaintext
+
+ def chunk(self, data):
+ xdata = lxml.etree.XML(data, parser=XMLParser)
+ if Bcfg2.Options.setup.xpath:
+ elements = xdata.xpath(Bcfg2.Options.setup.xpath)
+ if not elements:
+ raise EncryptionChunkingError(
+ "XPath expression %s matched no elements" %
+ Bcfg2.Options.setup.xpath)
+ else:
+ elements = xdata.xpath('//*[@encrypted]')
+ if not elements:
+ elements = list(xdata.getiterator(tag=lxml.etree.Element))
+
+ # filter out elements without text data
+ for el in elements[:]:
+ if not el.text:
+ elements.remove(el)
+
+ if Bcfg2.Options.setup.interactive:
+ for element in elements[:]:
+ if len(element):
+ elt = copy.copy(element)
+ for child in elt.iterchildren():
+ elt.remove(child)
+ else:
+ elt = element
+ print(lxml.etree.tostring(
+ elt,
+ xml_declaration=False).decode("UTF-8").strip())
+ ans = safe_input("Encrypt this element? [y/N] ")
+ if not ans.lower().startswith("y"):
+ elements.remove(element)
+
+ # this is not a good use of a generator, but we need to
+ # generate the full list of elements in order to ensure that
+ # some exist before we know what to return
+ for elt in elements:
+ yield elt
+
+ def unchunk(self, data, original):
+ # Properties elements are modified in-place, so we don't
+ # actually need to unchunk anything
+ xdata = Encryptor.unchunk(self, data, original)
+ # find root element
+ while xdata.getparent() is not None:
+ xdata = xdata.getparent()
+ return lxml.etree.tostring(xdata,
+ xml_declaration=False,
+ pretty_print=True).decode('UTF-8')
+
+ def _get_passphrase(self, chunk):
+ pname = chunk.get("encrypted")
+ if pname and pname.lower() != "true":
+ return pname
+ return None
+
+ def _decrypt(self, crypted, passphrase):
+ # crypted is in lxml.etree._Element
+ if not crypted.text or not crypted.text.strip():
+ self.logger.warning("Skipping empty element %s" % crypted.tag)
+ return crypted
+ decrypted = ssl_decrypt(crypted.text, passphrase).strip()
+ try:
+ crypted.text = decrypted.encode('ascii', 'xmlcharrefreplace')
+ except UnicodeDecodeError:
+ # we managed to decrypt the value, but it contains content
+ # that can't even be encoded into xml entities. what
+ # probably happened here is that we coincidentally could
+ # decrypt a value encrypted with a different key, and
+ # wound up with gibberish.
+ self.logger.warning("Decrypted %s to gibberish, skipping" %
+ crypted.tag)
+ return crypted
+
+
+class CLI(object):
+ """ The bcfg2-crypt CLI """
+
+ options = [
+ Bcfg2.Options.ExclusiveOptionGroup(
+ Bcfg2.Options.BooleanOption(
+ "--encrypt", help='Encrypt the specified file'),
+ Bcfg2.Options.BooleanOption(
+ "--decrypt", help='Decrypt the specified file')),
+ Bcfg2.Options.BooleanOption(
+ "--stdout",
+ help='Decrypt or encrypt the specified file to stdout'),
+ Bcfg2.Options.Option(
+ "-p", "--passphrase", metavar="NAME",
+ help='Encryption passphrase name'),
+ Bcfg2.Options.ExclusiveOptionGroup(
+ Bcfg2.Options.BooleanOption(
+ "--properties",
+ help='Encrypt the specified file as a Properties file'),
+ Bcfg2.Options.BooleanOption(
+ "--cfg", help='Encrypt the specified file as a Cfg file')),
+ Bcfg2.Options.OptionGroup(
+ Bcfg2.Options.Common.interactive,
+ Bcfg2.Options.Option(
+ "--xpath",
+ help='XPath expression to select elements to encrypt'),
+ title="Options for handling Properties files"),
+ Bcfg2.Options.OptionGroup(
+ Bcfg2.Options.BooleanOption(
+ "--remove", help='Remove the plaintext file after encrypting'),
+ title="Options for handling Cfg files"),
+ Bcfg2.Options.PathOption(
+ "files", help="File(s) to encrypt or decrypt", nargs='+')]
+
+ def __init__(self):
+ parser = Bcfg2.Options.get_parser(
+ description="Encrypt and decrypt Bcfg2 data",
+ components=[self, OptionContainer])
+ parser.parse()
+ self.logger = logging.getLogger(parser.prog)
+
+ if Bcfg2.Options.setup.decrypt:
+ if Bcfg2.Options.setup.remove:
+ self.logger.error("--remove cannot be used with --decrypt, "
+ "ignoring --remove")
+ Bcfg2.Options.setup.remove = False
+ elif Bcfg2.Options.setup.interactive:
+ self.logger.error("Cannot decrypt interactively")
+ Bcfg2.Options.setup.interactive = False
+
+ if Bcfg2.Options.setup.cfg:
+ if Bcfg2.Options.setup.xpath:
+ self.logger.error("Specifying --xpath with --cfg is "
+ "nonsensical, ignoring --xpath")
+ Bcfg2.Options.setup.xpath = None
+ if Bcfg2.Options.setup.interactive:
+ self.logger.error("Cannot use interactive mode with --cfg, "
+ "ignoring --interactive")
+ Bcfg2.Options.setup.interactive = False
+ elif Bcfg2.Options.setup.properties:
+ if Bcfg2.Options.setup.remove:
+ self.logger.error("--remove cannot be used with --properties, "
+ "ignoring --remove")
+ Bcfg2.Options.setup.remove = False
+
+ self.props_crypt = PropertiesEncryptor()
+ self.cfg_crypt = CfgEncryptor()
+
+ def _is_properties(self, filename):
+ """ Determine if a given file is a Properties file or not """
+ if Bcfg2.Options.setup.properties:
+ return True
+ elif Bcfg2.Options.setup.cfg:
+ return False
+ elif fname.endswith(".xml"):
+ try:
+ xroot = lxml.etree.parse(fname).getroot()
+ return xroot.tag == "Properties"
+ except lxml.etree.XMLSyntaxError:
+ return False
+ else:
+ return False
+
+ def run(self): # pylint: disable=R0912,R0915
+ for fname in Bcfg2.Options.setup.files:
+ if not os.path.exists(fname):
+ self.logger.error("%s does not exist, skipping" % fname)
+ continue
+
+ # figure out if we need to encrypt this as a Properties file
+ # or as a Cfg file
+ try:
+ props = self._is_properties(fname)
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error reading %s, skipping: %s" %
+ (fname, err))
+ continue
+
+ if props:
+ encryptor = self.props_crypt
+ if Bcfg2.Options.setup.remove:
+ self.logger.warning("Cannot use --remove with Properties "
+ "file %s, ignoring for this file" %
+ fname)
+ else:
+ if Bcfg2.Options.setup.xpath:
+ self.logger.warning("Cannot use xpath with Cfg file %s, "
+ "ignoring xpath for this file" % fname)
+ if Bcfg2.Options.setup.interactive:
+ self.logger.warning("Cannot use interactive mode with Cfg "
+ "file %s, ignoring --interactive for "
+ "this file" % fname)
+ encryptor = self.cfg_crypt
+
+ data = None
+ if Bcfg2.Options.setup.encrypt:
+ xform = encryptor.encrypt
+ write = encryptor.write_encrypted
+ elif Bcfg2.Options.setup.decrypt:
+ xform = encryptor.decrypt
+ write = encryptor.write_decrypted
+ else:
+ self.logger.warning("Neither --encrypt nor --decrypt "
+ "specified, determining mode")
+ data = encryptor.decrypt(fname)
+ if data:
+ write = encryptor.write_decrypted
+ else:
+ self.logger.warning("Failed to decrypt %s, trying "
+ "encryption" % fname)
+ data = None
+ xform = encryptor.encrypt
+ write = encryptor.write_encrypted
+
+ if data is None:
+ data = xform(fname)
+ if not data:
+ self.logger.error("Failed to %s %s, skipping" %
+ (xform.__name__, fname))
+ continue
+ if Bcfg2.Options.setup.stdout:
+ if len(Bcfg2.Options.setup.files) > 1:
+ print("----- %s -----" % fname)
+ print(data)
+ if len(Bcfg2.Options.setup.files) > 1:
+ print("")
+ else:
+ write(fname, data=data)
+
+ if (Bcfg2.Options.setup.remove and
+ encryptor.get_encrypted_filename(fname) != fname):
+ try:
+ os.unlink(fname)
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error removing %s: %s" % (fname, err))
+ continue
diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt
index a75c0da9d..26d5eedf1 100755
--- a/src/sbin/bcfg2-crypt
+++ b/src/sbin/bcfg2-crypt
@@ -1,507 +1,8 @@
#!/usr/bin/env python
""" helper for encrypting/decrypting Cfg and Properties files """
-import os
import sys
-import copy
-import select
-import logging
-import lxml.etree
-import Bcfg2.Logger
-import Bcfg2.Options
-from Bcfg2.Server import XMLParser
-from Bcfg2.Compat import input # pylint: disable=W0622
-try:
- import Bcfg2.Server.Encryption
-except ImportError:
- print("Could not import %s. Is M2Crypto installed?" % sys.exc_info()[1])
- raise SystemExit(1)
-
-
-class EncryptionChunkingError(Exception):
- """ error raised when Encryptor cannot break a file up into chunks
- to be encrypted, or cannot reassemble the chunks """
- pass
-
-
-class Encryptor(object):
- """ Generic encryptor for all files """
-
- def __init__(self):
- self.setup = Bcfg2.Options.get_option_parser()
- self.passphrase = None
- self.pname = None
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def get_encrypted_filename(self, plaintext_filename):
- """ get the name of the file encrypted data should be written to """
- return plaintext_filename
-
- def get_plaintext_filename(self, encrypted_filename):
- """ get the name of the file decrypted data should be written to """
- return encrypted_filename
-
- def chunk(self, data):
- """ generator to break the file up into smaller chunks that
- will each be individually encrypted or decrypted """
- yield data
-
- def unchunk(self, data, original): # pylint: disable=W0613
- """ given chunks of a file, reassemble then into the whole file """
- try:
- return data[0]
- except IndexError:
- raise EncryptionChunkingError("No data to unchunk")
-
- def set_passphrase(self):
- """ set the passphrase for the current file """
- if (not self.setup.cfp.has_section(Bcfg2.Server.Encryption.CFG_SECTION)
- or len(Bcfg2.Server.Encryption.get_passphrases()) == 0):
- self.logger.error("No passphrases available in %s" %
- self.setup['configfile'])
- return False
-
- if self.passphrase:
- self.logger.debug("Using previously determined passphrase %s" %
- self.pname)
- return True
-
- if self.setup['passphrase']:
- self.pname = self.setup['passphrase']
-
- if self.pname:
- if self.setup.cfp.has_option(Bcfg2.Server.Encryption.CFG_SECTION,
- self.pname):
- self.passphrase = \
- self.setup.cfp.get(Bcfg2.Server.Encryption.CFG_SECTION,
- self.pname)
- self.logger.debug("Using passphrase %s specified on command "
- "line" % self.pname)
- return True
- else:
- self.logger.error("Could not find passphrase %s in %s" %
- (self.pname, self.setup['configfile']))
- return False
- else:
- pnames = Bcfg2.Server.Encryption.get_passphrases()
- if len(pnames) == 1:
- self.pname = pnames.keys()[0]
- self.passphrase = pnames[self.pname]
- self.logger.info("Using passphrase %s" % self.pname)
- return True
- elif len(pnames) > 1:
- self.logger.warning("Multiple passphrases found in %s, "
- "specify one on the command line with -p" %
- self.setup['configfile'])
- self.logger.info("No passphrase could be determined")
- return False
-
- def encrypt(self, fname):
- """ encrypt the given file, returning the encrypted data """
- try:
- plaintext = open(fname).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
- return False
-
- if not self.set_passphrase():
- return False
-
- crypted = []
- try:
- for chunk in self.chunk(plaintext):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- except TypeError:
- return False
-
- crypted.append(self._encrypt(chunk, passphrase, name=pname))
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting data to encrypt from %s: %s" %
- (fname, err))
- return False
- return self.unchunk(crypted, plaintext)
-
- # pylint: disable=W0613
- def _encrypt(self, plaintext, passphrase, name=None):
- """ encrypt a single chunk of a file """
- return Bcfg2.Server.Encryption.ssl_encrypt(plaintext, passphrase)
- # pylint: enable=W0613
-
- def decrypt(self, fname):
- """ decrypt the given file, returning the plaintext data """
- try:
- crypted = open(fname).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
- return False
-
- self.set_passphrase()
-
- plaintext = []
- try:
- for chunk in self.chunk(crypted):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- try:
- plaintext.append(self._decrypt(chunk, passphrase))
- except Bcfg2.Server.Encryption.EVPError:
- self.logger.info("Could not decrypt %s with the "
- "specified passphrase" % fname)
- continue
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- continue
- except TypeError:
- pchunk = None
- for pname, passphrase in \
- Bcfg2.Server.Encryption.get_passphrases().items():
- self.logger.debug("Trying passphrase %s" % pname)
- try:
- pchunk = self._decrypt(chunk, passphrase)
- break
- except Bcfg2.Server.Encryption.EVPError:
- pass
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- if pchunk is not None:
- plaintext.append(pchunk)
- else:
- self.logger.error("Could not decrypt %s with any "
- "passphrase in %s" %
- (fname, self.setup['configfile']))
- continue
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting encrypted data from %s: %s" %
- (fname, err))
- return False
-
- try:
- return self.unchunk(plaintext, crypted)
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling plaintext data from %s: %s" %
- (fname, err))
- return False
-
- def _decrypt(self, crypted, passphrase):
- """ decrypt a single chunk """
- return Bcfg2.Server.Encryption.ssl_decrypt(crypted, passphrase)
-
- def write_encrypted(self, fname, data=None):
- """ write encrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_encrypted_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote encrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling encrypted data from %s: %s" %
- (fname, err))
- return False
-
- def write_decrypted(self, fname, data=None):
- """ write decrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_plaintext_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote decrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
-
- def get_passphrase(self, chunk):
- """ get the passphrase for a chunk of a file """
- pname = self._get_passphrase(chunk)
- if not self.pname:
- if not pname:
- self.logger.info("No passphrase given on command line or "
- "found in file")
- return False
- elif self.setup.cfp.has_option(Bcfg2.Server.Encryption.CFG_SECTION,
- pname):
- passphrase = self.setup.cfp.get(
- Bcfg2.Server.Encryption.CFG_SECTION,
- pname)
- else:
- self.logger.error("Could not find passphrase %s in %s" %
- (pname, self.setup['configfile']))
- return False
- else:
- pname = self.pname
- passphrase = self.passphrase
- if self.pname != pname:
- self.logger.warning("Passphrase given on command line (%s) "
- "differs from passphrase embedded in "
- "file (%s), using command-line option" %
- (self.pname, pname))
- return (passphrase, pname)
-
- def _get_passphrase(self, chunk): # pylint: disable=W0613
- """ get the passphrase for a chunk of a file """
- return None
-
-
-class CfgEncryptor(Encryptor):
- """ encryptor class for Cfg files """
-
- def get_encrypted_filename(self, plaintext_filename):
- return plaintext_filename + ".crypt"
-
- def get_plaintext_filename(self, encrypted_filename):
- if encrypted_filename.endswith(".crypt"):
- return encrypted_filename[:-6]
- else:
- return Encryptor.get_plaintext_filename(self, encrypted_filename)
-
-
-class PropertiesEncryptor(Encryptor):
- """ encryptor class for Properties files """
-
- def _encrypt(self, plaintext, passphrase, name=None):
- # plaintext is an lxml.etree._Element
- if name is None:
- name = "true"
- if plaintext.text and plaintext.text.strip():
- plaintext.text = \
- Bcfg2.Server.Encryption.ssl_encrypt(plaintext.text,
- passphrase).strip()
- plaintext.set("encrypted", name)
- return plaintext
-
- def chunk(self, data):
- xdata = lxml.etree.XML(data, parser=XMLParser)
- if self.setup['xpath']:
- elements = xdata.xpath(self.setup['xpath'])
- if not elements:
- raise EncryptionChunkingError("XPath expression %s matched no "
- "elements" % self.setup['xpath'])
- else:
- elements = xdata.xpath('//*[@encrypted]')
- if not elements:
- elements = list(xdata.getiterator(tag=lxml.etree.Element))
-
- # filter out elements without text data
- for el in elements[:]:
- if not el.text:
- elements.remove(el)
-
- if self.setup['interactive']:
- for element in elements[:]:
- if len(element):
- elt = copy.copy(element)
- for child in elt.iterchildren():
- elt.remove(child)
- else:
- elt = element
- print(lxml.etree.tostring(
- elt,
- xml_declaration=False).decode("UTF-8").strip())
- # flush input buffer
- while len(select.select([sys.stdin.fileno()], [], [],
- 0.0)[0]) > 0:
- os.read(sys.stdin.fileno(), 4096)
- ans = input("Encrypt this element? [y/N] ")
- if not ans.lower().startswith("y"):
- elements.remove(element)
-
- # this is not a good use of a generator, but we need to
- # generate the full list of elements in order to ensure that
- # some exist before we know what to return
- for elt in elements:
- yield elt
-
- def unchunk(self, data, original):
- # Properties elements are modified in-place, so we don't
- # actually need to unchunk anything
- xdata = data[0]
- # find root element
- while xdata.getparent() is not None:
- xdata = xdata.getparent()
- return lxml.etree.tostring(xdata,
- xml_declaration=False,
- pretty_print=True).decode('UTF-8')
-
- def _get_passphrase(self, chunk):
- pname = chunk.get("encrypted")
- if pname and pname.lower() != "true":
- return pname
- return None
-
- def _decrypt(self, crypted, passphrase):
- # crypted is in lxml.etree._Element
- if not crypted.text or not crypted.text.strip():
- self.logger.warning("Skipping empty element %s" % crypted.tag)
- return crypted
- decrypted = Bcfg2.Server.Encryption.ssl_decrypt(crypted.text,
- passphrase).strip()
- try:
- crypted.text = decrypted.encode('ascii', 'xmlcharrefreplace')
- except UnicodeDecodeError:
- # we managed to decrypt the value, but it contains content
- # that can't even be encoded into xml entities. what
- # probably happened here is that we coincidentally could
- # decrypt a value encrypted with a different key, and
- # wound up with gibberish.
- self.logger.warning("Decrypted %s to gibberish, skipping" %
- crypted.tag)
- return crypted
-
-
-def main(): # pylint: disable=R0912,R0915
- optinfo = dict(interactive=Bcfg2.Options.INTERACTIVE)
- optinfo.update(Bcfg2.Options.CRYPT_OPTIONS)
- optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS)
- setup = Bcfg2.Options.load_option_parser(optinfo)
- setup.hm = " bcfg2-crypt [options] <filename>\nOptions:\n%s" % \
- setup.buildHelpMessage()
- setup.parse()
-
- if not setup['args']:
- print(setup.hm)
- raise SystemExit(1)
-
- log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING)
- if setup['verbose']:
- log_args['to_console'] = logging.DEBUG
- Bcfg2.Logger.setup_logging('bcfg2-crypt', **log_args)
- logger = logging.getLogger('bcfg2-crypt')
-
- if setup['decrypt']:
- if setup['encrypt']:
- logger.error("You cannot specify both --encrypt and --decrypt")
- raise SystemExit(1)
- elif setup['remove']:
- logger.error("--remove cannot be used with --decrypt, ignoring")
- setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default
- elif setup['interactive']:
- logger.error("Cannot decrypt interactively")
- setup['interactive'] = False
-
- if setup['cfg']:
- if setup['properties']:
- logger.error("You cannot specify both --cfg and --properties")
- raise SystemExit(1)
- if setup['xpath']:
- logger.error("Specifying --xpath with --cfg is nonsensical, "
- "ignoring --xpath")
- setup['xpath'] = Bcfg2.Options.CRYPT_XPATH.default
- if setup['interactive']:
- logger.error("You cannot use interactive mode with --cfg, "
- "ignoring -I")
- setup['interactive'] = False
- elif setup['properties']:
- if setup['remove']:
- logger.error("--remove cannot be used with --properties, ignoring")
- setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default
-
- props_crypt = PropertiesEncryptor(setup)
- cfg_crypt = CfgEncryptor(setup)
-
- for fname in setup['args']:
- if not os.path.exists(fname):
- logger.error("%s does not exist, skipping" % fname)
- continue
-
- # figure out if we need to encrypt this as a Properties file
- # or as a Cfg file
- props = False
- if setup['properties']:
- props = True
- elif setup['cfg']:
- props = False
- elif fname.endswith(".xml"):
- try:
- xroot = lxml.etree.parse(fname).getroot()
- if xroot.tag == "Properties":
- props = True
- else:
- props = False
- except IOError:
- err = sys.exc_info()[1]
- logger.error("Error reading %s, skipping: %s" % (fname, err))
- continue
- except lxml.etree.XMLSyntaxError:
- props = False
- else:
- props = False
-
- if props:
- encryptor = props_crypt
- if setup['remove']:
- logger.info("Cannot use --remove with Properties file %s, "
- "ignoring for this file" % fname)
- else:
- if setup['xpath']:
- logger.info("Cannot use xpath with Cfg file %s, ignoring "
- "xpath for this file" % fname)
- if setup['interactive']:
- logger.info("Cannot use interactive mode with Cfg file %s, "
- "ignoring -I for this file" % fname)
- encryptor = cfg_crypt
-
- data = None
- if setup['encrypt']:
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
- elif setup['decrypt']:
- xform = encryptor.decrypt
- write = encryptor.write_decrypted
- else:
- logger.info("Neither --encrypt nor --decrypt specified, "
- "determining mode")
- data = encryptor.decrypt(fname)
- if data:
- write = encryptor.write_decrypted
- else:
- logger.info("Failed to decrypt %s, trying encryption" % fname)
- data = None
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
-
- if data is None:
- data = xform(fname)
- if not data:
- logger.error("Failed to %s %s, skipping" % (xform.__name__, fname))
- continue
- if setup['crypt_stdout']:
- if len(setup['args']) > 1:
- print("----- %s -----" % fname)
- print(data)
- if len(setup['args']) > 1:
- print("")
- else:
- write(fname, data=data)
-
- if (setup['remove'] and
- encryptor.get_encrypted_filename(fname) != fname):
- try:
- os.unlink(fname)
- except IOError:
- err = sys.exc_info()[1]
- logger.error("Error removing %s: %s" % (fname, err))
- continue
+from Bcfg2.Server.Encryption import CLI
if __name__ == '__main__':
- sys.exit(main())
+ sys.exit(CLI().run())