summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py2
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Properties.py24
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py204
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py155
4 files changed, 201 insertions, 184 deletions
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
index bb54c6faa..597f8f57b 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
@@ -235,7 +235,7 @@ class CfgPrivateKeyCreator(CfgCreator, StructFile):
def _decrypt(self, element):
""" Decrypt a single encrypted element """
- if not element.text.strip():
+ if not element.text or not element.text.strip():
return
passes = Bcfg2.Encryption.get_passphrases(SETUP)
try:
diff --git a/src/lib/Bcfg2/Server/Plugins/Properties.py b/src/lib/Bcfg2/Server/Plugins/Properties.py
index e9653a212..b3c0a6ae5 100644
--- a/src/lib/Bcfg2/Server/Plugins/Properties.py
+++ b/src/lib/Bcfg2/Server/Plugins/Properties.py
@@ -10,8 +10,7 @@ import lxml.etree
import Bcfg2.Server.Plugin
from Bcfg2.Server.Plugin import PluginExecutionError
try:
- from Bcfg2.Encryption import ssl_decrypt, get_passphrases, \
- get_algorithm, bruteforce_decrypt, EVPError
+ import Bcfg2.Encryption
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
@@ -221,7 +220,7 @@ class XMLPropertyFile(Bcfg2.Server.Plugin.StructFile, PropertyFile):
except UnicodeDecodeError:
LOGGER.info("Properties: Decrypted %s to gibberish, "
"skipping" % el.tag)
- except EVPError:
+ except Bcfg2.Encryption.EVPError:
msg = "Properties: Failed to decrypt %s element in %s" % \
(el.tag, self.name)
if strict:
@@ -232,24 +231,25 @@ class XMLPropertyFile(Bcfg2.Server.Plugin.StructFile, PropertyFile):
def _decrypt(self, element):
""" Decrypt a single encrypted properties file element """
- if not element.text.strip():
+ if not element.text or not element.text.strip():
return
- passes = get_passphrases(SETUP)
+ passes = Bcfg2.Encryption.get_passphrases(SETUP)
try:
passphrase = passes[element.get("encrypted")]
try:
- return ssl_decrypt(element.text, passphrase,
- algorithm=get_algorithm(SETUP))
- except EVPError:
+ return Bcfg2.Encryption.ssl_decrypt(
+ element.text, passphrase,
+ algorithm=Bcfg2.Encryption.get_algorithm(SETUP))
+ except Bcfg2.Encryption.EVPError:
# error is raised below
pass
except KeyError:
# bruteforce_decrypt raises an EVPError with a sensible
# error message, so we just let it propagate up the stack
- return bruteforce_decrypt(element.text,
- passphrases=passes.values(),
- algorithm=get_algorithm(SETUP))
- raise EVPError("Failed to decrypt")
+ return Bcfg2.Encryption.bruteforce_decrypt(
+ element.text, passphrases=passes.values(),
+ algorithm=Bcfg2.Encryption.get_algorithm(SETUP))
+ raise Bcfg2.Encryption.EVPError("Failed to decrypt")
def get_additional_data(self, metadata):
if SETUP.cfp.getboolean("properties", "automatch", default=False):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
index 1181fe648..dc4b11241 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -67,36 +67,33 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
cfp.get.assert_called_with("sshkeys", "category")
@skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_passphrase(self):
- @patch("Bcfg2.Encryption.get_passphrases")
- def inner(mock_get_passphrases):
- pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
-
- self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
-
- cfp.reset_mock()
- cfp.has_section.return_value = True
- self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
-
- cfp.reset_mock()
- cfp.get.return_value = "test"
- mock_get_passphrases.return_value = dict(test="foo", test2="bar")
- cfp.has_option.return_value = True
- self.assertEqual(pkc.passphrase, "foo")
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
- cfp.get.assert_called_with("sshkeys", "passphrase")
- mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
+ def test_passphrase(self, mock_get_passphrases):
+ pkc = self.get_obj()
+ cfp = Mock()
+ cfp.has_section.return_value = False
+ cfp.has_option.return_value = False
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
- inner()
+ self.assertIsNone(pkc.passphrase)
+ cfp.has_section.assert_called_with("sshkeys")
+
+ cfp.reset_mock()
+ cfp.has_section.return_value = True
+ self.assertIsNone(pkc.passphrase)
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "passphrase")
+
+ cfp.reset_mock()
+ cfp.get.return_value = "test"
+ mock_get_passphrases.return_value = dict(test="foo", test2="bar")
+ cfp.has_option.return_value = True
+ self.assertEqual(pkc.passphrase, "foo")
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "passphrase")
+ cfp.get.assert_called_with("sshkeys", "passphrase")
+ mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
@patch("shutil.rmtree")
@patch("tempfile.mkdtemp")
@@ -360,74 +357,81 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
@skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_decrypt(self):
-
- @patch("Bcfg2.Encryption.ssl_decrypt")
- @patch("Bcfg2.Encryption.get_algorithm")
- @patch("Bcfg2.Encryption.get_passphrases")
- @patch("Bcfg2.Encryption.bruteforce_decrypt")
- def inner(mock_bruteforce, mock_get_passphrases, mock_get_algorithm,
- mock_ssl):
- pkc = self.get_obj()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
+ def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
+ mock_get_algorithm, mock_ssl):
+ pkc = self.get_obj()
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock()
+
+ def reset():
+ mock_bruteforce.reset_mock()
+ mock_get_algorithm.reset_mock()
+ mock_get_passphrases.reset_mock()
+ mock_ssl.reset_mock()
+
+ # test element without text contents
+ self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
+ self.assertFalse(mock_bruteforce.called)
+ self.assertFalse(mock_get_passphrases.called)
+ self.assertFalse(mock_ssl.called)
+
+ # test element with a passphrase in the config file
+ reset()
+ el = lxml.etree.Element("Test", encrypted="foo")
+ el.text = "crypted"
+ mock_get_passphrases.return_value = dict(foo="foopass",
+ bar="barpass")
+ mock_get_algorithm.return_value = "bf_cbc"
+ mock_ssl.return_value = "decrypted with ssl"
+ self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_ssl.assert_called_with(el.text, "foopass",
+ algorithm="bf_cbc")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test failure to decrypt element with a passphrase in the config
+ reset()
+ mock_ssl.side_effect = EVPError
+ self.assertRaises(EVPError, pkc._decrypt, el)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_ssl.assert_called_with(el.text, "foopass",
+ algorithm="bf_cbc")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test element without valid passphrase
+ reset()
+ el.set("encrypted", "true")
+ mock_bruteforce.return_value = "decrypted with bruteforce"
+ self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_bruteforce.assert_called_with(el.text,
+ passphrases=["foopass",
+ "barpass"],
+ algorithm="bf_cbc")
+ self.assertFalse(mock_ssl.called)
+
+ # test failure to decrypt element without valid passphrase
+ reset()
+ mock_bruteforce.side_effect = EVPError
+ self.assertRaises(EVPError, pkc._decrypt, el)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ mock_bruteforce.assert_called_with(el.text,
+ passphrases=["foopass",
+ "barpass"],
+ algorithm="bf_cbc")
+ self.assertFalse(mock_ssl.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
index 2fff67f8b..1a8619097 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
@@ -19,6 +19,12 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \
TestPlugin, TestDirectoryBacked
try:
+ from Bcfg2.Encryption import EVPError
+ HAS_CRYPTO = True
+except:
+ HAS_CRYPTO = False
+
+try:
import json
JSON = "json"
except ImportError:
@@ -35,7 +41,7 @@ class TestPropertyFile(Bcfg2TestCase):
return self.test_obj(path)
def test_write(self):
- Bcfg2.Server.Plugins.Properties.SETUP = Mock()
+ Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
pf = self.get_obj()
pf.validate_data = Mock()
pf._write = Mock()
@@ -288,77 +294,84 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile):
[call(el) for el in pf.xdata.xpath("//Crypted")])
@skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_decrypt(self):
-
- @patch("Bcfg2.Encryption.ssl_decrypt")
- @patch("Bcfg2.Encryption.get_algorithm")
- @patch("Bcfg2.Encryption.get_passphrases")
- @patch("Bcfg2.Encryption.bruteforce_decrypt")
- def inner(mock_bruteforce, mock_get_passphrases, mock_get_algorithm,
- mock_ssl):
- pf = self.get_obj()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pf._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
+ def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
+ mock_get_algorithm, mock_ssl):
+ pf = self.get_obj()
+ Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pf._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(SETUP)
- mock_get_algorithm.assert_called_with(SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
+ def reset():
+ mock_bruteforce.reset_mock()
+ mock_get_algorithm.reset_mock()
+ mock_get_passphrases.reset_mock()
+ mock_ssl.reset_mock()
+
+ # test element without text contents
+ self.assertIsNone(pf._decrypt(lxml.etree.Element("Test")))
+ self.assertFalse(mock_bruteforce.called)
+ self.assertFalse(mock_get_passphrases.called)
+ self.assertFalse(mock_ssl.called)
+
+ # test element with a passphrase in the config file
+ reset()
+ el = lxml.etree.Element("Test", encrypted="foo")
+ el.text = "crypted"
+ mock_get_passphrases.return_value = dict(foo="foopass",
+ bar="barpass")
+ mock_get_algorithm.return_value = "bf_cbc"
+ mock_ssl.return_value = "decrypted with ssl"
+ self.assertEqual(pf._decrypt(el), mock_ssl.return_value)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_ssl.assert_called_with(el.text, "foopass",
+ algorithm="bf_cbc")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test failure to decrypt element with a passphrase in the config
+ reset()
+ mock_ssl.side_effect = EVPError
+ self.assertRaises(EVPError, pf._decrypt, el)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_ssl.assert_called_with(el.text, "foopass",
+ algorithm="bf_cbc")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test element without valid passphrase
+ reset()
+ el.set("encrypted", "true")
+ mock_bruteforce.return_value = "decrypted with bruteforce"
+ self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_bruteforce.assert_called_with(el.text,
+ passphrases=["foopass",
+ "barpass"],
+ algorithm="bf_cbc")
+ self.assertFalse(mock_ssl.called)
+
+ # test failure to decrypt element without valid passphrase
+ reset()
+ mock_bruteforce.side_effect = EVPError
+ self.assertRaises(EVPError, pf._decrypt, el)
+ mock_get_passphrases.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_get_algorithm.assert_called_with(
+ Bcfg2.Server.Plugins.Properties.SETUP)
+ mock_bruteforce.assert_called_with(el.text,
+ passphrases=["foopass",
+ "barpass"],
+ algorithm="bf_cbc")
+ self.assertFalse(mock_ssl.called)
@patch("copy.copy")
def test_get_additional_data(self, mock_copy):