summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/server/encryption.txt9
-rw-r--r--doc/server/plugins/generators/cfg.txt3
-rw-r--r--schemas/types.xsd8
-rw-r--r--src/lib/Bcfg2/Server/Plugin/helpers.py64
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py43
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py3
-rw-r--r--src/lib/Bcfg2/Server/Plugins/FileProbes.py13
-rw-r--r--src/lib/Bcfg2/Server/Plugins/NagiosGen.py4
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py2
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Properties.py43
-rw-r--r--src/lib/Bcfg2/Server/Plugins/SSLCA.py1
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py109
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py99
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py116
14 files changed, 208 insertions, 309 deletions
diff --git a/doc/server/encryption.txt b/doc/server/encryption.txt
index e84b9fb31..1f6cb72e6 100644
--- a/doc/server/encryption.txt
+++ b/doc/server/encryption.txt
@@ -23,7 +23,7 @@ separations between teams, environments, etc.
single Bcfg2 repository with multiple admins who should not
necessarily have access to each other's sensitive data.
-Two types of data can be encrypted:
+Two basic types of data can be encrypted:
* :ref:`server-plugins-generators-cfg` files can be encrypted
as whole files. See :ref:`server-plugins-generators-cfg-encryption`
@@ -50,6 +50,13 @@ In general, Properties encryption is preferred for a few reasons:
amongst different teams, this lets teams collaborate more closely on
files and other data.
+Other types of data that can be encrypted are:
+
+* Text content of Path tags in
+ :ref:`server-plugins-structures-bundler-index`
+* Passphrases in XML description files for generated
+ :ref:`server-plugins-generators-cfg-sshkeys`
+
.. _bcfg2-crypt:
bcfg2-crypt
diff --git a/doc/server/plugins/generators/cfg.txt b/doc/server/plugins/generators/cfg.txt
index e843b1d2d..1cb4b8727 100644
--- a/doc/server/plugins/generators/cfg.txt
+++ b/doc/server/plugins/generators/cfg.txt
@@ -583,6 +583,9 @@ influenced by several options in the ``[sshkeys]`` section of
| | group from. | | |
+----------------+---------------------------------------------------------+-----------------------+------------+
+See :ref:`server-encryption` for more details on encryption in Bcfg2
+in general.
+
Deltas
======
diff --git a/schemas/types.xsd b/schemas/types.xsd
index 524b327c5..144ef7337 100644
--- a/schemas/types.xsd
+++ b/schemas/types.xsd
@@ -371,6 +371,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute type="xsd:string" name="encrypted">
+ <xsd:annotation>
+ <xsd:documentation>
+ The name of the encryption passphrase that the text content
+ of this tag is encrypted with.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attributeGroup ref="py:genshiAttrs"/>
</xsd:complexType>
diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py
index 7b22d52ca..879c68b85 100644
--- a/src/lib/Bcfg2/Server/Plugin/helpers.py
+++ b/src/lib/Bcfg2/Server/Plugin/helpers.py
@@ -18,6 +18,12 @@ from Bcfg2.Server.Plugin.exceptions import SpecificityError, \
PluginExecutionError
try:
+ import Bcfg2.Encryption
+ HAS_CRYPTO = True
+except ImportError:
+ HAS_CRYPTO = False
+
+try:
import django # pylint: disable=W0611
HAS_DJANGO = True
except ImportError:
@@ -571,13 +577,69 @@ class XMLFileBacked(FileBacked):
class StructFile(XMLFileBacked):
""" StructFiles are XML files that contain a set of structure file
formatting logic for handling ``<Group>`` and ``<Client>``
- tags. """
+ tags.
+
+ .. -----
+ .. autoattribute:: __identifier__
+ """
#: If ``__identifier__`` is not None, then it must be the name of
#: an XML attribute that will be required on the top-level tag of
#: the file being cached
__identifier__ = None
+ #: Whether or not encryption support is enabled in this file
+ encryption = True
+
+ def __init__(self, filename, fam=None, should_monitor=False):
+ XMLFileBacked.__init__(self, filename, fam=fam,
+ should_monitor=should_monitor)
+ self.setup = Bcfg2.Options.get_option_parser()
+
+ def Index(self):
+ Bcfg2.Server.Plugin.XMLFileBacked.Index(self)
+ if self.encryption:
+ strict = self.xdata.get(
+ "decrypt",
+ self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION, "decrypt",
+ default="strict")) == "strict"
+ for el in self.xdata.xpath("//*[@encrypted]"):
+ if not HAS_CRYPTO:
+ raise PluginExecutionError("Properties: M2Crypto is not "
+ "available: %s" % self.name)
+ try:
+ el.text = self._decrypt(el).encode('ascii',
+ 'xmlcharrefreplace')
+ except UnicodeDecodeError:
+ LOGGER.info("%s: Decrypted %s to gibberish, skipping" %
+ (self.name, el.tag))
+ except Bcfg2.Encryption.EVPError:
+ msg = "Failed to decrypt %s element in %s" % (el.tag,
+ self.name)
+ if strict:
+ raise PluginExecutionError(msg)
+ else:
+ LOGGER.warning(msg)
+ Index.__doc__ = XMLFileBacked.Index.__doc__
+
+ def _decrypt(self, element):
+ """ Decrypt a single encrypted properties file element """
+ if not element.text or not element.text.strip():
+ return
+ passes = Bcfg2.Encryption.get_passphrases()
+ try:
+ passphrase = passes[element.get("encrypted")]
+ try:
+ return Bcfg2.Encryption.ssl_decrypt(element.text, passphrase)
+ except Bcfg2.Encryption.EVPError:
+ # error is raised below
+ pass
+ except KeyError:
+ # bruteforce_decrypt raises an EVPError with a sensible
+ # error message, so we just let it propagate up the stack
+ return Bcfg2.Encryption.bruteforce_decrypt(element.text)
+ raise Bcfg2.Encryption.EVPError("Failed to decrypt")
+
def _include_element(self, item, metadata):
""" determine if an XML element matches the metadata """
if isinstance(item, lxml.etree._Comment): # pylint: disable=W0212
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
index 54fa75b41..4d6639e4d 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
@@ -5,7 +5,7 @@ import shutil
import tempfile
import subprocess
from Bcfg2.Options import get_option_parser
-from Bcfg2.Server.Plugin import PluginExecutionError, StructFile
+from Bcfg2.Server.Plugin import StructFile
from Bcfg2.Server.Plugins.Cfg import CfgCreator, CfgCreationError
from Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator import CfgPublicKeyCreator
try:
@@ -211,44 +211,3 @@ class CfgPrivateKeyCreator(CfgCreator, StructFile):
finally:
shutil.rmtree(os.path.dirname(filename))
# pylint: enable=W0221
-
- def Index(self):
- StructFile.Index(self)
- if HAS_CRYPTO:
- strict = self.xdata.get(
- "decrypt",
- SETUP.cfp.get(Bcfg2.Encryption.CFG_SECTION, "decrypt",
- default="strict")) == "strict"
- for el in self.xdata.xpath("//*[@encrypted]"):
- try:
- el.text = self._decrypt(el).encode('ascii',
- 'xmlcharrefreplace')
- except UnicodeDecodeError:
- self.logger.info("Cfg: Decrypted %s to gibberish, skipping"
- % el.tag)
- except Bcfg2.Encryption.EVPError:
- msg = "Cfg: Failed to decrypt %s element in %s" % \
- (el.tag, self.name)
- if strict:
- raise PluginExecutionError(msg)
- else:
- self.logger.warning(msg)
- Index.__doc__ = StructFile.Index.__doc__
-
- def _decrypt(self, element):
- """ Decrypt a single encrypted element """
- if not element.text or not element.text.strip():
- return
- passes = Bcfg2.Encryption.get_passphrases()
- try:
- passphrase = passes[element.get("encrypted")]
- try:
- return Bcfg2.Encryption.ssl_decrypt(element.text, passphrase)
- except Bcfg2.Encryption.EVPError:
- # error is raised below
- pass
- except KeyError:
- # bruteforce_decrypt raises an EVPError with a sensible
- # error message, so we just let it propagate up the stack
- return Bcfg2.Encryption.bruteforce_decrypt(element.text)
- raise Bcfg2.Encryption.EVPError("Failed to decrypt")
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py
index 6be438462..4c61e338e 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py
@@ -23,6 +23,9 @@ class CfgPublicKeyCreator(CfgCreator, StructFile):
#: Handle XML specifications of private keys
__basenames__ = ['pubkey.xml']
+ #: No text content on any tags, so encryption support disabled
+ encryption = False
+
def __init__(self, fname):
CfgCreator.__init__(self, fname)
StructFile.__init__(self, fname)
diff --git a/src/lib/Bcfg2/Server/Plugins/FileProbes.py b/src/lib/Bcfg2/Server/Plugins/FileProbes.py
index 5ec0d7280..365549e85 100644
--- a/src/lib/Bcfg2/Server/Plugins/FileProbes.py
+++ b/src/lib/Bcfg2/Server/Plugins/FileProbes.py
@@ -51,6 +51,11 @@ print(Bcfg2.Client.XML.tostring(data, xml_declaration=False).decode('UTF-8'))
"""
+class FileProbesConfig(Bcfg2.Server.Plugin.StructFile):
+ """ Config file for FileProbes """
+ encryption = False
+
+
class FileProbes(Bcfg2.Server.Plugin.Plugin,
Bcfg2.Server.Plugin.Probing):
""" This module allows you to probe a client for a file, which is then
@@ -63,11 +68,9 @@ class FileProbes(Bcfg2.Server.Plugin.Plugin,
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
Bcfg2.Server.Plugin.Probing.__init__(self)
- self.config = \
- Bcfg2.Server.Plugin.StructFile(os.path.join(self.data,
- 'config.xml'),
- fam=core.fam,
- should_monitor=True)
+ self.config = FileProbesConfig(os.path.join(self.data, 'config.xml'),
+ fam=core.fam,
+ should_monitor=True)
self.entries = dict()
self.probes = dict()
diff --git a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
index c39bd4c42..baea5fe23 100644
--- a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
+++ b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
@@ -15,12 +15,14 @@ LOGGER = logging.getLogger(__name__)
class NagiosGenConfig(Bcfg2.Server.Plugin.StructFile):
""" NagiosGen config file handler """
+ encryption = False
+
def __init__(self, filename, fam):
# create config.xml if missing
if not os.path.exists(filename):
LOGGER.warning("NagiosGen: %s missing. "
"Creating empty one for you." % filename)
- open(filename, "w").write("<NagiosGen></NagiosGen>")
+ open(filename, "w").write("<NagiosGen/>")
Bcfg2.Server.Plugin.StructFile.__init__(self, filename, fam=fam,
should_monitor=True)
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
index 2735e389a..afa5da4c5 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
@@ -17,6 +17,8 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile,
__identifier__ = None
+ encryption = False
+
def __init__(self, filename, cachepath, fam, packages, setup):
"""
:param filename: The full path to ``sources.xml``
diff --git a/src/lib/Bcfg2/Server/Plugins/Properties.py b/src/lib/Bcfg2/Server/Plugins/Properties.py
index c5b5ea2d1..24daa2107 100644
--- a/src/lib/Bcfg2/Server/Plugins/Properties.py
+++ b/src/lib/Bcfg2/Server/Plugins/Properties.py
@@ -203,49 +203,6 @@ class XMLPropertyFile(Bcfg2.Server.Plugin.StructFile, PropertyFile):
return True
validate_data.__doc__ = PropertyFile.validate_data.__doc__
- def Index(self):
- Bcfg2.Server.Plugin.StructFile.Index(self)
- strict = self.xdata.get(
- "decrypt",
- SETUP.cfp.get(Bcfg2.Encryption.CFG_SECTION, "decrypt",
- default="strict")) == "strict"
- for el in self.xdata.xpath("//*[@encrypted]"):
- if not HAS_CRYPTO:
- raise PluginExecutionError("Properties: M2Crypto is not "
- "available: %s" % self.name)
- try:
- el.text = self._decrypt(el).encode('ascii',
- 'xmlcharrefreplace')
- except UnicodeDecodeError:
- LOGGER.info("Properties: Decrypted %s to gibberish, "
- "skipping" % el.tag)
- except Bcfg2.Encryption.EVPError:
- msg = "Properties: Failed to decrypt %s element in %s" % \
- (el.tag, self.name)
- if strict:
- raise PluginExecutionError(msg)
- else:
- LOGGER.warning(msg)
- Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__
-
- def _decrypt(self, element):
- """ Decrypt a single encrypted properties file element """
- if not element.text or not element.text.strip():
- return
- passes = Bcfg2.Encryption.get_passphrases()
- try:
- passphrase = passes[element.get("encrypted")]
- try:
- return Bcfg2.Encryption.ssl_decrypt(element.text, passphrase)
- except Bcfg2.Encryption.EVPError:
- # error is raised below
- pass
- except KeyError:
- # bruteforce_decrypt raises an EVPError with a sensible
- # error message, so we just let it propagate up the stack
- return Bcfg2.Encryption.bruteforce_decrypt(element.text)
- raise Bcfg2.Encryption.EVPError("Failed to decrypt")
-
def get_additional_data(self, metadata):
if self.setup.cfp.getboolean("properties", "automatch", default=False):
default_automatch = "true"
diff --git a/src/lib/Bcfg2/Server/Plugins/SSLCA.py b/src/lib/Bcfg2/Server/Plugins/SSLCA.py
index 0d51adf18..cc1a2ceac 100644
--- a/src/lib/Bcfg2/Server/Plugins/SSLCA.py
+++ b/src/lib/Bcfg2/Server/Plugins/SSLCA.py
@@ -17,6 +17,7 @@ LOGGER = logging.getLogger(__name__)
class SSLCAXMLSpec(Bcfg2.Server.Plugin.StructFile):
""" Base class to handle key.xml and cert.xml """
+ encryption = False
attrs = dict()
tag = None
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
index 75cc41a34..3d4df3e0b 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
@@ -1,5 +1,4 @@
import os
-import re
import sys
import copy
import lxml.etree
@@ -21,6 +20,12 @@ from common import *
from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable
from TestServer.TestPlugin.Testinterfaces import TestGenerator
+try:
+ from Bcfg2.Encryption import EVPError
+ HAS_CRYPTO = True
+except:
+ HAS_CRYPTO = False
+
def tostring(el):
return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8')
@@ -674,6 +679,108 @@ class TestStructFile(TestXMLFileBacked):
children[4] = standalone
return (xdata, groups, subgroups, children, subchildren, standalone)
+ def test_Index(self):
+ has_crypto = Bcfg2.Server.Plugin.helpers.HAS_CRYPTO
+ Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = False
+ TestXMLFileBacked.test_Index(self)
+ Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = has_crypto
+
+ @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
+ def test_Index_crypto(self):
+ if not self.test_obj.encryption:
+ return skip("Encryption disabled on %s objects, skipping" %
+ self.test_obj.__name__)
+
+ sf = self.get_obj()
+ sf.setup = Mock()
+ sf.setup.cfp.get.return_value = "strict"
+ sf._decrypt = Mock()
+ sf._decrypt.return_value = 'plaintext'
+ sf.data = '''
+<EncryptedData>
+ <Group name="test">
+ <Datum encrypted="foo">crypted</Datum>
+ </Group>
+ <Group name="test" negate="true">
+ <Datum>plain</Datum>
+ </Group>
+</EncryptedData>'''
+
+ # test successful decryption
+ sf.Index()
+ self.assertItemsEqual(
+ sf._decrypt.call_args_list,
+ [call(el) for el in sf.xdata.xpath("//*[@encrypted]")])
+ for el in sf.xdata.xpath("//*[@encrypted]"):
+ self.assertEqual(el.text, sf._decrypt.return_value)
+
+ # test failed decryption, strict
+ sf._decrypt.reset_mock()
+ sf._decrypt.side_effect = EVPError
+ self.assertRaises(PluginExecutionError, sf.Index)
+
+ # test failed decryption, lax
+ sf.setup.cfp.get.return_value = "lax"
+ sf._decrypt.reset_mock()
+ sf.Index()
+ self.assertItemsEqual(
+ sf._decrypt.call_args_list,
+ [call(el) for el in sf.xdata.xpath("//*[@encrypted]")])
+
+ @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
+ def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl):
+ sf = self.get_obj()
+
+ def reset():
+ mock_bruteforce.reset_mock()
+ mock_get_passphrases.reset_mock()
+ mock_ssl.reset_mock()
+
+ # test element without text contents
+ self.assertIsNone(sf._decrypt(lxml.etree.Element("Test")))
+ self.assertFalse(mock_bruteforce.called)
+ self.assertFalse(mock_get_passphrases.called)
+ self.assertFalse(mock_ssl.called)
+
+ # test element with a passphrase in the config file
+ reset()
+ el = lxml.etree.Element("Test", encrypted="foo")
+ el.text = "crypted"
+ mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass")
+ mock_ssl.return_value = "decrypted with ssl"
+ self.assertEqual(sf._decrypt(el), mock_ssl.return_value)
+ mock_get_passphrases.assert_called_with()
+ mock_ssl.assert_called_with(el.text, "foopass")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test failure to decrypt element with a passphrase in the config
+ reset()
+ mock_ssl.side_effect = EVPError
+ self.assertRaises(EVPError, sf._decrypt, el)
+ mock_get_passphrases.assert_called_with()
+ mock_ssl.assert_called_with(el.text, "foopass")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test element without valid passphrase
+ reset()
+ el.set("encrypted", "true")
+ mock_bruteforce.return_value = "decrypted with bruteforce"
+ self.assertEqual(sf._decrypt(el), mock_bruteforce.return_value)
+ mock_get_passphrases.assert_called_with()
+ mock_bruteforce.assert_called_with(el.text)
+ self.assertFalse(mock_ssl.called)
+
+ # test failure to decrypt element without valid passphrase
+ reset()
+ mock_bruteforce.side_effect = EVPError
+ self.assertRaises(EVPError, sf._decrypt, el)
+ mock_get_passphrases.assert_called_with()
+ mock_bruteforce.assert_called_with(el.text)
+ self.assertFalse(mock_ssl.called)
+
def test_include_element(self):
sf = self.get_obj()
metadata = Mock()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
index 4c8ab8b43..bf34d4c3c 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -306,102 +306,3 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
inner2()
- def test_Index(self):
- has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False
- TestStructFile.test_Index(self)
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- pkc = self.get_obj()
- pkc.setup = Mock()
- pkc.setup.cfp.get.return_value = "strict"
- pkc._decrypt = Mock()
- pkc._decrypt.return_value = 'plaintext'
- pkc.data = '''
-<PrivateKey>
- <Group name="test">
- <Passphrase encrypted="foo">crypted</Passphrase>
- </Group>
- <Group name="test" negate="true">
- <Passphrase>plain</Passphrase>
- </Group>
-</PrivateKey>'''
-
- # test successful decryption
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
- for el in pkc.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pkc._decrypt.return_value)
-
- # test failed decryption, strict
- pkc._decrypt.reset_mock()
- pkc._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pkc.Index)
-
- # test failed decryption, lax
- pkc.setup.cfp.get.return_value = "lax"
- pkc._decrypt.reset_mock()
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl):
- pkc = self.get_obj()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass")
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with()
- mock_ssl.assert_called_with(el.text, "foopass")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with()
- mock_ssl.assert_called_with(el.text, "foopass")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with()
- mock_bruteforce.assert_called_with(el.text)
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with()
- mock_bruteforce.assert_called_with(el.text)
- self.assertFalse(mock_ssl.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
index b0ca77a78..b63d08524 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
@@ -19,12 +19,6 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \
TestPlugin, TestDirectoryBacked
try:
- from Bcfg2.Encryption import EVPError
- HAS_CRYPTO = True
-except:
- HAS_CRYPTO = False
-
-try:
import json
JSON = "json"
except ImportError:
@@ -243,116 +237,6 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile):
mock_exists.assert_called_with(schemafile)
mock_XMLSchema.assert_called_with(file=schemafile)
- def test_Index(self):
- TestStructFile.test_Index(self)
-
- pf = self.get_obj()
- pf.xdata = lxml.etree.Element("Properties")
- lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo")
- pf.data = lxml.etree.tostring(pf.xdata)
- # extra test: crypto is not available, but properties file is
- # encrypted
- has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO
- Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False
- try:
- self.assertRaises(PluginExecutionError, pf.Index)
- finally:
- Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- pf = self.get_obj()
- pf._decrypt = Mock()
- pf._decrypt.return_value = 'plaintext'
- pf.data = '''
-<Properties decrypt="strict">
- <Crypted encrypted="foo">
- crypted
- <Plain foo="bar">plain</Plain>
- </Crypted>
- <Crypted encrypted="bar">crypted</Crypted>
- <Plain bar="baz">plain</Plain>
- <Plain>
- <Crypted encrypted="foo">crypted</Crypted>
- </Plain>
-</Properties>'''
-
- print "HAS_CRYPTO: %s" % HAS_CRYPTO
- print "Properties HAS_CRYPTO: %s" % Bcfg2.Server.Plugins.Properties.HAS_CRYPTO
-
- # test successful decryption
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
- for el in pf.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pf._decrypt.return_value)
-
- # test failed decryption, strict
- pf._decrypt.reset_mock()
- pf._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pf.Index)
-
- # test failed decryption, lax
- pf.data = pf.data.replace("strict", "lax")
- pf._decrypt.reset_mock()
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl):
- pf = self.get_obj()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pf._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass")
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pf._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with()
- mock_ssl.assert_called_with(el.text, "foopass")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with()
- mock_ssl.assert_called_with(el.text, "foopass")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with()
- mock_bruteforce.assert_called_with(el.text)
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with()
- mock_bruteforce.assert_called_with(el.text)
- self.assertFalse(mock_ssl.called)
-
@patch("copy.copy")
def test_get_additional_data(self, mock_copy):
pf = self.get_obj()