summaryrefslogtreecommitdiffstats
path: root/src/sbin/bcfg2-crypt
diff options
context:
space:
mode:
Diffstat (limited to 'src/sbin/bcfg2-crypt')
-rwxr-xr-xsrc/sbin/bcfg2-crypt472
1 files changed, 2 insertions, 470 deletions
diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt
index 98a1ca4b0..26d5eedf1 100755
--- a/src/sbin/bcfg2-crypt
+++ b/src/sbin/bcfg2-crypt
@@ -1,476 +1,8 @@
#!/usr/bin/env python
""" helper for encrypting/decrypting Cfg and Properties files """
-import os
import sys
-import copy
-import select
-import logging
-import lxml.etree
-import Bcfg2.Logger
-import Bcfg2.Options
-from Bcfg2.Server import XMLParser
-from Bcfg2.Compat import input # pylint: disable=W0622
-try:
- import Bcfg2.Encryption
-except ImportError:
- print("Could not import %s. Is M2Crypto installed?" % sys.exc_info()[1])
- raise SystemExit(1)
-
-
-def print_xml(element, keep_text=False):
- """ Render an XML element for error output. This prefixes the
- line number and removes children for nicer display.
-
- :param element: The element to render
- :type element: lxml.etree._Element
- :param keep_text: Do not discard text content from the element for
- display
- :type keep_text: boolean
- """
- xml = None
- if len(element) or element.text:
- el = copy.copy(element)
- if el.text and not keep_text:
- el.text = '...'
- for child in el.iterchildren():
- el.remove(child)
- xml = lxml.etree.tostring(
- el,
- xml_declaration=False).decode("UTF-8").strip()
- else:
- xml = lxml.etree.tostring(
- element,
- xml_declaration=False).decode("UTF-8").strip()
- return "%s (line %s)" % (xml, element.sourceline)
-
-
-class PassphraseError(Exception):
- """ Exception raised when there's a problem determining the
- passphrase to encrypt or decrypt with """
-
-
-class CryptoTool(object):
- """ Generic decryption/encryption interface base object """
- def __init__(self, filename, setup):
- self.setup = setup
- self.logger = logging.getLogger(self.__class__.__name__)
- self.passphrases = Bcfg2.Encryption.get_passphrases(self.setup)
-
- self.filename = filename
- try:
- self.data = open(self.filename).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (filename,
- err))
- return False
-
- self.pname, self.passphrase = self._get_passphrase()
-
- def _get_passphrase(self):
- """ get the passphrase for the current file """
- if (not self.setup.cfp.has_section(Bcfg2.Encryption.CFG_SECTION) or
- len(Bcfg2.Encryption.get_passphrases(self.setup)) == 0):
- raise PassphraseError("No passphrases available in %s" %
- self.setup['configfile'])
-
- pname = None
- if self.setup['passphrase']:
- pname = self.setup['passphrase']
-
- if pname:
- if self.setup.cfp.has_option(Bcfg2.Encryption.CFG_SECTION,
- pname):
- passphrase = self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION,
- pname)
- self.logger.debug("Using passphrase %s specified on command "
- "line" % pname)
- return (pname, passphrase)
- else:
- raise PassphraseError("Could not find passphrase %s in %s" %
- (pname, self.setup['configfile']))
- else:
- pnames = Bcfg2.Encryption.get_passphrases(self.setup)
- if len(pnames) == 1:
- pname = pnames.keys()[0]
- passphrase = pnames[pname]
- self.logger.info("Using passphrase %s" % pname)
- return (pname, passphrase)
- elif len(pnames) > 1:
- return (None, None)
- raise PassphraseError("No passphrase could be determined")
-
- def get_destination_filename(self, original_filename):
- """ Get the filename where data should be written """
- return original_filename
-
- def write(self, data):
- """ write data to disk """
- new_fname = self.get_destination_filename(self.filename)
- try:
- self._write(new_fname, data)
- self.logger.info("Wrote data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing data from %s to %s: %s" %
- (self.filename, new_fname, err))
- return False
-
- def _write(self, filename, data):
- """ Perform the actual write of data. This is separate from
- :func:`CryptoTool.write` so it can be easily
- overridden. """
- open(filename, "wb").write(data)
-
-
-class Decryptor(CryptoTool):
- """ Decryptor interface """
- def decrypt(self):
- """ decrypt the file, returning the encrypted data """
- raise NotImplementedError
-
-
-class Encryptor(CryptoTool):
- """ encryptor interface """
- def encrypt(self):
- """ encrypt the file, returning the encrypted data """
- raise NotImplementedError
-
-
-class CfgEncryptor(Encryptor):
- """ encryptor class for Cfg files """
-
- def __init__(self, filename, setup):
- Encryptor.__init__(self, filename, setup)
- if self.passphrase is None:
- raise PassphraseError("Multiple passphrases found in %s, "
- "specify one on the command line with -p" %
- self.setup['configfile'])
-
- def encrypt(self):
- return Bcfg2.Encryption.ssl_encrypt(
- self.data, self.passphrase,
- Bcfg2.Encryption.get_algorithm(self.setup))
-
- def get_destination_filename(self, original_filename):
- return original_filename + ".crypt"
-
-
-class CfgDecryptor(Decryptor):
- """ Decrypt Cfg files """
-
- def decrypt(self):
- """ decrypt the given file, returning the plaintext data """
- if self.passphrase:
- try:
- return Bcfg2.Encryption.ssl_decrypt(
- self.data, self.passphrase,
- Bcfg2.Encryption.get_algorithm(self.setup))
- except Bcfg2.Encryption.EVPError:
- self.logger.info("Could not decrypt %s with the "
- "specified passphrase" % self.filename)
- return False
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (self.filename, err))
- return False
- else: # no passphrase given, brute force
- try:
- return Bcfg2.Encryption.bruteforce_decrypt(
- self.data, passphrases=self.passphrases.values(),
- algorithm=Bcfg2.Encryption.get_algorithm(self.setup))
- except Bcfg2.Encryption.EVPError:
- self.logger.info("Could not decrypt %s with any passphrase" %
- self.filename)
- return False
-
- def get_destination_filename(self, original_filename):
- if original_filename.endswith(".crypt"):
- return original_filename[:-6]
- else:
- return Decryptor.get_plaintext_filename(self, original_filename)
-
-
-class PropertiesCryptoMixin(object):
- """ Mixin to provide some common methods for Properties crypto """
- default_xpath = '//*'
-
- def _get_elements(self, xdata):
- """ Get the list of elements to encrypt or decrypt """
- if self.setup['xpath']:
- elements = xdata.xpath(self.setup['xpath'])
- if not elements:
- self.logger.warning("XPath expression %s matched no "
- "elements" % self.setup['xpath'])
- else:
- elements = xdata.xpath(self.default_xpath)
- if not elements:
- elements = list(xdata.getiterator(tag=lxml.etree.Element))
-
- # filter out elements without text data
- for el in elements[:]:
- if not el.text:
- elements.remove(el)
-
- if self.setup['interactive']:
- for element in elements[:]:
- if len(element):
- elt = copy.copy(element)
- for child in elt.iterchildren():
- elt.remove(child)
- else:
- elt = element
- print(lxml.etree.tostring(
- elt,
- xml_declaration=False).decode("UTF-8").strip())
- # flush input buffer
- while len(select.select([sys.stdin.fileno()], [], [],
- 0.0)[0]) > 0:
- os.read(sys.stdin.fileno(), 4096)
- ans = input("Encrypt this element? [y/N] ")
- if not ans.lower().startswith("y"):
- elements.remove(element)
- return elements
-
- def _get_element_passphrase(self, element):
- """ Get the passphrase to use to encrypt or decrypt a given
- element """
- pname = element.get("encrypted")
- if pname in self.passphrases:
- passphrase = self.passphrases[pname]
- elif self.passphrase:
- if pname:
- self.logger.warning("Passphrase %s not found in %s, "
- "using passphrase given on command line"
- % (pname, self.setup['configfile']))
- passphrase = self.passphrase
- pname = self.pname
- else:
- raise PassphraseError("Multiple passphrases found in %s, "
- "specify one on the command line with -p" %
- self.setup['configfile'])
- return (pname, passphrase)
-
- def _write(self, filename, data):
- """ Write the data """
- data.getroottree().write(filename,
- xml_declaration=False,
- pretty_print=True)
-
-
-class PropertiesEncryptor(Encryptor, PropertiesCryptoMixin):
- """ encryptor class for Properties files """
-
- def encrypt(self):
- xdata = lxml.etree.XML(self.data, parser=XMLParser)
- for elt in self._get_elements(xdata):
- try:
- pname, passphrase = self._get_element_passphrase(elt)
- except PassphraseError:
- self.logger.error(str(sys.exc_info()[1]))
- return False
- self.logger.debug("Encrypting %s" % print_xml(elt))
- elt.text = Bcfg2.Encryption.ssl_encrypt(
- elt.text, passphrase,
- Bcfg2.Encryption.get_algorithm(self.setup)).strip()
- elt.set("encrypted", pname)
- return xdata
-
- def _write(self, filename, data):
- PropertiesCryptoMixin._write(self, filename, data)
-
-
-class PropertiesDecryptor(Decryptor, PropertiesCryptoMixin):
- """ decryptor class for Properties files """
- default_xpath = '//*[@encrypted]'
-
- def decrypt(self):
- xdata = lxml.etree.XML(self.data, parser=XMLParser)
- for elt in self._get_elements(xdata):
- try:
- pname, passphrase = self._get_element_passphrase(elt)
- except PassphraseError:
- self.logger.error(str(sys.exc_info()[1]))
- return False
- self.logger.debug("Decrypting %s" % print_xml(elt))
- try:
- decrypted = Bcfg2.Encryption.ssl_decrypt(
- elt.text, passphrase,
- Bcfg2.Encryption.get_algorithm(self.setup)).strip()
- except Bcfg2.Encryption.EVPError:
- self.logger.error("Could not decrypt %s, skipping" %
- print_xml(elt))
- try:
- elt.text = decrypted.encode('ascii', 'xmlcharrefreplace')
- elt.set("encrypted", pname)
- except UnicodeDecodeError:
- # we managed to decrypt the value, but it contains
- # content that can't even be encoded into xml
- # entities. what probably happened here is that we
- # coincidentally could decrypt a value encrypted with
- # a different key, and wound up with gibberish.
- self.logger.warning("Decrypted %s to gibberish, skipping" %
- elt.tag)
- return xdata
-
- def _write(self, filename, data):
- PropertiesCryptoMixin._write(self, filename, data)
-
-
-def main(): # pylint: disable=R0912,R0915
- optinfo = dict(interactive=Bcfg2.Options.INTERACTIVE)
- optinfo.update(Bcfg2.Options.CRYPT_OPTIONS)
- optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS)
- setup = Bcfg2.Options.OptionParser(optinfo)
- setup.hm = " bcfg2-crypt [options] <filename>\nOptions:\n%s" % \
- setup.buildHelpMessage()
- setup.parse(sys.argv[1:])
-
- if not setup['args']:
- print(setup.hm)
- raise SystemExit(1)
-
- log_args = dict(to_syslog=setup['syslog'], to_console=logging.WARNING)
- if setup['verbose']:
- log_args['to_console'] = logging.DEBUG
- Bcfg2.Logger.setup_logging('bcfg2-crypt', **log_args)
- logger = logging.getLogger('bcfg2-crypt')
-
- if setup['decrypt']:
- if setup['encrypt']:
- logger.error("You cannot specify both --encrypt and --decrypt")
- raise SystemExit(1)
- elif setup['remove']:
- logger.error("--remove cannot be used with --decrypt, ignoring")
- setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default
- elif setup['interactive']:
- logger.error("Cannot decrypt interactively")
- setup['interactive'] = False
-
- if setup['cfg']:
- if setup['properties']:
- logger.error("You cannot specify both --cfg and --properties")
- raise SystemExit(1)
- if setup['xpath']:
- logger.error("Specifying --xpath with --cfg is nonsensical, "
- "ignoring --xpath")
- setup['xpath'] = Bcfg2.Options.CRYPT_XPATH.default
- if setup['interactive']:
- logger.error("You cannot use interactive mode with --cfg, "
- "ignoring -I")
- setup['interactive'] = False
- elif setup['properties']:
- if setup['remove']:
- logger.error("--remove cannot be used with --properties, ignoring")
- setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default
-
- for fname in setup['args']:
- if not os.path.exists(fname):
- logger.error("%s does not exist, skipping" % fname)
- continue
-
- # figure out if we need to encrypt this as a Properties file
- # or as a Cfg file
- props = False
- if setup['properties']:
- props = True
- elif setup['cfg']:
- props = False
- elif fname.endswith(".xml"):
- try:
- xroot = lxml.etree.parse(fname).getroot()
- if xroot.tag == "Properties":
- props = True
- else:
- props = False
- except IOError:
- err = sys.exc_info()[1]
- logger.error("Error reading %s, skipping: %s" % (fname, err))
- continue
- except lxml.etree.XMLSyntaxError:
- props = False
- else:
- props = False
-
- if props:
- if setup['remove']:
- logger.info("Cannot use --remove with Properties file %s, "
- "ignoring for this file" % fname)
- tools = (PropertiesEncryptor, PropertiesDecryptor)
- else:
- if setup['xpath']:
- logger.info("Cannot use xpath with Cfg file %s, ignoring "
- "xpath for this file" % fname)
- if setup['interactive']:
- logger.info("Cannot use interactive mode with Cfg file %s, "
- "ignoring -I for this file" % fname)
- tools = (CfgEncryptor, CfgDecryptor)
-
- data = None
- mode = None
- if setup['encrypt']:
- try:
- tool = tools[0](fname, setup)
- except PassphraseError:
- logger.error(str(sys.exc_info()[1]))
- return 2
- mode = "encrypt"
- elif setup['decrypt']:
- try:
- tool = tools[1](fname, setup)
- except PassphraseError:
- logger.error(str(sys.exc_info()[1]))
- return 2
- mode = "decrypt"
- else:
- logger.info("Neither --encrypt nor --decrypt specified, "
- "determining mode")
- try:
- tool = tools[1](fname, setup)
- except PassphraseError:
- logger.error(str(sys.exc_info()[1]))
- return 2
-
- try:
- data = tool.decrypt()
- mode = "decrypt"
- except: # pylint: disable=W0702
- pass
- if data is False:
- data = None
- logger.info("Failed to decrypt %s, trying encryption" % fname)
- try:
- tool = tools[0](fname, setup)
- except PassphraseError:
- logger.error(str(sys.exc_info()[1]))
- return 2
- mode = "encrypt"
-
- if data is None:
- data = getattr(tool, mode)()
- if data is None:
- logger.error("Failed to %s %s, skipping" % (mode, fname))
- continue
- if setup['crypt_stdout']:
- if len(setup['args']) > 1:
- print("----- %s -----" % fname)
- print(data)
- if len(setup['args']) > 1:
- print("")
- else:
- tool.write(data)
-
- if (setup['remove'] and
- tool.get_destination_filename(fname) != fname):
- try:
- os.unlink(fname)
- except IOError:
- err = sys.exc_info()[1]
- logger.error("Error removing %s: %s" % (fname, err))
- continue
+from Bcfg2.Server.Encryption import CLI
if __name__ == '__main__':
- sys.exit(main())
+ sys.exit(CLI().run())