diff options
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins')
17 files changed, 719 insertions, 691 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py new file mode 100644 index 000000000..86a960701 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -0,0 +1,223 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.ACL import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \ + TestClientACLs + + +class TestFunctions(Bcfg2TestCase): + def test_rmi_names_equal(self): + good_cases = [('*', 'foo'), + ('foo', 'foo'), + ('foo.*', 'foo.bar'), + ('*.*', 'foo.bar'), + ('foo.bar', 'foo.bar'), + ('*.bar', 'foo.bar'), + ('foo.*.bar', 'foo.baz.bar')] + bad_cases = [('foo', 'bar'), + ('*', 'foo.bar'), + ('*.*', 'foo'), + ('*.*', 'foo.bar.baz'), + ('foo.*', 'bar.foo'), + ('*.bar', 'bar.foo'), + ('foo.*', 'foobar')] + for first, second in good_cases: + self.assertTrue(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly False" % + (first, second)) + self.assertTrue(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly False" % + (second, first)) + for first, second in bad_cases: + self.assertFalse(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly True" % + (first, second)) + self.assertFalse(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly True" % + (second, first)) + + def test_ip_matches(self): + good_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + bad_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + for ip, entry in good_cases: + self.assertTrue(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly False" % + (ip, lxml.etree.tostring(entry))) + for ip, entry in bad_cases: + self.assertFalse(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly True" % + (ip, lxml.etree.tostring(entry))) + + +class TestIPACLFile(TestXMLFileBacked): + test_obj = IPACLFile + + @patch("Bcfg2.Server.Plugins.ACL.ip_matches") + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches): + af = self.get_obj() + ip = "10.0.0.8" + rmi = "ACL.test" + + def reset(): + mock_rmi_names_equal.reset_mock() + mock_ip_matches.reset_mock() + + # test default defer with no entries + af.entries = [] + self.assertIsNone(af.check_acl(ip, rmi)) + + # test explicit allow, deny, and defer + entries = dict(Allow=lxml.etree.Element("Allow", method=rmi), + Deny=lxml.etree.Element("Deny", method=rmi), + Defer=lxml.etree.Element("Defer", method=rmi)) + af.entries = list(entries.values()) + + def get_ip_matches(tag): + def ip_matches(ip, entry): + return entry.tag == tag + + return ip_matches + + mock_rmi_names_equal.return_value = True + + reset() + mock_ip_matches.side_effect = get_ip_matches("Allow") + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Allow']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Deny") + self.assertFalse(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Deny']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Defer") + self.assertIsNone(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Defer']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + # test matching RMI names + reset() + mock_ip_matches.side_effect = lambda i, e: True + mock_rmi_names_equal.side_effect = lambda a, b: a == b + rmi = "ACL.test2" + matching = lxml.etree.Element("Allow", method=rmi) + af.entries.append(matching) + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, matching) + self.assertTrue( + call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or + call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list) + + # test implicit allow for localhost, defer for others + reset() + mock_ip_matches.side_effect = lambda i, e: False + self.assertIsNone(af.check_acl(ip, rmi)) + + reset() + self.assertTrue(af.check_acl("127.0.0.1", rmi)) + + +class TestMetadataACLFile(TestStructFile): + test_obj = MetadataACLFile + + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal): + af = self.get_obj() + af.Match = Mock() + metadata = Mock() + mock_rmi_names_equal.side_effect = lambda a, b: a == b + + def reset(): + af.Match.reset_mock() + mock_rmi_names_equal.reset_mock() + + # test default allow + af.entries = [] + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + + # test explicit allow and deny + reset() + af.entries = [lxml.etree.Element("Allow", method='ACL.test'), + lxml.etree.Element("Deny", method='ACL.test2')] + af.Match.return_value = af.entries + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test', 'ACL.test'), + mock_rmi_names_equal.call_args_list) + + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test2')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test2', 'ACL.test2'), + mock_rmi_names_equal.call_args_list) + + # test default deny for non-localhost + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + # test default allow for localhost + reset() + metadata.hostname = 'localhost' + self.assertTrue(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + +class TestACL(TestPlugin, TestClientACLs): + test_obj = ACL + + def test_check_acl_ip(self): + acl = self.get_obj() + acl.ip_acls = Mock() + self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"), + "ACL.test"), + acl.ip_acls.check_acl.return_value) + acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") + + def test_check_acl_metadata(self): + acl = self.get_obj() + acl.metadata_acls = Mock() + metadata = Mock() + self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"), + acl.metadata_acls.check_acl.return_value) + acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py new file mode 100644 index 000000000..cfb379c40 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -0,0 +1,111 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Bundler import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestStructure, \ + TestXMLDirectoryBacked + + +class TestBundleFile(TestStructFile): + test_obj = BundleFile + path = os.path.join(datastore, "test", "test1.xml") + + def test_bundle_name(self): + cases = [("foo.xml", "foo"), + ("foo.bar.xml", "foo.bar"), + ("foo-bar-baz.xml", "foo-bar-baz"), + ("foo....xml", "foo..."), + ("foo.genshi", "foo")] + bf = self.get_obj() + for fname, bname in cases: + bf.name = fname + self.assertEqual(bf.bundle_name, bname) + + +class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): + test_obj = Bundler + + def get_obj(self, core=None): + @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, + self.test_obj.__name__), + Mock()) + def inner(): + return TestPlugin.get_obj(self, core=core) + return inner() + + @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent") + def test_HandleEvent(self, mock_HandleEvent): + b = self.get_obj() + b.bundles = dict(foo=Mock(), bar=Mock()) + b.entries = {"foo.xml": BundleFile("foo.xml"), + "baz.xml": BundleFile("baz.xml")} + event = Mock() + b.HandleEvent(event) + mock_HandleEvent.assert_called_with(b, event) + self.assertItemsEqual(b.bundles, + dict(foo=b.entries['foo.xml'], + baz=b.entries['baz.xml'])) + + def test_BuildStructures(self): + b = self.get_obj() + b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(), + has_dep=Mock(), is_dep=Mock(), indep=Mock()) + expected = dict() + + b.bundles['error'].XMLMatch.side_effect = TemplateError(None) + + xinclude = lxml.etree.Element("Bundle") + lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"), + "Path", name="/test") + b.bundles['xinclude'].XMLMatch.return_value = xinclude + expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude") + lxml.etree.SubElement(expected['xinclude'], "Path", name="/test") + + has_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(has_dep, "Bundle", name="is_dep") + lxml.etree.SubElement(has_dep, "Package", name="foo") + b.bundles['has_dep'].XMLMatch.return_value = has_dep + expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep") + lxml.etree.SubElement(expected['has_dep'], "Package", name="foo") + + is_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(is_dep, "Package", name="bar") + b.bundles['is_dep'].XMLMatch.return_value = is_dep + expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep") + lxml.etree.SubElement(expected['is_dep'], "Package", name="bar") + + indep = lxml.etree.Element("Bundle", independent="true") + lxml.etree.SubElement(indep, "Service", name="baz") + b.bundles['indep'].XMLMatch.return_value = indep + expected['indep'] = lxml.etree.Element("Independent", name="indep") + lxml.etree.SubElement(expected['indep'], "Service", name="baz") + + metadata = Mock() + metadata.bundles = ["error", "xinclude", "has_dep", "indep"] + + rv = b.BuildStructures(metadata) + self.assertEqual(len(rv), 4) + for bundle in rv: + name = bundle.get("name") + self.assertIsNotNone(name, + "Bundle %s was not built" % name) + self.assertIn(name, expected, + "Unexpected bundle %s was built" % name) + self.assertXMLEqual(bundle, expected[name], + "Bundle %s was not built correctly" % name) + b.bundles[name].XMLMatch.assert_called_with(metadata) + + b.bundles['error'].XMLMatch.assert_called_with(metadata) + self.assertFalse(b.bundles['skip'].XMLMatch.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index d655a20cd..b77d52033 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): def test_category(self): akg = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp + akg.setup = Mock() + akg.setup.cfp.has_section.return_value = False + akg.setup.cfp.has_option.return_value = False self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + akg.setup.reset_mock() + akg.setup.cfp.has_section.return_value = True self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(akg.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + + akg.setup.reset_mock() + akg.setup.cfp.has_option.return_value = True + self.assertEqual(akg.category, akg.setup.cfp.get.return_value) + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + akg.setup.cfp.get.assert_called_with("sshkeys", "category") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index fc5d5e53d..31227329c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip: ccg.data = "data" entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock() + ccg.setup = MagicMock() self.assertEqual(ccg.get_data(entry, metadata), mock_Template.return_value.respond.return_value) - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo") + ccg.setup.__getitem__.assert_called_with("repo") mock_Template.assert_called_with("data".decode(ccg.encoding), compilerSettings=ccg.settings) tmpl = mock_Template.return_value @@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip: self.assertEqual(tmpl.name, entry.get("name")) self.assertEqual(tmpl.path, entry.get("name")) self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value) + self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 71a7410da..f8e9b1990 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -27,15 +27,12 @@ if can_skip or HAS_CRYPTO: pass @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt, mock_get_algorithm): + def test_handle_event(self, mock_decrypt): @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") def inner(mock_handle_event): def reset(): mock_decrypt.reset_mock() - mock_get_algorithm.reset_mock() mock_handle_event.reset_mock() def get_event_data(obj, event): @@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO: ceg = self.get_obj() ceg.handle_event(event) mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted", - setup=SETUP, - algorithm=mock_get_algorithm.return_value) + mock_decrypt.assert_called_with("encrypted") self.assertEqual(ceg.data, "plaintext") reset() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py index b447a9bb8..330779c99 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py @@ -14,20 +14,14 @@ while path != "/": path = os.path.dirname(path) from common import * -try: - from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ - TestCfgGenshiGenerator - HAS_GENSHI = True -except ImportError: - TestCfgGenshiGenerator = object - HAS_GENSHI = False +from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ + TestCfgGenshiGenerator -if can_skip or (HAS_CRYPTO and HAS_GENSHI): +if can_skip or HAS_CRYPTO: class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): test_obj = CfgEncryptedGenshiGenerator @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") def setUp(self): pass diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py index 0f369113b..7ceedb7c2 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py @@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier class TestCfgExternalCommandVerifier(TestCfgVerifier): test_obj = CfgExternalCommandVerifier - @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen") - def test_verify_entry(self, mock_Popen): - proc = Mock() - mock_Popen.return_value = proc - proc.wait.return_value = 0 - proc.communicate.return_value = ("stdout", "stderr") + def test_verify_entry(self): entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() ecv = self.get_obj() ecv.cmd = ["/bin/bash", "-x", "foo"] + ecv.exc = Mock() + ecv.exc.run.return_value = Mock() + ecv.exc.run.return_value.success = True + ecv.verify_entry(entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") - mock_Popen.reset_mock() - proc.wait.return_value = 13 + ecv.exc.reset_mock() + ecv.exc.run.return_value.success = False self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") + + ecv.exc.reset_mock() - mock_Popen.reset_mock() - mock_Popen.side_effect = OSError + ecv.exc.reset_mock() + ecv.exc.run.side_effect = OSError self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") @patch("os.access") def test_handle_event(self, mock_access): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 2e8b7bfa5..b73670fb7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -19,111 +19,97 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if can_skip or HAS_GENSHI: - class TestCfgGenshiGenerator(TestCfgGenerator): - test_obj = CfgGenshiGenerator - - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") - def setUp(self): - pass - - def test_removecomment(self): - data = [(None, "test", 1), - (None, "test2", 2)] - stream = [(genshi.core.COMMENT, "test", 0), - data[0], - (genshi.core.COMMENT, "test3", 0), - data[1]] - self.assertItemsEqual(list(removecomment(stream)), data) - - def test__init(self): - TestCfgGenerator.test__init(self) - cgg = self.get_obj() - self.assertIsInstance(cgg.loader, cgg.__loader_cls__) - - def test_get_data(self): - cgg = self.get_obj() - cgg._handle_genshi_exception = Mock() - cgg.template = Mock() - fltr = Mock() - cgg.template.generate.return_value = fltr - stream = Mock() - fltr.filter.return_value = stream - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() - - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock() - - def reset(): - cgg.template.reset_mock() - cgg._handle_genshi_exception.reset_mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock() - - template_vars = dict( - name=entry.get("name"), - metadata=metadata, - path=cgg.name, - source_path=cgg.name, - repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value) - - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - def render(fmt, **kwargs): - stream.render.side_effect = None - raise TypeError - stream.render.side_effect = render - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - self.assertEqual(stream.render.call_args_list, - [call("text", encoding=cgg.encoding, - strip_whitespace=False), - call("text", encoding=cgg.encoding)]) - - reset() - stream.render.side_effect = UndefinedError("test") - self.assertRaises(UndefinedError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - stream.render.side_effect = ValueError - cgg._handle_genshi_exception.side_effect = ValueError - self.assertRaises(ValueError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - self.assertTrue(cgg._handle_genshi_exception.called) - - def test_handle_event(self): - cgg = self.get_obj() - cgg.loader = Mock() - event = Mock() - cgg.handle_event(event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) - - cgg.loader.reset_mock() - cgg.loader.load.side_effect = OSError - self.assertRaises(PluginExecutionError, - cgg.handle_event, event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) +class TestCfgGenshiGenerator(TestCfgGenerator): + test_obj = CfgGenshiGenerator + + def test__init(self): + TestCfgGenerator.test__init(self) + cgg = self.get_obj() + self.assertIsInstance(cgg.loader, cgg.__loader_cls__) + + def test_get_data(self): + cgg = self.get_obj() + cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() + cgg.template = Mock() + fltr = Mock() + cgg.template.generate.return_value = fltr + stream = Mock() + fltr.filter.return_value = stream + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + + def reset(): + cgg.template.reset_mock() + cgg._handle_genshi_exception.reset_mock() + cgg.setup.reset_mock() + + template_vars = dict( + name=entry.get("name"), + metadata=metadata, + path=cgg.name, + source_path=cgg.name, + repo=cgg.setup.__getitem__.return_value) + + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + def render(fmt, **kwargs): + stream.render.side_effect = None + raise TypeError + stream.render.side_effect = render + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + self.assertEqual(stream.render.call_args_list, + [call("text", encoding=cgg.encoding, + strip_whitespace=False), + call("text", encoding=cgg.encoding)]) + + reset() + stream.render.side_effect = UndefinedError("test") + self.assertRaises(UndefinedError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + stream.render.side_effect = ValueError + cgg._handle_genshi_exception.side_effect = ValueError + self.assertRaises(ValueError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + self.assertTrue(cgg._handle_genshi_exception.called) + + def test_handle_event(self): + cgg = self.get_obj() + cgg.loader = Mock() + event = Mock() + cgg.handle_event(event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) + + cgg.loader.reset_mock() + cgg.loader.load.side_effect = OSError + self.assertRaises(PluginExecutionError, + cgg.handle_event, event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py index 839e9c3b8..7e7cb5e3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py @@ -27,47 +27,16 @@ class TestCfgInfoXML(TestCfgInfo): self.assertIsInstance(ci.infoxml, InfoXML) def test_bind_info_to_entry(self): - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() ci = self.get_obj() ci.infoxml = Mock() - ci._set_info = Mock() - - self.assertRaises(PluginExecutionError, - ci.bind_info_to_entry, entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, dict(), - entry=entry) - self.assertFalse(ci._set_info.called) - - ci.infoxml.reset_mock() - ci._set_info.reset_mock() - mdata_value = Mock() - def set_mdata(metadata, mdata, entry=None): - mdata['Info'] = {None: mdata_value} + entry = Mock() + metadata = Mock() - ci.infoxml.pnode.Match.side_effect = set_mdata ci.bind_info_to_entry(entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, - dict(Info={None: mdata_value}), - entry=entry) - ci._set_info.assert_called_with(entry, mdata_value) + ci.infoxml.BindEntry.assert_called_with(entry, metadata) def test_handle_event(self): ci = self.get_obj() ci.infoxml = Mock() ci.handle_event(Mock) ci.infoxml.HandleEvent.assert_called_with() - - def test__set_info(self): - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info") - def inner(mock_set_info): - ci = self.get_obj() - entry = Mock() - info = {"foo": "foo", - "__children__": ["one", "two"]} - ci._set_info(entry, info) - self.assertItemsEqual(entry.append.call_args_list, - [call(c) for c in info['__children__']]) - - inner() - TestCfgInfo.test__set_info(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index dc4b11241..48d5cdbfe 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import * from Bcfg2.Server.Plugin import PluginExecutionError import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator try: - from Bcfg2.Encryption import EVPError + from Bcfg2.Server.Encryption import EVPError HAS_CRYPTO = True except: HAS_CRYPTO = False @@ -44,77 +44,76 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): def test_category(self): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(pkc.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + pkc.setup.reset_mock() + pkc.setup.cfp.has_option.return_value = True + self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") - cfp.reset_mock() - cfp.get.return_value = "test" + pkc.setup.reset_mock() + pkc.setup.cfp.get.return_value = "test" mock_get_passphrases.return_value = dict(test="foo", test2="bar") - cfp.has_option.return_value = True + pkc.setup.cfp.has_option.return_value = True self.assertEqual(pkc.passphrase, "foo") - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - cfp.get.assert_called_with("sshkeys", "passphrase") - mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") + pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase") + mock_get_passphrases.assert_called_with() @patch("shutil.rmtree") @patch("tempfile.mkdtemp") - @patch("subprocess.Popen") - def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree): + def test__gen_keypair(self, mock_mkdtemp, mock_rmtree): pkc = self.get_obj() + pkc.cmd = Mock() pkc.XMLMatch = Mock() mock_mkdtemp.return_value = datastore metadata = Mock() - proc = Mock() - proc.wait.return_value = 0 - proc.communicate.return_value = MagicMock() - mock_Popen.return_value = proc + exc = Mock() + exc.success = True + pkc.cmd.run.return_value = exc spec = lxml.etree.Element("PrivateKey") pkc.XMLMatch.return_value = spec def reset(): pkc.XMLMatch.reset_mock() - mock_Popen.reset_mock() + pkc.cmd.reset_mock() mock_mkdtemp.reset_mock() mock_rmtree.reset_mock() @@ -122,10 +121,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "rsa", "-N", ""]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "rsa", "-N", ""]) reset() lxml.etree.SubElement(spec, "Params", bits="768", type="dsa") @@ -136,13 +134,12 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "dsa", "-b", "768", "-N", "foo"]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "dsa", "-b", "768", "-N", "foo"]) reset() - proc.wait.return_value = 1 + pkc.cmd.run.return_value.success = False self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata) mock_rmtree.assert_called_with(datastore) @@ -281,9 +278,8 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") - @patch("Bcfg2.Encryption.ssl_encrypt") - @patch("Bcfg2.Encryption.get_algorithm") - def inner2(mock_get_algorithm, mock_ssl_encrypt): + @patch("Bcfg2.Server.Encryption.ssl_encrypt") + def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True @@ -302,136 +298,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): "ssh-rsa publickey pubkey.filename\n", group="foo") pkc.write_data.assert_called_with("encryptedprivatekey", group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with( - "privatekey", "foo", - algorithm=mock_get_algorithm.return_value) + mock_ssl_encrypt.assert_called_with("privatekey", "foo") mock_rmtree.assert_called_with(datastore) inner2() - - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" - - pkc = self.get_obj() - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' -<PrivateKey> - <Group name="test"> - <Passphrase encrypted="foo">crypted</Passphrase> - </Group> - <Group name="test" negate="true"> - <Passphrase>plain</Passphrase> - </Group> -</PrivateKey>''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): - pkc = self.get_obj() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index ea3549c1b..07a1b120e 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -159,21 +159,6 @@ class TestCfgInfo(TestCfgBaseFileMatcher): self.assertRaises(NotImplementedError, ci.bind_info_to_entry, Mock(), Mock()) - def test__set_info(self): - ci = self.get_obj() - entry = Mock() - entry.attrib = dict() - - info = {"foo": "foo", - "_bar": "bar", - "bar:baz=quux": "quux", - "baz__": "baz", - "__quux": "quux"} - ci._set_info(entry, info) - self.assertItemsEqual(entry.attrib, - dict([(k, v) for k, v in info.items() - if not k.startswith("__")])) - class TestCfgVerifier(TestCfgBaseFileMatcher): test_obj = CfgVerifier @@ -259,29 +244,26 @@ class TestCfgCreator(TestCfgBaseFileMatcher): class TestCfgDefaultInfo(TestCfgInfo): test_obj = CfgDefaultInfo - def get_obj(self, defaults=None): - if defaults is None: - defaults = dict() - return self.test_obj(defaults) + def get_obj(self, *_): + return self.test_obj() - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__") - def test__init(self, mock__init): - defaults = Mock() - cdi = self.get_obj(defaults=defaults) - mock__init.assert_called_with(cdi, '') - self.assertEqual(defaults, cdi.defaults) + def test__init(self): + pass def test_handle_event(self): # this CfgInfo handler doesn't handle any events -- it's not # file-driven, but based on the built-in defaults pass - def test_bind_info_to_entry(self): + @patch("Bcfg2.Server.Plugin.default_path_metadata") + def test_bind_info_to_entry(self, mock_default_path_metadata): cdi = self.get_obj() - cdi._set_info = Mock() - entry = Mock() + entry = lxml.etree.Element("Test", name="test") + mock_default_path_metadata.return_value = \ + dict(owner="root", mode="0600") cdi.bind_info_to_entry(entry, Mock()) - cdi._set_info.assert_called_with(entry, cdi.defaults) + self.assertItemsEqual(entry.attrib, + dict(owner="root", mode="0600", name="test")) class TestCfgEntrySet(TestEntrySet): @@ -438,8 +420,6 @@ class TestCfgEntrySet(TestEntrySet): @patch("Bcfg2.Server.Plugins.Cfg.u_str") @patch("Bcfg2.Server.Plugins.Cfg.b64encode") def test_bind_entry(self, mock_b64encode, mock_u_str): - Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) - mock_u_str.side_effect = lambda x: x eset = self.get_obj() @@ -447,6 +427,7 @@ class TestCfgEntrySet(TestEntrySet): eset._generate_data = Mock() eset.get_handlers = Mock() eset._validate_data = Mock() + eset.setup = dict(validate=False) def reset(): mock_b64encode.reset_mock() @@ -524,7 +505,7 @@ class TestCfgEntrySet(TestEntrySet): # test successful validation entry = reset() - Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + eset.setup['validate'] = True bound = eset.bind_entry(entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -601,24 +582,24 @@ class TestCfgEntrySet(TestEntrySet): if entry.specific is not None: self.assertFalse(entry.specific.matches.called) - def test_bind_info_to_entry(self): - default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO + @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo") + def test_bind_info_to_entry(self, mock_DefaultInfo): eset = self.get_obj() eset.get_handlers = Mock() eset.get_handlers.return_value = [] - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock() metadata = Mock() def reset(): eset.get_handlers.reset_mock() - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock() + mock_DefaultInfo.reset_mock() return lxml.etree.Element("Path", name="/test.txt") # test with no info handlers entry = reset() eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) self.assertEqual(entry.get("type"), "file") # test with one info handler @@ -627,7 +608,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = [handler] eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) handler.bind_info_to_entry.assert_called_with(entry, metadata) self.assertEqual(entry.get("type"), "file") @@ -637,7 +619,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = handlers eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) # we don't care which handler gets called as long as exactly # one of them does called = 0 @@ -648,8 +631,6 @@ class TestCfgEntrySet(TestEntrySet): self.assertEqual(called, 1) self.assertEqual(entry.get("type"), "file") - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info - def test_create_data(self): eset = self.get_obj() eset.best_matching = Mock() @@ -754,31 +735,39 @@ class TestCfg(TestGroupSpool, TestPullTarget): def get_obj(self, core=None): if core is None: core = Mock() - core.setup = MagicMock() return TestGroupSpool.get_obj(self, core=core) + @patch("Bcfg2.Options.get_option_parser") @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") @patch("Bcfg2.Server.Plugin.PullTarget.__init__") - def test__init(self, mock_pulltarget_init, mock_groupspool_init): + def test__init(self, mock_pulltarget_init, mock_groupspool_init, + mock_get_option_parser): + setup = MagicMock() + setup.__contains__.return_value = False + mock_get_option_parser.return_value = setup + + def reset(): + core.reset_mock() + setup.reset_mock() + mock_pulltarget_init.reset_mock() + mock_groupspool_init.reset_mock() + core = Mock() - core.setup = MagicMock() cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - core.setup.add_option.assert_called_with("validate", - Bcfg2.Options.CFG_VALIDATION) - core.setup.reparse.assert_called_with() - - core.reset_mock() - core.setup.reset_mock() - mock_pulltarget_init.reset_mock() - mock_groupspool_init.reset_mock() - core.setup.__contains__.return_value = True + setup.add_option.assert_called_with( + "validate", + Bcfg2.Options.CFG_VALIDATION) + mock_get_option_parser.return_value.reparse.assert_called_with() + + reset() + setup.__contains__.return_value = True cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - self.assertFalse(core.setup.add_option.called) - self.assertFalse(core.setup.reparse.called) + self.assertFalse(setup.add_option.called) + self.assertFalse(setup.reparse.called) def test_has_generator(self): cfg = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py new file mode 100644 index 000000000..537ceb4ff --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py @@ -0,0 +1,60 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Decisions import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestDecision + + +class TestDecisionFile(TestStructFile): + test_obj = DecisionFile + + def test_get_decisions(self): + df = self.get_obj() + metadata = Mock() + + df.xdata = None + self.assertItemsEqual(df.get_decisions(metadata), []) + + df.xdata = lxml.etree.Element("Decisions") + df.XMLMatch = Mock() + df.XMLMatch.return_value = lxml.etree.Element("Decisions") + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Service", name='*') + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Path", + name='/etc/apt/apt.conf') + + self.assertItemsEqual(df.get_decisions(metadata), + [("Service", '*'), + ("Path", '/etc/apt/apt.conf')]) + df.XMLMatch.assert_called_with(metadata) + + +class TestDecisions(TestPlugin, TestDecision): + test_obj = Decisions + + def test_GetDecisions(self): + d = self.get_obj() + d.whitelist = Mock() + d.blacklist = Mock() + metadata = Mock() + + self.assertEqual(d.GetDecisions(metadata, "whitelist"), + d.whitelist.get_decision.return_value) + d.whitelist.get_decision.assert_called_with(metadata) + + self.assertEqual(d.GetDecisions(metadata, "blacklist"), + d.blacklist.get_decision.return_value) + d.blacklist.get_decision.assert_called_with(metadata) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index e2da6ec5d..13c27c149 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -215,9 +215,9 @@ class TestXMLMetadataConfig(TestXMLFileBacked): return XMLMetadataConfig(self.metadata, watch_clients, basefile) return inner() + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test__init(self): xmc = self.get_obj() - self.assertEqual(self.metadata.core.fam, xmc.fam) self.assertFalse(xmc.fam.AddMonitor.called) def test_xdata(self): @@ -262,20 +262,21 @@ class TestXMLMetadataConfig(TestXMLFileBacked): self.assertEqual(config.base_xdata, "<test/>") def test_add_monitor(self): - core = MagicMock() - config = self.get_obj(core=core) + config = self.get_obj() + config.fam = Mock() fname = "test.xml" fpath = os.path.join(self.metadata.data, fname) config.extras = [] config.add_monitor(fpath) - self.assertFalse(core.fam.AddMonitor.called) + self.assertFalse(config.fam.AddMonitor.called) self.assertEqual(config.extras, [fpath]) - config = self.get_obj(core=core, watch_clients=True) + config = self.get_obj(watch_clients=True) + config.fam = Mock() config.add_monitor(fpath) - core.fam.AddMonitor.assert_called_with(fpath, config.metadata) + config.fam.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fpath]) def test_Index(self): @@ -499,7 +500,8 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): client_name = "%s%s" % (prefix, i) return client_name - def test__init(self): + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): # test with watch_clients=False core = MagicMock() metadata = self.get_obj(core=core) @@ -512,18 +514,19 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): self.assertEqual(metadata.states, dict()) # test with watch_clients=True - core.fam = MagicMock() metadata = self.get_obj(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - core.fam.reset_mock() - core.fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) + + mock_get_fam.reset_mock() + fam = Mock() + fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value = fam self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, self.get_obj, core=core, watch_clients=True) @@ -586,6 +589,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_group(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -618,6 +622,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_update_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -635,6 +640,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_remove_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -650,6 +656,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_bundle(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -673,6 +680,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_remove_bundle(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -688,6 +696,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_client(self): metadata = self.get_obj() metadata.clients_xml.write = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree() metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -722,6 +731,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_update_client(self): metadata = self.get_obj() metadata.clients_xml.write_xml = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree()) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -1269,25 +1279,24 @@ class TestMetadataBase(TestMetadata): return client_name @patch('os.path.exists') - def test__init(self, mock_exists): - core = MagicMock() - core.fam = Mock() + @patch('Bcfg2.Server.FileMonitor.get_fam') + def test__init(self, mock_get_fam, mock_exists): mock_exists.return_value = False - metadata = self.get_obj(core=core, watch_clients=True) + metadata = self.get_obj(watch_clients=True) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, - "groups.xml"), - metadata) + mock_get_fam.return_value.AddMonitor.assert_called_with( + os.path.join(metadata.data, "groups.xml"), + metadata) mock_exists.return_value = True - core.fam.reset_mock() - metadata = self.get_obj(core=core, watch_clients=True) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) + mock_get_fam.reset_mock() + metadata = self.get_obj(watch_clients=True) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) def test_add_group(self): pass @@ -1530,7 +1539,8 @@ class TestMetadata_ClientsXML(TestMetadataBase): def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_obj() - metadata.core.fam = Mock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() @patchIf(not isinstance(lxml.etree.Element, Mock), "lxml.etree.Element", Mock()) def inner(): @@ -1538,5 +1548,7 @@ class TestMetadata_ClientsXML(TestMetadataBase): inner() metadata = TestMetadata.load_clients_data(self, metadata=metadata, xdata=xdata) - return TestMetadataBase.load_clients_data(self, metadata=metadata, - xdata=xdata) + rv = TestMetadataBase.load_clients_data(self, metadata=metadata, + xdata=xdata) + Bcfg2.Server.FileMonitor._FAM = fam + return rv diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 0794db62e..30b08ef2f 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -134,22 +134,20 @@ class TestProbeSet(TestEntrySet): ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] bogus_names = ["test.py"] - def get_obj(self, path=datastore, fam=None, encoding=None, + def get_obj(self, path=datastore, encoding=None, plugin_name="Probes", basename=None): # get_obj() accepts the basename argument, accepted by the # parent get_obj() method, and just throws it away, since # ProbeSet uses a regex for the "basename" - if fam is None: - fam = Mock() - rv = self.test_obj(path, fam, encoding, plugin_name) + rv = self.test_obj(path, encoding, plugin_name) rv.entry_type = MagicMock() return rv - def test__init(self): - fam = Mock() - ps = self.get_obj(fam=fam) + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): + ps = self.get_obj() self.assertEqual(ps.plugin_name, "Probes") - fam.AddMonitor.assert_called_with(datastore, ps) + mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps) TestEntrySet.test__init(self) def test_HandleEvent(self): @@ -300,9 +298,6 @@ text def test__init(self): mock_load_data = Mock() probes = self.get_probes_object(load_data=mock_load_data) - probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore, - probes.name), - probes.probes) mock_load_data.assert_any_call() self.assertEqual(probes.probedata, ClientProbeDataSet()) self.assertEqual(probes.cgroups, dict()) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index 896f5861e..92dc85fb1 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -19,12 +19,6 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - -try: import json JSON = "json" except ImportError: @@ -41,10 +35,10 @@ class TestPropertyFile(Bcfg2TestCase): return self.test_obj(path) def test_write(self): - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() pf = self.get_obj() pf.validate_data = Mock() pf._write = Mock() + pf.setup = Mock() xstr = u("<Properties/>\n") pf.xdata = lxml.etree.XML(xstr) @@ -52,20 +46,20 @@ class TestPropertyFile(Bcfg2TestCase): def reset(): pf.validate_data.reset_mock() pf._write.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() # test writes disabled - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertRaises(PluginExecutionError, pf.write) self.assertFalse(pf.validate_data.called) self.assertFalse(pf._write.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", + pf.setup.cfp.getboolean.assert_called_with("properties", "writes_enabled", default=True) # test successful write reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.write(), pf._write.return_value) pf.validate_data.assert_called_with() pf._write.assert_called_with() @@ -243,178 +237,61 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' -<Properties decrypt="strict"> - <Crypted encrypted="foo"> - crypted - <Plain foo="bar">plain</Plain> - </Crypted> - <Crypted encrypted="bar">crypted</Crypted> - <Plain bar="baz">plain</Plain> - <Plain> - <Crypted encrypted="foo">crypted</Crypted> - </Plain> -</Properties>''' - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): - pf = self.get_obj() - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): - Bcfg2.Server.Plugins.Properties.SETUP = Mock() pf = self.get_obj() + pf.setup = Mock() pf.XMLMatch = Mock() metadata = Mock() def reset(): mock_copy.reset_mock() pf.XMLMatch.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() pf.xdata = lxml.etree.Element("Properties", automatch="true") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) pf.xdata = lxml.etree.Element("Properties", automatch="false") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) pf.xdata = lxml.etree.Element("Properties") reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py index f018b45dc..7083fff06 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py @@ -45,81 +45,37 @@ class TestRules(TestPrioDir): entry = lxml.etree.Element("Package", name="foo") self.assertFalse(r.HandlesEntry(entry, metadata)) - def test_BindEntry(self, method="BindEntry"): - r = self.get_obj() - r.get_attrs = Mock() - r.get_attrs.return_value = dict(overwrite="new", add="add", - text="text") - entry = lxml.etree.Element("Test", overwrite="old", keep="keep") - metadata = Mock() - - getattr(r, method)(entry, metadata) - r.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(overwrite="old", add="add", keep="keep", - text="text")) - - def test_HandleEntry(self): - self.test_BindEntry(method="HandleEntry") - @patch("Bcfg2.Server.Plugin.PrioDir._matches") def test__matches(self, mock_matches): - """ test _matches() behavior regardless of state of _regex_enabled """ r = self.get_obj() metadata = Mock() + # test parent _matches() returning True entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] + candidate = lxml.etree.Element("Path", name="/etc/bar.conf") mock_matches.return_value = True - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) - # test special Path cases -- adding and removing trailing slash + # test all conditions returning False mock_matches.reset_mock() mock_matches.return_value = False - rules = ["/etc/foo/", "/etc/bar"] - entry = lxml.etree.Element("Path", name="/etc/foo") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertFalse(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + # test special Path cases -- adding and removing trailing slash mock_matches.reset_mock() - entry = lxml.etree.Element("Path", name="/etc/bar/") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_disabled(self, mock_matches): - """ test failure to match with regex disabled """ - r = self.get_obj() - self.set_regex_enabled(r, False) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] - self.assertFalse(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_enabled(self, mock_matches): - """ test match with regex enabled """ - r = self.get_obj() - self.set_regex_enabled(r, True) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = ["/etc/.*\.conf", "/etc/bar"] - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) - - def set_regex_enabled(self, rules_obj, state): - """ set the state of regex_enabled for this implementation of - Rules """ - if not isinstance(rules_obj.core.setup, MagicMock): - rules_obj.core.setup = MagicMock() - rules_obj.core.setup.cfp.getboolean.return_value = state + withslash = lxml.etree.Element("Path", name="/etc/foo") + withoutslash = lxml.etree.Element("Path", name="/etc/foo/") + self.assertTrue(r._matches(withslash, metadata, withoutslash)) + self.assertTrue(r._matches(withoutslash, metadata, withslash)) + + if r._regex_enabled: + mock_matches.reset_mock() + candidate = lxml.etree.Element("Path", name="/etc/.*\.conf") + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) def test__regex_enabled(self): r = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py index bf9a3ced3..128d6cae5 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py @@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase): def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=Mock()) + return self.test_obj(path) def test__init(self): hm = self.get_obj() |