summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py32
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py7
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py9
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py12
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py31
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py202
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py37
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py235
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py99
9 files changed, 229 insertions, 435 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
index d655a20cd..b77d52033 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
@@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
def test_category(self):
akg = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp
+ akg.setup = Mock()
+ akg.setup.cfp.has_section.return_value = False
+ akg.setup.cfp.has_option.return_value = False
self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
+ akg.setup.cfp.has_section.assert_called_with("sshkeys")
- cfp.reset_mock()
- cfp.has_section.return_value = True
+ akg.setup.reset_mock()
+ akg.setup.cfp.has_section.return_value = True
self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
-
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(akg.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
+ akg.setup.cfp.has_section.assert_called_with("sshkeys")
+ akg.setup.cfp.has_option.assert_called_with("sshkeys", "category")
+
+ akg.setup.reset_mock()
+ akg.setup.cfp.has_option.return_value = True
+ self.assertEqual(akg.category, akg.setup.cfp.get.return_value)
+ akg.setup.cfp.has_section.assert_called_with("sshkeys")
+ akg.setup.cfp.has_option.assert_called_with("sshkeys", "category")
+ akg.setup.cfp.get.assert_called_with("sshkeys", "category")
@patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata")
@patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
index fc5d5e53d..31227329c 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
@@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip:
ccg.data = "data"
entry = lxml.etree.Element("Path", name="/test.txt")
metadata = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock()
+ ccg.setup = MagicMock()
self.assertEqual(ccg.get_data(entry, metadata),
mock_Template.return_value.respond.return_value)
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo")
+ ccg.setup.__getitem__.assert_called_with("repo")
mock_Template.assert_called_with("data".decode(ccg.encoding),
compilerSettings=ccg.settings)
tmpl = mock_Template.return_value
@@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip:
self.assertEqual(tmpl.name, entry.get("name"))
self.assertEqual(tmpl.path, entry.get("name"))
self.assertEqual(tmpl.source_path, ccg.name)
- self.assertEqual(tmpl.repo,
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value)
+ self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
index 71a7410da..f8e9b1990 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
@@ -27,15 +27,12 @@ if can_skip or HAS_CRYPTO:
pass
@patchIf(HAS_CRYPTO,
- "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm")
- @patchIf(HAS_CRYPTO,
"Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
- def test_handle_event(self, mock_decrypt, mock_get_algorithm):
+ def test_handle_event(self, mock_decrypt):
@patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
def inner(mock_handle_event):
def reset():
mock_decrypt.reset_mock()
- mock_get_algorithm.reset_mock()
mock_handle_event.reset_mock()
def get_event_data(obj, event):
@@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO:
ceg = self.get_obj()
ceg.handle_event(event)
mock_handle_event.assert_called_with(ceg, event)
- mock_decrypt.assert_called_with("encrypted",
- setup=SETUP,
- algorithm=mock_get_algorithm.return_value)
+ mock_decrypt.assert_called_with("encrypted")
self.assertEqual(ceg.data, "plaintext")
reset()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
index b447a9bb8..330779c99 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
@@ -14,20 +14,14 @@ while path != "/":
path = os.path.dirname(path)
from common import *
-try:
- from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
- TestCfgGenshiGenerator
- HAS_GENSHI = True
-except ImportError:
- TestCfgGenshiGenerator = object
- HAS_GENSHI = False
+from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
+ TestCfgGenshiGenerator
-if can_skip or (HAS_CRYPTO and HAS_GENSHI):
+if can_skip or HAS_CRYPTO:
class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
test_obj = CfgEncryptedGenshiGenerator
@skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
def setUp(self):
pass
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
index 0f369113b..7ceedb7c2 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
@@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier
class TestCfgExternalCommandVerifier(TestCfgVerifier):
test_obj = CfgExternalCommandVerifier
- @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen")
- def test_verify_entry(self, mock_Popen):
- proc = Mock()
- mock_Popen.return_value = proc
- proc.wait.return_value = 0
- proc.communicate.return_value = ("stdout", "stderr")
+ def test_verify_entry(self):
entry = lxml.etree.Element("Path", name="/test.txt")
metadata = Mock()
ecv = self.get_obj()
ecv.cmd = ["/bin/bash", "-x", "foo"]
+ ecv.exc = Mock()
+ ecv.exc.run.return_value = Mock()
+ ecv.exc.run.return_value.success = True
+
ecv.verify_entry(entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
- mock_Popen.reset_mock()
- proc.wait.return_value = 13
+ ecv.exc.reset_mock()
+ ecv.exc.run.return_value.success = False
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
+
+ ecv.exc.reset_mock()
- mock_Popen.reset_mock()
- mock_Popen.side_effect = OSError
+ ecv.exc.reset_mock()
+ ecv.exc.run.side_effect = OSError
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
@patch("os.access")
def test_handle_event(self, mock_access):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
index 2e8b7bfa5..b73670fb7 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
@@ -19,111 +19,97 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_GENSHI:
- class TestCfgGenshiGenerator(TestCfgGenerator):
- test_obj = CfgGenshiGenerator
-
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
- def setUp(self):
- pass
-
- def test_removecomment(self):
- data = [(None, "test", 1),
- (None, "test2", 2)]
- stream = [(genshi.core.COMMENT, "test", 0),
- data[0],
- (genshi.core.COMMENT, "test3", 0),
- data[1]]
- self.assertItemsEqual(list(removecomment(stream)), data)
-
- def test__init(self):
- TestCfgGenerator.test__init(self)
- cgg = self.get_obj()
- self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
-
- def test_get_data(self):
- cgg = self.get_obj()
- cgg._handle_genshi_exception = Mock()
- cgg.template = Mock()
- fltr = Mock()
- cgg.template.generate.return_value = fltr
- stream = Mock()
- fltr.filter.return_value = stream
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
-
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock()
-
- def reset():
- cgg.template.reset_mock()
- cgg._handle_genshi_exception.reset_mock()
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock()
-
- template_vars = dict(
- name=entry.get("name"),
- metadata=metadata,
- path=cgg.name,
- source_path=cgg.name,
- repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value)
-
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- def render(fmt, **kwargs):
- stream.render.side_effect = None
- raise TypeError
- stream.render.side_effect = render
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- self.assertEqual(stream.render.call_args_list,
- [call("text", encoding=cgg.encoding,
- strip_whitespace=False),
- call("text", encoding=cgg.encoding)])
-
- reset()
- stream.render.side_effect = UndefinedError("test")
- self.assertRaises(UndefinedError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- stream.render.side_effect = ValueError
- cgg._handle_genshi_exception.side_effect = ValueError
- self.assertRaises(ValueError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
- self.assertTrue(cgg._handle_genshi_exception.called)
-
- def test_handle_event(self):
- cgg = self.get_obj()
- cgg.loader = Mock()
- event = Mock()
- cgg.handle_event(event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
-
- cgg.loader.reset_mock()
- cgg.loader.load.side_effect = OSError
- self.assertRaises(PluginExecutionError,
- cgg.handle_event, event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
+class TestCfgGenshiGenerator(TestCfgGenerator):
+ test_obj = CfgGenshiGenerator
+
+ def test__init(self):
+ TestCfgGenerator.test__init(self)
+ cgg = self.get_obj()
+ self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
+
+ def test_get_data(self):
+ cgg = self.get_obj()
+ cgg._handle_genshi_exception = Mock()
+ cgg.setup = MagicMock()
+ cgg.template = Mock()
+ fltr = Mock()
+ cgg.template.generate.return_value = fltr
+ stream = Mock()
+ fltr.filter.return_value = stream
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+
+ def reset():
+ cgg.template.reset_mock()
+ cgg._handle_genshi_exception.reset_mock()
+ cgg.setup.reset_mock()
+
+ template_vars = dict(
+ name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name,
+ source_path=cgg.name,
+ repo=cgg.setup.__getitem__.return_value)
+
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ def render(fmt, **kwargs):
+ stream.render.side_effect = None
+ raise TypeError
+ stream.render.side_effect = render
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ self.assertEqual(stream.render.call_args_list,
+ [call("text", encoding=cgg.encoding,
+ strip_whitespace=False),
+ call("text", encoding=cgg.encoding)])
+
+ reset()
+ stream.render.side_effect = UndefinedError("test")
+ self.assertRaises(UndefinedError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ stream.render.side_effect = ValueError
+ cgg._handle_genshi_exception.side_effect = ValueError
+ self.assertRaises(ValueError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+ self.assertTrue(cgg._handle_genshi_exception.called)
+
+ def test_handle_event(self):
+ cgg = self.get_obj()
+ cgg.loader = Mock()
+ event = Mock()
+ cgg.handle_event(event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)
+
+ cgg.loader.reset_mock()
+ cgg.loader.load.side_effect = OSError
+ self.assertRaises(PluginExecutionError,
+ cgg.handle_event, event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
index 839e9c3b8..7e7cb5e3c 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
@@ -27,47 +27,16 @@ class TestCfgInfoXML(TestCfgInfo):
self.assertIsInstance(ci.infoxml, InfoXML)
def test_bind_info_to_entry(self):
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
ci = self.get_obj()
ci.infoxml = Mock()
- ci._set_info = Mock()
-
- self.assertRaises(PluginExecutionError,
- ci.bind_info_to_entry, entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata, dict(),
- entry=entry)
- self.assertFalse(ci._set_info.called)
-
- ci.infoxml.reset_mock()
- ci._set_info.reset_mock()
- mdata_value = Mock()
- def set_mdata(metadata, mdata, entry=None):
- mdata['Info'] = {None: mdata_value}
+ entry = Mock()
+ metadata = Mock()
- ci.infoxml.pnode.Match.side_effect = set_mdata
ci.bind_info_to_entry(entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata,
- dict(Info={None: mdata_value}),
- entry=entry)
- ci._set_info.assert_called_with(entry, mdata_value)
+ ci.infoxml.BindEntry.assert_called_with(entry, metadata)
def test_handle_event(self):
ci = self.get_obj()
ci.infoxml = Mock()
ci.handle_event(Mock)
ci.infoxml.HandleEvent.assert_called_with()
-
- def test__set_info(self):
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info")
- def inner(mock_set_info):
- ci = self.get_obj()
- entry = Mock()
- info = {"foo": "foo",
- "__children__": ["one", "two"]}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.append.call_args_list,
- [call(c) for c in info['__children__']])
-
- inner()
- TestCfgInfo.test__set_info(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
index dc4b11241..48d5cdbfe 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import *
from Bcfg2.Server.Plugin import PluginExecutionError
import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator
try:
- from Bcfg2.Encryption import EVPError
+ from Bcfg2.Server.Encryption import EVPError
HAS_CRYPTO = True
except:
HAS_CRYPTO = False
@@ -44,77 +44,76 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
def test_category(self):
pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
+ pkc.setup = Mock()
+ pkc.setup.cfp = Mock()
+ pkc.setup.cfp.has_section.return_value = False
+ pkc.setup.cfp.has_option.return_value = False
self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
- cfp.reset_mock()
- cfp.has_section.return_value = True
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.has_section.return_value = True
self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(pkc.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.has_option.return_value = True
+ self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value)
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category")
+ pkc.setup.cfp.get.assert_called_with("sshkeys", "category")
@skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.get_passphrases")
def test_passphrase(self, mock_get_passphrases):
pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
+ pkc.setup = Mock()
+ pkc.setup.cfp = Mock()
+ pkc.setup.cfp.has_section.return_value = False
+ pkc.setup.cfp.has_option.return_value = False
self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
- cfp.reset_mock()
- cfp.has_section.return_value = True
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.has_section.return_value = True
self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys",
+ "passphrase")
- cfp.reset_mock()
- cfp.get.return_value = "test"
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.get.return_value = "test"
mock_get_passphrases.return_value = dict(test="foo", test2="bar")
- cfp.has_option.return_value = True
+ pkc.setup.cfp.has_option.return_value = True
self.assertEqual(pkc.passphrase, "foo")
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
- cfp.get.assert_called_with("sshkeys", "passphrase")
- mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys",
+ "passphrase")
+ pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase")
+ mock_get_passphrases.assert_called_with()
@patch("shutil.rmtree")
@patch("tempfile.mkdtemp")
- @patch("subprocess.Popen")
- def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree):
+ def test__gen_keypair(self, mock_mkdtemp, mock_rmtree):
pkc = self.get_obj()
+ pkc.cmd = Mock()
pkc.XMLMatch = Mock()
mock_mkdtemp.return_value = datastore
metadata = Mock()
- proc = Mock()
- proc.wait.return_value = 0
- proc.communicate.return_value = MagicMock()
- mock_Popen.return_value = proc
+ exc = Mock()
+ exc.success = True
+ pkc.cmd.run.return_value = exc
spec = lxml.etree.Element("PrivateKey")
pkc.XMLMatch.return_value = spec
def reset():
pkc.XMLMatch.reset_mock()
- mock_Popen.reset_mock()
+ pkc.cmd.reset_mock()
mock_mkdtemp.reset_mock()
mock_rmtree.reset_mock()
@@ -122,10 +121,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "rsa", "-N", ""])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "rsa", "-N", ""])
reset()
lxml.etree.SubElement(spec, "Params", bits="768", type="dsa")
@@ -136,13 +134,12 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "dsa", "-b", "768", "-N", "foo"])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "dsa", "-b", "768", "-N", "foo"])
reset()
- proc.wait.return_value = 1
+ pkc.cmd.run.return_value.success = False
self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata)
mock_rmtree.assert_called_with(datastore)
@@ -281,9 +278,8 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
if HAS_CRYPTO:
@patch(passphrase, "foo")
- @patch("Bcfg2.Encryption.ssl_encrypt")
- @patch("Bcfg2.Encryption.get_algorithm")
- def inner2(mock_get_algorithm, mock_ssl_encrypt):
+ @patch("Bcfg2.Server.Encryption.ssl_encrypt")
+ def inner2(mock_ssl_encrypt):
reset()
mock_ssl_encrypt.return_value = "encryptedprivatekey"
Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True
@@ -302,136 +298,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
"ssh-rsa publickey pubkey.filename\n", group="foo")
pkc.write_data.assert_called_with("encryptedprivatekey",
group="foo", ext=".crypt")
- mock_ssl_encrypt.assert_called_with(
- "privatekey", "foo",
- algorithm=mock_get_algorithm.return_value)
+ mock_ssl_encrypt.assert_called_with("privatekey", "foo")
mock_rmtree.assert_called_with(datastore)
inner2()
-
- def test_Index(self):
- has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False
- TestStructFile.test_Index(self)
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict"
-
- pkc = self.get_obj()
- pkc._decrypt = Mock()
- pkc._decrypt.return_value = 'plaintext'
- pkc.data = '''
-<PrivateKey>
- <Group name="test">
- <Passphrase encrypted="foo">crypted</Passphrase>
- </Group>
- <Group name="test" negate="true">
- <Passphrase>plain</Passphrase>
- </Group>
-</PrivateKey>'''
-
- # test successful decryption
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
- for el in pkc.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pkc._decrypt.return_value)
-
- # test failed decryption, strict
- pkc._decrypt.reset_mock()
- pkc._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pkc.Index)
-
- # test failed decryption, lax
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax"
- pkc._decrypt.reset_mock()
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
- mock_get_algorithm, mock_ssl):
- pkc = self.get_obj()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
index ea3549c1b..07a1b120e 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -159,21 +159,6 @@ class TestCfgInfo(TestCfgBaseFileMatcher):
self.assertRaises(NotImplementedError,
ci.bind_info_to_entry, Mock(), Mock())
- def test__set_info(self):
- ci = self.get_obj()
- entry = Mock()
- entry.attrib = dict()
-
- info = {"foo": "foo",
- "_bar": "bar",
- "bar:baz=quux": "quux",
- "baz__": "baz",
- "__quux": "quux"}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.attrib,
- dict([(k, v) for k, v in info.items()
- if not k.startswith("__")]))
-
class TestCfgVerifier(TestCfgBaseFileMatcher):
test_obj = CfgVerifier
@@ -259,29 +244,26 @@ class TestCfgCreator(TestCfgBaseFileMatcher):
class TestCfgDefaultInfo(TestCfgInfo):
test_obj = CfgDefaultInfo
- def get_obj(self, defaults=None):
- if defaults is None:
- defaults = dict()
- return self.test_obj(defaults)
+ def get_obj(self, *_):
+ return self.test_obj()
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__")
- def test__init(self, mock__init):
- defaults = Mock()
- cdi = self.get_obj(defaults=defaults)
- mock__init.assert_called_with(cdi, '')
- self.assertEqual(defaults, cdi.defaults)
+ def test__init(self):
+ pass
def test_handle_event(self):
# this CfgInfo handler doesn't handle any events -- it's not
# file-driven, but based on the built-in defaults
pass
- def test_bind_info_to_entry(self):
+ @patch("Bcfg2.Server.Plugin.default_path_metadata")
+ def test_bind_info_to_entry(self, mock_default_path_metadata):
cdi = self.get_obj()
- cdi._set_info = Mock()
- entry = Mock()
+ entry = lxml.etree.Element("Test", name="test")
+ mock_default_path_metadata.return_value = \
+ dict(owner="root", mode="0600")
cdi.bind_info_to_entry(entry, Mock())
- cdi._set_info.assert_called_with(entry, cdi.defaults)
+ self.assertItemsEqual(entry.attrib,
+ dict(owner="root", mode="0600", name="test"))
class TestCfgEntrySet(TestEntrySet):
@@ -438,8 +420,6 @@ class TestCfgEntrySet(TestEntrySet):
@patch("Bcfg2.Server.Plugins.Cfg.u_str")
@patch("Bcfg2.Server.Plugins.Cfg.b64encode")
def test_bind_entry(self, mock_b64encode, mock_u_str):
- Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False)
-
mock_u_str.side_effect = lambda x: x
eset = self.get_obj()
@@ -447,6 +427,7 @@ class TestCfgEntrySet(TestEntrySet):
eset._generate_data = Mock()
eset.get_handlers = Mock()
eset._validate_data = Mock()
+ eset.setup = dict(validate=False)
def reset():
mock_b64encode.reset_mock()
@@ -524,7 +505,7 @@ class TestCfgEntrySet(TestEntrySet):
# test successful validation
entry = reset()
- Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True
+ eset.setup['validate'] = True
bound = eset.bind_entry(entry, metadata)
eset.bind_info_to_entry.assert_called_with(entry, metadata)
eset._generate_data.assert_called_with(entry, metadata)
@@ -601,24 +582,24 @@ class TestCfgEntrySet(TestEntrySet):
if entry.specific is not None:
self.assertFalse(entry.specific.matches.called)
- def test_bind_info_to_entry(self):
- default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo")
+ def test_bind_info_to_entry(self, mock_DefaultInfo):
eset = self.get_obj()
eset.get_handlers = Mock()
eset.get_handlers.return_value = []
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock()
metadata = Mock()
def reset():
eset.get_handlers.reset_mock()
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock()
+ mock_DefaultInfo.reset_mock()
return lxml.etree.Element("Path", name="/test.txt")
# test with no info handlers
entry = reset()
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
self.assertEqual(entry.get("type"), "file")
# test with one info handler
@@ -627,7 +608,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = [handler]
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
handler.bind_info_to_entry.assert_called_with(entry, metadata)
self.assertEqual(entry.get("type"), "file")
@@ -637,7 +619,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = handlers
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
# we don't care which handler gets called as long as exactly
# one of them does
called = 0
@@ -648,8 +631,6 @@ class TestCfgEntrySet(TestEntrySet):
self.assertEqual(called, 1)
self.assertEqual(entry.get("type"), "file")
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info
-
def test_create_data(self):
eset = self.get_obj()
eset.best_matching = Mock()
@@ -754,31 +735,39 @@ class TestCfg(TestGroupSpool, TestPullTarget):
def get_obj(self, core=None):
if core is None:
core = Mock()
- core.setup = MagicMock()
return TestGroupSpool.get_obj(self, core=core)
+ @patch("Bcfg2.Options.get_option_parser")
@patch("Bcfg2.Server.Plugin.GroupSpool.__init__")
@patch("Bcfg2.Server.Plugin.PullTarget.__init__")
- def test__init(self, mock_pulltarget_init, mock_groupspool_init):
+ def test__init(self, mock_pulltarget_init, mock_groupspool_init,
+ mock_get_option_parser):
+ setup = MagicMock()
+ setup.__contains__.return_value = False
+ mock_get_option_parser.return_value = setup
+
+ def reset():
+ core.reset_mock()
+ setup.reset_mock()
+ mock_pulltarget_init.reset_mock()
+ mock_groupspool_init.reset_mock()
+
core = Mock()
- core.setup = MagicMock()
cfg = self.test_obj(core, datastore)
mock_pulltarget_init.assert_called_with(cfg)
mock_groupspool_init.assert_called_with(cfg, core, datastore)
- core.setup.add_option.assert_called_with("validate",
- Bcfg2.Options.CFG_VALIDATION)
- core.setup.reparse.assert_called_with()
-
- core.reset_mock()
- core.setup.reset_mock()
- mock_pulltarget_init.reset_mock()
- mock_groupspool_init.reset_mock()
- core.setup.__contains__.return_value = True
+ setup.add_option.assert_called_with(
+ "validate",
+ Bcfg2.Options.CFG_VALIDATION)
+ mock_get_option_parser.return_value.reparse.assert_called_with()
+
+ reset()
+ setup.__contains__.return_value = True
cfg = self.test_obj(core, datastore)
mock_pulltarget_init.assert_called_with(cfg)
mock_groupspool_init.assert_called_with(cfg, core, datastore)
- self.assertFalse(core.setup.add_option.called)
- self.assertFalse(core.setup.reparse.called)
+ self.assertFalse(setup.add_option.called)
+ self.assertFalse(setup.reparse.called)
def test_has_generator(self):
cfg = self.get_obj()