From 72a80f89361145f1560ccc248f357a9de82eded6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 17 Jan 2013 08:01:44 -0500 Subject: abstracted encryption support from Properties/CfgPrivateKeyCreator to StructFile --- doc/server/encryption.txt | 9 +- doc/server/plugins/generators/cfg.txt | 3 + schemas/types.xsd | 8 ++ src/lib/Bcfg2/Server/Plugin/helpers.py | 64 +++++++++++- .../Server/Plugins/Cfg/CfgPrivateKeyCreator.py | 43 +------- .../Server/Plugins/Cfg/CfgPublicKeyCreator.py | 3 + src/lib/Bcfg2/Server/Plugins/FileProbes.py | 13 ++- src/lib/Bcfg2/Server/Plugins/NagiosGen.py | 4 +- .../Server/Plugins/Packages/PackagesSources.py | 2 + src/lib/Bcfg2/Server/Plugins/Properties.py | 43 -------- src/lib/Bcfg2/Server/Plugins/SSLCA.py | 1 + .../Testlib/TestServer/TestPlugin/Testhelpers.py | 109 ++++++++++++++++++- .../TestCfg/TestCfgPrivateKeyCreator.py | 99 ------------------ .../TestServer/TestPlugins/TestProperties.py | 116 --------------------- 14 files changed, 208 insertions(+), 309 deletions(-) diff --git a/doc/server/encryption.txt b/doc/server/encryption.txt index e84b9fb31..1f6cb72e6 100644 --- a/doc/server/encryption.txt +++ b/doc/server/encryption.txt @@ -23,7 +23,7 @@ separations between teams, environments, etc. single Bcfg2 repository with multiple admins who should not necessarily have access to each other's sensitive data. -Two types of data can be encrypted: +Two basic types of data can be encrypted: * :ref:`server-plugins-generators-cfg` files can be encrypted as whole files. See :ref:`server-plugins-generators-cfg-encryption` @@ -50,6 +50,13 @@ In general, Properties encryption is preferred for a few reasons: amongst different teams, this lets teams collaborate more closely on files and other data. +Other types of data that can be encrypted are: + +* Text content of Path tags in + :ref:`server-plugins-structures-bundler-index` +* Passphrases in XML description files for generated + :ref:`server-plugins-generators-cfg-sshkeys` + .. _bcfg2-crypt: bcfg2-crypt diff --git a/doc/server/plugins/generators/cfg.txt b/doc/server/plugins/generators/cfg.txt index e843b1d2d..1cb4b8727 100644 --- a/doc/server/plugins/generators/cfg.txt +++ b/doc/server/plugins/generators/cfg.txt @@ -583,6 +583,9 @@ influenced by several options in the ``[sshkeys]`` section of | | group from. | | | +----------------+---------------------------------------------------------+-----------------------+------------+ +See :ref:`server-encryption` for more details on encryption in Bcfg2 +in general. + Deltas ====== diff --git a/schemas/types.xsd b/schemas/types.xsd index 524b327c5..144ef7337 100644 --- a/schemas/types.xsd +++ b/schemas/types.xsd @@ -371,6 +371,14 @@ + + + + The name of the encryption passphrase that the text content + of this tag is encrypted with. + + + diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py index 7b22d52ca..879c68b85 100644 --- a/src/lib/Bcfg2/Server/Plugin/helpers.py +++ b/src/lib/Bcfg2/Server/Plugin/helpers.py @@ -17,6 +17,12 @@ from Bcfg2.Server.Plugin.interfaces import Generator from Bcfg2.Server.Plugin.exceptions import SpecificityError, \ PluginExecutionError +try: + import Bcfg2.Encryption + HAS_CRYPTO = True +except ImportError: + HAS_CRYPTO = False + try: import django # pylint: disable=W0611 HAS_DJANGO = True @@ -571,13 +577,69 @@ class XMLFileBacked(FileBacked): class StructFile(XMLFileBacked): """ StructFiles are XML files that contain a set of structure file formatting logic for handling ```` and ```` - tags. """ + tags. + + .. ----- + .. autoattribute:: __identifier__ + """ #: If ``__identifier__`` is not None, then it must be the name of #: an XML attribute that will be required on the top-level tag of #: the file being cached __identifier__ = None + #: Whether or not encryption support is enabled in this file + encryption = True + + def __init__(self, filename, fam=None, should_monitor=False): + XMLFileBacked.__init__(self, filename, fam=fam, + should_monitor=should_monitor) + self.setup = Bcfg2.Options.get_option_parser() + + def Index(self): + Bcfg2.Server.Plugin.XMLFileBacked.Index(self) + if self.encryption: + strict = self.xdata.get( + "decrypt", + self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION, "decrypt", + default="strict")) == "strict" + for el in self.xdata.xpath("//*[@encrypted]"): + if not HAS_CRYPTO: + raise PluginExecutionError("Properties: M2Crypto is not " + "available: %s" % self.name) + try: + el.text = self._decrypt(el).encode('ascii', + 'xmlcharrefreplace') + except UnicodeDecodeError: + LOGGER.info("%s: Decrypted %s to gibberish, skipping" % + (self.name, el.tag)) + except Bcfg2.Encryption.EVPError: + msg = "Failed to decrypt %s element in %s" % (el.tag, + self.name) + if strict: + raise PluginExecutionError(msg) + else: + LOGGER.warning(msg) + Index.__doc__ = XMLFileBacked.Index.__doc__ + + def _decrypt(self, element): + """ Decrypt a single encrypted properties file element """ + if not element.text or not element.text.strip(): + return + passes = Bcfg2.Encryption.get_passphrases() + try: + passphrase = passes[element.get("encrypted")] + try: + return Bcfg2.Encryption.ssl_decrypt(element.text, passphrase) + except Bcfg2.Encryption.EVPError: + # error is raised below + pass + except KeyError: + # bruteforce_decrypt raises an EVPError with a sensible + # error message, so we just let it propagate up the stack + return Bcfg2.Encryption.bruteforce_decrypt(element.text) + raise Bcfg2.Encryption.EVPError("Failed to decrypt") + def _include_element(self, item, metadata): """ determine if an XML element matches the metadata """ if isinstance(item, lxml.etree._Comment): # pylint: disable=W0212 diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py index 54fa75b41..4d6639e4d 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py @@ -5,7 +5,7 @@ import shutil import tempfile import subprocess from Bcfg2.Options import get_option_parser -from Bcfg2.Server.Plugin import PluginExecutionError, StructFile +from Bcfg2.Server.Plugin import StructFile from Bcfg2.Server.Plugins.Cfg import CfgCreator, CfgCreationError from Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator import CfgPublicKeyCreator try: @@ -211,44 +211,3 @@ class CfgPrivateKeyCreator(CfgCreator, StructFile): finally: shutil.rmtree(os.path.dirname(filename)) # pylint: enable=W0221 - - def Index(self): - StructFile.Index(self) - if HAS_CRYPTO: - strict = self.xdata.get( - "decrypt", - SETUP.cfp.get(Bcfg2.Encryption.CFG_SECTION, "decrypt", - default="strict")) == "strict" - for el in self.xdata.xpath("//*[@encrypted]"): - try: - el.text = self._decrypt(el).encode('ascii', - 'xmlcharrefreplace') - except UnicodeDecodeError: - self.logger.info("Cfg: Decrypted %s to gibberish, skipping" - % el.tag) - except Bcfg2.Encryption.EVPError: - msg = "Cfg: Failed to decrypt %s element in %s" % \ - (el.tag, self.name) - if strict: - raise PluginExecutionError(msg) - else: - self.logger.warning(msg) - Index.__doc__ = StructFile.Index.__doc__ - - def _decrypt(self, element): - """ Decrypt a single encrypted element """ - if not element.text or not element.text.strip(): - return - passes = Bcfg2.Encryption.get_passphrases() - try: - passphrase = passes[element.get("encrypted")] - try: - return Bcfg2.Encryption.ssl_decrypt(element.text, passphrase) - except Bcfg2.Encryption.EVPError: - # error is raised below - pass - except KeyError: - # bruteforce_decrypt raises an EVPError with a sensible - # error message, so we just let it propagate up the stack - return Bcfg2.Encryption.bruteforce_decrypt(element.text) - raise Bcfg2.Encryption.EVPError("Failed to decrypt") diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py index 6be438462..4c61e338e 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py @@ -23,6 +23,9 @@ class CfgPublicKeyCreator(CfgCreator, StructFile): #: Handle XML specifications of private keys __basenames__ = ['pubkey.xml'] + #: No text content on any tags, so encryption support disabled + encryption = False + def __init__(self, fname): CfgCreator.__init__(self, fname) StructFile.__init__(self, fname) diff --git a/src/lib/Bcfg2/Server/Plugins/FileProbes.py b/src/lib/Bcfg2/Server/Plugins/FileProbes.py index 5ec0d7280..365549e85 100644 --- a/src/lib/Bcfg2/Server/Plugins/FileProbes.py +++ b/src/lib/Bcfg2/Server/Plugins/FileProbes.py @@ -51,6 +51,11 @@ print(Bcfg2.Client.XML.tostring(data, xml_declaration=False).decode('UTF-8')) """ +class FileProbesConfig(Bcfg2.Server.Plugin.StructFile): + """ Config file for FileProbes """ + encryption = False + + class FileProbes(Bcfg2.Server.Plugin.Plugin, Bcfg2.Server.Plugin.Probing): """ This module allows you to probe a client for a file, which is then @@ -63,11 +68,9 @@ class FileProbes(Bcfg2.Server.Plugin.Plugin, def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) Bcfg2.Server.Plugin.Probing.__init__(self) - self.config = \ - Bcfg2.Server.Plugin.StructFile(os.path.join(self.data, - 'config.xml'), - fam=core.fam, - should_monitor=True) + self.config = FileProbesConfig(os.path.join(self.data, 'config.xml'), + fam=core.fam, + should_monitor=True) self.entries = dict() self.probes = dict() diff --git a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py index c39bd4c42..baea5fe23 100644 --- a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py +++ b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py @@ -15,12 +15,14 @@ LOGGER = logging.getLogger(__name__) class NagiosGenConfig(Bcfg2.Server.Plugin.StructFile): """ NagiosGen config file handler """ + encryption = False + def __init__(self, filename, fam): # create config.xml if missing if not os.path.exists(filename): LOGGER.warning("NagiosGen: %s missing. " "Creating empty one for you." % filename) - open(filename, "w").write("") + open(filename, "w").write("") Bcfg2.Server.Plugin.StructFile.__init__(self, filename, fam=fam, should_monitor=True) diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py index 2735e389a..afa5da4c5 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py @@ -17,6 +17,8 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile, __identifier__ = None + encryption = False + def __init__(self, filename, cachepath, fam, packages, setup): """ :param filename: The full path to ``sources.xml`` diff --git a/src/lib/Bcfg2/Server/Plugins/Properties.py b/src/lib/Bcfg2/Server/Plugins/Properties.py index c5b5ea2d1..24daa2107 100644 --- a/src/lib/Bcfg2/Server/Plugins/Properties.py +++ b/src/lib/Bcfg2/Server/Plugins/Properties.py @@ -203,49 +203,6 @@ class XMLPropertyFile(Bcfg2.Server.Plugin.StructFile, PropertyFile): return True validate_data.__doc__ = PropertyFile.validate_data.__doc__ - def Index(self): - Bcfg2.Server.Plugin.StructFile.Index(self) - strict = self.xdata.get( - "decrypt", - SETUP.cfp.get(Bcfg2.Encryption.CFG_SECTION, "decrypt", - default="strict")) == "strict" - for el in self.xdata.xpath("//*[@encrypted]"): - if not HAS_CRYPTO: - raise PluginExecutionError("Properties: M2Crypto is not " - "available: %s" % self.name) - try: - el.text = self._decrypt(el).encode('ascii', - 'xmlcharrefreplace') - except UnicodeDecodeError: - LOGGER.info("Properties: Decrypted %s to gibberish, " - "skipping" % el.tag) - except Bcfg2.Encryption.EVPError: - msg = "Properties: Failed to decrypt %s element in %s" % \ - (el.tag, self.name) - if strict: - raise PluginExecutionError(msg) - else: - LOGGER.warning(msg) - Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__ - - def _decrypt(self, element): - """ Decrypt a single encrypted properties file element """ - if not element.text or not element.text.strip(): - return - passes = Bcfg2.Encryption.get_passphrases() - try: - passphrase = passes[element.get("encrypted")] - try: - return Bcfg2.Encryption.ssl_decrypt(element.text, passphrase) - except Bcfg2.Encryption.EVPError: - # error is raised below - pass - except KeyError: - # bruteforce_decrypt raises an EVPError with a sensible - # error message, so we just let it propagate up the stack - return Bcfg2.Encryption.bruteforce_decrypt(element.text) - raise Bcfg2.Encryption.EVPError("Failed to decrypt") - def get_additional_data(self, metadata): if self.setup.cfp.getboolean("properties", "automatch", default=False): default_automatch = "true" diff --git a/src/lib/Bcfg2/Server/Plugins/SSLCA.py b/src/lib/Bcfg2/Server/Plugins/SSLCA.py index 0d51adf18..cc1a2ceac 100644 --- a/src/lib/Bcfg2/Server/Plugins/SSLCA.py +++ b/src/lib/Bcfg2/Server/Plugins/SSLCA.py @@ -17,6 +17,7 @@ LOGGER = logging.getLogger(__name__) class SSLCAXMLSpec(Bcfg2.Server.Plugin.StructFile): """ Base class to handle key.xml and cert.xml """ + encryption = False attrs = dict() tag = None diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 75cc41a34..3d4df3e0b 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1,5 +1,4 @@ import os -import re import sys import copy import lxml.etree @@ -21,6 +20,12 @@ from common import * from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable from TestServer.TestPlugin.Testinterfaces import TestGenerator +try: + from Bcfg2.Encryption import EVPError + HAS_CRYPTO = True +except: + HAS_CRYPTO = False + def tostring(el): return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8') @@ -674,6 +679,108 @@ class TestStructFile(TestXMLFileBacked): children[4] = standalone return (xdata, groups, subgroups, children, subchildren, standalone) + def test_Index(self): + has_crypto = Bcfg2.Server.Plugin.helpers.HAS_CRYPTO + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = False + TestXMLFileBacked.test_Index(self) + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = has_crypto + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_Index_crypto(self): + if not self.test_obj.encryption: + return skip("Encryption disabled on %s objects, skipping" % + self.test_obj.__name__) + + sf = self.get_obj() + sf.setup = Mock() + sf.setup.cfp.get.return_value = "strict" + sf._decrypt = Mock() + sf._decrypt.return_value = 'plaintext' + sf.data = ''' + + + crypted + + + plain + +''' + + # test successful decryption + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + for el in sf.xdata.xpath("//*[@encrypted]"): + self.assertEqual(el.text, sf._decrypt.return_value) + + # test failed decryption, strict + sf._decrypt.reset_mock() + sf._decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, sf.Index) + + # test failed decryption, lax + sf.setup.cfp.get.return_value = "lax" + sf._decrypt.reset_mock() + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): + sf = self.get_obj() + + def reset(): + mock_bruteforce.reset_mock() + mock_get_passphrases.reset_mock() + mock_ssl.reset_mock() + + # test element without text contents + self.assertIsNone(sf._decrypt(lxml.etree.Element("Test"))) + self.assertFalse(mock_bruteforce.called) + self.assertFalse(mock_get_passphrases.called) + self.assertFalse(mock_ssl.called) + + # test element with a passphrase in the config file + reset() + el = lxml.etree.Element("Test", encrypted="foo") + el.text = "crypted" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") + mock_ssl.return_value = "decrypted with ssl" + self.assertEqual(sf._decrypt(el), mock_ssl.return_value) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test failure to decrypt element with a passphrase in the config + reset() + mock_ssl.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test element without valid passphrase + reset() + el.set("encrypted", "true") + mock_bruteforce.return_value = "decrypted with bruteforce" + self.assertEqual(sf._decrypt(el), mock_bruteforce.return_value) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + + # test failure to decrypt element without valid passphrase + reset() + mock_bruteforce.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + def test_include_element(self): sf = self.get_obj() metadata = Mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index 4c8ab8b43..bf34d4c3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -306,102 +306,3 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): inner2() - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pkc = self.get_obj() - pkc.setup = Mock() - pkc.setup.cfp.get.return_value = "strict" - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' - - - crypted - - - plain - -''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - pkc.setup.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pkc = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index b0ca77a78..b63d08524 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -18,12 +18,6 @@ from common import * from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked -try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - try: import json JSON = "json" @@ -243,116 +237,6 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - # extra test: crypto is not available, but properties file is - # encrypted - has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False - try: - self.assertRaises(PluginExecutionError, pf.Index) - finally: - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' - - - crypted - plain - - crypted - plain - - crypted - -''' - - print "HAS_CRYPTO: %s" % HAS_CRYPTO - print "Properties HAS_CRYPTO: %s" % Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pf = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): pf = self.get_obj() -- cgit v1.2.3-1-g7c22