diff options
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins')
19 files changed, 1397 insertions, 1671 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py new file mode 100644 index 000000000..86a960701 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -0,0 +1,223 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.ACL import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \ + TestClientACLs + + +class TestFunctions(Bcfg2TestCase): + def test_rmi_names_equal(self): + good_cases = [('*', 'foo'), + ('foo', 'foo'), + ('foo.*', 'foo.bar'), + ('*.*', 'foo.bar'), + ('foo.bar', 'foo.bar'), + ('*.bar', 'foo.bar'), + ('foo.*.bar', 'foo.baz.bar')] + bad_cases = [('foo', 'bar'), + ('*', 'foo.bar'), + ('*.*', 'foo'), + ('*.*', 'foo.bar.baz'), + ('foo.*', 'bar.foo'), + ('*.bar', 'bar.foo'), + ('foo.*', 'foobar')] + for first, second in good_cases: + self.assertTrue(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly False" % + (first, second)) + self.assertTrue(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly False" % + (second, first)) + for first, second in bad_cases: + self.assertFalse(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly True" % + (first, second)) + self.assertFalse(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly True" % + (second, first)) + + def test_ip_matches(self): + good_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + bad_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + for ip, entry in good_cases: + self.assertTrue(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly False" % + (ip, lxml.etree.tostring(entry))) + for ip, entry in bad_cases: + self.assertFalse(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly True" % + (ip, lxml.etree.tostring(entry))) + + +class TestIPACLFile(TestXMLFileBacked): + test_obj = IPACLFile + + @patch("Bcfg2.Server.Plugins.ACL.ip_matches") + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches): + af = self.get_obj() + ip = "10.0.0.8" + rmi = "ACL.test" + + def reset(): + mock_rmi_names_equal.reset_mock() + mock_ip_matches.reset_mock() + + # test default defer with no entries + af.entries = [] + self.assertIsNone(af.check_acl(ip, rmi)) + + # test explicit allow, deny, and defer + entries = dict(Allow=lxml.etree.Element("Allow", method=rmi), + Deny=lxml.etree.Element("Deny", method=rmi), + Defer=lxml.etree.Element("Defer", method=rmi)) + af.entries = list(entries.values()) + + def get_ip_matches(tag): + def ip_matches(ip, entry): + return entry.tag == tag + + return ip_matches + + mock_rmi_names_equal.return_value = True + + reset() + mock_ip_matches.side_effect = get_ip_matches("Allow") + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Allow']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Deny") + self.assertFalse(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Deny']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Defer") + self.assertIsNone(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Defer']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + # test matching RMI names + reset() + mock_ip_matches.side_effect = lambda i, e: True + mock_rmi_names_equal.side_effect = lambda a, b: a == b + rmi = "ACL.test2" + matching = lxml.etree.Element("Allow", method=rmi) + af.entries.append(matching) + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, matching) + self.assertTrue( + call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or + call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list) + + # test implicit allow for localhost, defer for others + reset() + mock_ip_matches.side_effect = lambda i, e: False + self.assertIsNone(af.check_acl(ip, rmi)) + + reset() + self.assertTrue(af.check_acl("127.0.0.1", rmi)) + + +class TestMetadataACLFile(TestStructFile): + test_obj = MetadataACLFile + + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal): + af = self.get_obj() + af.Match = Mock() + metadata = Mock() + mock_rmi_names_equal.side_effect = lambda a, b: a == b + + def reset(): + af.Match.reset_mock() + mock_rmi_names_equal.reset_mock() + + # test default allow + af.entries = [] + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + + # test explicit allow and deny + reset() + af.entries = [lxml.etree.Element("Allow", method='ACL.test'), + lxml.etree.Element("Deny", method='ACL.test2')] + af.Match.return_value = af.entries + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test', 'ACL.test'), + mock_rmi_names_equal.call_args_list) + + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test2')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test2', 'ACL.test2'), + mock_rmi_names_equal.call_args_list) + + # test default deny for non-localhost + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + # test default allow for localhost + reset() + metadata.hostname = 'localhost' + self.assertTrue(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + +class TestACL(TestPlugin, TestClientACLs): + test_obj = ACL + + def test_check_acl_ip(self): + acl = self.get_obj() + acl.ip_acls = Mock() + self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"), + "ACL.test"), + acl.ip_acls.check_acl.return_value) + acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") + + def test_check_acl_metadata(self): + acl = self.get_obj() + acl.metadata_acls = Mock() + metadata = Mock() + self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"), + acl.metadata_acls.check_acl.return_value) + acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py new file mode 100644 index 000000000..cfb379c40 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -0,0 +1,111 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Bundler import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestStructure, \ + TestXMLDirectoryBacked + + +class TestBundleFile(TestStructFile): + test_obj = BundleFile + path = os.path.join(datastore, "test", "test1.xml") + + def test_bundle_name(self): + cases = [("foo.xml", "foo"), + ("foo.bar.xml", "foo.bar"), + ("foo-bar-baz.xml", "foo-bar-baz"), + ("foo....xml", "foo..."), + ("foo.genshi", "foo")] + bf = self.get_obj() + for fname, bname in cases: + bf.name = fname + self.assertEqual(bf.bundle_name, bname) + + +class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): + test_obj = Bundler + + def get_obj(self, core=None): + @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, + self.test_obj.__name__), + Mock()) + def inner(): + return TestPlugin.get_obj(self, core=core) + return inner() + + @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent") + def test_HandleEvent(self, mock_HandleEvent): + b = self.get_obj() + b.bundles = dict(foo=Mock(), bar=Mock()) + b.entries = {"foo.xml": BundleFile("foo.xml"), + "baz.xml": BundleFile("baz.xml")} + event = Mock() + b.HandleEvent(event) + mock_HandleEvent.assert_called_with(b, event) + self.assertItemsEqual(b.bundles, + dict(foo=b.entries['foo.xml'], + baz=b.entries['baz.xml'])) + + def test_BuildStructures(self): + b = self.get_obj() + b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(), + has_dep=Mock(), is_dep=Mock(), indep=Mock()) + expected = dict() + + b.bundles['error'].XMLMatch.side_effect = TemplateError(None) + + xinclude = lxml.etree.Element("Bundle") + lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"), + "Path", name="/test") + b.bundles['xinclude'].XMLMatch.return_value = xinclude + expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude") + lxml.etree.SubElement(expected['xinclude'], "Path", name="/test") + + has_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(has_dep, "Bundle", name="is_dep") + lxml.etree.SubElement(has_dep, "Package", name="foo") + b.bundles['has_dep'].XMLMatch.return_value = has_dep + expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep") + lxml.etree.SubElement(expected['has_dep'], "Package", name="foo") + + is_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(is_dep, "Package", name="bar") + b.bundles['is_dep'].XMLMatch.return_value = is_dep + expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep") + lxml.etree.SubElement(expected['is_dep'], "Package", name="bar") + + indep = lxml.etree.Element("Bundle", independent="true") + lxml.etree.SubElement(indep, "Service", name="baz") + b.bundles['indep'].XMLMatch.return_value = indep + expected['indep'] = lxml.etree.Element("Independent", name="indep") + lxml.etree.SubElement(expected['indep'], "Service", name="baz") + + metadata = Mock() + metadata.bundles = ["error", "xinclude", "has_dep", "indep"] + + rv = b.BuildStructures(metadata) + self.assertEqual(len(rv), 4) + for bundle in rv: + name = bundle.get("name") + self.assertIsNotNone(name, + "Bundle %s was not built" % name) + self.assertIn(name, expected, + "Unexpected bundle %s was built" % name) + self.assertXMLEqual(bundle, expected[name], + "Bundle %s was not built correctly" % name) + b.bundles[name].XMLMatch.assert_called_with(metadata) + + b.bundles['error'].XMLMatch.assert_called_with(metadata) + self.assertFalse(b.bundles['skip'].XMLMatch.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index d655a20cd..7e96b618c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -23,12 +23,16 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): test_obj = CfgAuthorizedKeysGenerator should_monitor = False - def get_obj(self, name=None, core=None, fam=None): + def setUp(self): + TestCfgGenerator.setUp(self) + TestStructFile.setUp(self) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.get_cfg") + def get_obj(self, mock_get_cfg, name=None, core=None, fam=None): if name is None: name = self.path - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG = Mock() if core is not None: - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG.core = core + mock_get_cfg.return_value.core = core return self.test_obj(name) @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") @@ -40,33 +44,9 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): mock_HandleEvent.assert_called_with(akg, evt) mock_handle_event.assert_called_with(akg, evt) - def test_category(self): - akg = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp - - self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - - cfp.reset_mock() - cfp.has_section.return_value = True - self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(akg.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") - @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") - @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") def test_get_data(self, mock_ClientMetadata): + Bcfg2.Options.setup.sshkeys_category = "category" akg = self.get_obj() akg.XMLMatch = Mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index fc5d5e53d..2285fb458 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -17,32 +17,30 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if HAS_CHEETAH or can_skip: - class TestCfgCheetahGenerator(TestCfgGenerator): - test_obj = CfgCheetahGenerator +class TestCfgCheetahGenerator(TestCfgGenerator): + test_obj = CfgCheetahGenerator - @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") - def setUp(self): - pass + @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") + def setUp(self): + TestCfgGenerator.setUp(self) + set_setup_default("repository", datastore) - @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template") - def test_get_data(self, mock_Template): - ccg = self.get_obj(encoding='UTF-8') - ccg.data = "data" - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock() + @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template") + def test_get_data(self, mock_Template): + ccg = self.get_obj() + ccg.data = "data" + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() - self.assertEqual(ccg.get_data(entry, metadata), - mock_Template.return_value.respond.return_value) - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo") - mock_Template.assert_called_with("data".decode(ccg.encoding), - compilerSettings=ccg.settings) - tmpl = mock_Template.return_value - tmpl.respond.assert_called_with() - self.assertEqual(tmpl.metadata, metadata) - self.assertEqual(tmpl.name, entry.get("name")) - self.assertEqual(tmpl.path, entry.get("name")) - self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value) + self.assertEqual(ccg.get_data(entry, metadata), + mock_Template.return_value.respond.return_value) + mock_Template.assert_called_with( + "data".decode(Bcfg2.Options.setup.encoding), + compilerSettings=ccg.settings) + tmpl = mock_Template.return_value + tmpl.respond.assert_called_with() + self.assertEqual(tmpl.metadata, metadata) + self.assertEqual(tmpl.name, entry.get("name")) + self.assertEqual(tmpl.path, entry.get("name")) + self.assertEqual(tmpl.source_path, ccg.name) + self.assertEqual(tmpl.repo, datastore) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py index 46062569d..4c987551b 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py @@ -30,18 +30,17 @@ except ImportError: HAS_CRYPTO = False -if can_skip or (HAS_CRYPTO and HAS_CHEETAH): - class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator, - TestCfgEncryptedGenerator): - test_obj = CfgEncryptedCheetahGenerator +class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator, + TestCfgEncryptedGenerator): + test_obj = CfgEncryptedCheetahGenerator - @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") - def setUp(self): - pass + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") + def setUp(self): + pass - def test_handle_event(self): - TestCfgEncryptedGenerator.test_handle_event(self) + def test_handle_event(self): + TestCfgEncryptedGenerator.test_handle_event(self) - def test_get_data(self): - TestCfgCheetahGenerator.test_get_data(self) + def test_get_data(self): + TestCfgCheetahGenerator.test_get_data(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 71a7410da..873ebd837 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -18,59 +18,53 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if can_skip or HAS_CRYPTO: - class TestCfgEncryptedGenerator(TestCfgGenerator): - test_obj = CfgEncryptedGenerator +class TestCfgEncryptedGenerator(TestCfgGenerator): + test_obj = CfgEncryptedGenerator - @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - def setUp(self): - pass + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + def setUp(self): + TestCfgGenerator.setUp(self) - @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") - @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt, mock_get_algorithm): - @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") - def inner(mock_handle_event): - def reset(): - mock_decrypt.reset_mock() - mock_get_algorithm.reset_mock() - mock_handle_event.reset_mock() + @patchIf(HAS_CRYPTO, + "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") + def test_handle_event(self, mock_decrypt): + @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") + def inner(mock_handle_event): + def reset(): + mock_decrypt.reset_mock() + mock_handle_event.reset_mock() - def get_event_data(obj, event): - obj.data = "encrypted" + def get_event_data(obj, event): + obj.data = "encrypted" - mock_handle_event.side_effect = get_event_data - mock_decrypt.side_effect = lambda d, **kw: "plaintext" - event = Mock() - ceg = self.get_obj() - ceg.handle_event(event) - mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted", - setup=SETUP, - algorithm=mock_get_algorithm.return_value) - self.assertEqual(ceg.data, "plaintext") + mock_handle_event.side_effect = get_event_data + mock_decrypt.side_effect = lambda d, **kw: "plaintext" + event = Mock() + ceg = self.get_obj() + ceg.handle_event(event) + mock_handle_event.assert_called_with(ceg, event) + mock_decrypt.assert_called_with("encrypted") + self.assertEqual(ceg.data, "plaintext") - reset() - mock_decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, - ceg.handle_event, event) - inner() + reset() + mock_decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, + ceg.handle_event, event) + inner() - # to perform the tests from the parent test object, we - # make bruteforce_decrypt just return whatever data was - # given to it - mock_decrypt.side_effect = lambda d, **kw: d - TestCfgGenerator.test_handle_event(self) + # to perform the tests from the parent test object, we + # make bruteforce_decrypt just return whatever data was + # given to it + mock_decrypt.side_effect = lambda d, **kw: d + TestCfgGenerator.test_handle_event(self) - def test_get_data(self): - ceg = self.get_obj() - ceg.data = None - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() + def test_get_data(self): + ceg = self.get_obj() + ceg.data = None + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() - self.assertRaises(PluginExecutionError, - ceg.get_data, entry, metadata) + self.assertRaises(PluginExecutionError, + ceg.get_data, entry, metadata) - TestCfgGenerator.test_get_data(self) + TestCfgGenerator.test_get_data(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py index b447a9bb8..0b74e4a60 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py @@ -14,20 +14,13 @@ while path != "/": path = os.path.dirname(path) from common import * -try: - from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ - TestCfgGenshiGenerator - HAS_GENSHI = True -except ImportError: - TestCfgGenshiGenerator = object - HAS_GENSHI = False +from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ + TestCfgGenshiGenerator -if can_skip or (HAS_CRYPTO and HAS_GENSHI): - class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): - test_obj = CfgEncryptedGenshiGenerator +class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): + test_obj = CfgEncryptedGenshiGenerator - @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") - def setUp(self): - pass + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + def setUp(self): + TestCfgGenshiGenerator.setUp(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py index 0f369113b..7ceedb7c2 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py @@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier class TestCfgExternalCommandVerifier(TestCfgVerifier): test_obj = CfgExternalCommandVerifier - @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen") - def test_verify_entry(self, mock_Popen): - proc = Mock() - mock_Popen.return_value = proc - proc.wait.return_value = 0 - proc.communicate.return_value = ("stdout", "stderr") + def test_verify_entry(self): entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() ecv = self.get_obj() ecv.cmd = ["/bin/bash", "-x", "foo"] + ecv.exc = Mock() + ecv.exc.run.return_value = Mock() + ecv.exc.run.return_value.success = True + ecv.verify_entry(entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") - mock_Popen.reset_mock() - proc.wait.return_value = 13 + ecv.exc.reset_mock() + ecv.exc.run.return_value.success = False self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") + + ecv.exc.reset_mock() - mock_Popen.reset_mock() - mock_Popen.side_effect = OSError + ecv.exc.reset_mock() + ecv.exc.run.side_effect = OSError self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") @patch("os.access") def test_handle_event(self, mock_access): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 2e8b7bfa5..9b6e7fe88 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -19,111 +19,102 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if can_skip or HAS_GENSHI: - class TestCfgGenshiGenerator(TestCfgGenerator): - test_obj = CfgGenshiGenerator - - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") - def setUp(self): - pass - - def test_removecomment(self): - data = [(None, "test", 1), - (None, "test2", 2)] - stream = [(genshi.core.COMMENT, "test", 0), - data[0], - (genshi.core.COMMENT, "test3", 0), - data[1]] - self.assertItemsEqual(list(removecomment(stream)), data) - - def test__init(self): - TestCfgGenerator.test__init(self) - cgg = self.get_obj() - self.assertIsInstance(cgg.loader, cgg.__loader_cls__) - - def test_get_data(self): - cgg = self.get_obj() - cgg._handle_genshi_exception = Mock() - cgg.template = Mock() - fltr = Mock() - cgg.template.generate.return_value = fltr - stream = Mock() - fltr.filter.return_value = stream - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() - - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock() - - def reset(): - cgg.template.reset_mock() - cgg._handle_genshi_exception.reset_mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock() - - template_vars = dict( - name=entry.get("name"), - metadata=metadata, - path=cgg.name, - source_path=cgg.name, - repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value) - - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - def render(fmt, **kwargs): - stream.render.side_effect = None - raise TypeError - stream.render.side_effect = render - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - self.assertEqual(stream.render.call_args_list, - [call("text", encoding=cgg.encoding, - strip_whitespace=False), - call("text", encoding=cgg.encoding)]) - - reset() - stream.render.side_effect = UndefinedError("test") - self.assertRaises(UndefinedError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - stream.render.side_effect = ValueError - cgg._handle_genshi_exception.side_effect = ValueError - self.assertRaises(ValueError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - self.assertTrue(cgg._handle_genshi_exception.called) - - def test_handle_event(self): - cgg = self.get_obj() - cgg.loader = Mock() - event = Mock() - cgg.handle_event(event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) - - cgg.loader.reset_mock() - cgg.loader.load.side_effect = OSError - self.assertRaises(PluginExecutionError, - cgg.handle_event, event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) +class TestCfgGenshiGenerator(TestCfgGenerator): + test_obj = CfgGenshiGenerator + + def setUp(self): + TestCfgGenerator.setUp(self) + set_setup_default("repository", datastore) + + def test__init(self): + TestCfgGenerator.test__init(self) + cgg = self.get_obj() + self.assertIsInstance(cgg.loader, cgg.__loader_cls__) + + def test_get_data(self): + cgg = self.get_obj() + cgg._handle_genshi_exception = Mock() + cgg.template = Mock() + fltr = Mock() + cgg.template.generate.return_value = fltr + stream = Mock() + fltr.filter.return_value = stream + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + def reset(): + cgg.template.reset_mock() + cgg._handle_genshi_exception.reset_mock() + + template_vars = dict( + name=entry.get("name"), + metadata=metadata, + path=cgg.name, + source_path=cgg.name, + repo=datastore) + + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with( + "text", + encoding=Bcfg2.Options.setup.encoding, + strip_whitespace=False) + + reset() + def render(fmt, **kwargs): + stream.render.side_effect = None + raise TypeError + stream.render.side_effect = render + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + fltr.filter.assert_called_with(removecomment) + self.assertEqual(stream.render.call_args_list, + [call("text", + encoding=Bcfg2.Options.setup.encoding, + strip_whitespace=False), + call("text", + encoding=Bcfg2.Options.setup.encoding)]) + + reset() + stream.render.side_effect = UndefinedError("test") + self.assertRaises(UndefinedError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", + encoding=Bcfg2.Options.setup.encoding, + strip_whitespace=False) + + reset() + stream.render.side_effect = ValueError + cgg._handle_genshi_exception.side_effect = ValueError + self.assertRaises(ValueError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", + encoding=Bcfg2.Options.setup.encoding, + strip_whitespace=False) + self.assertTrue(cgg._handle_genshi_exception.called) + + def test_handle_event(self): + cgg = self.get_obj() + cgg.loader = Mock() + event = Mock() + cgg.handle_event(event) + cgg.loader.load.assert_called_with( + cgg.name, + cls=NewTextTemplate, + encoding=Bcfg2.Options.setup.encoding) + + cgg.loader.reset_mock() + cgg.loader.load.side_effect = OSError + self.assertRaises(PluginExecutionError, + cgg.handle_event, event) + cgg.loader.load.assert_called_with( + cgg.name, + cls=NewTextTemplate, + encoding=Bcfg2.Options.setup.encoding) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py index 839e9c3b8..349da2213 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py @@ -21,53 +21,26 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgInfo class TestCfgInfoXML(TestCfgInfo): test_obj = CfgInfoXML + def setUp(self): + TestCfgInfo.setUp(self) + set_setup_default("filemonitor", MagicMock()) + def test__init(self): TestCfgInfo.test__init(self) ci = self.get_obj() self.assertIsInstance(ci.infoxml, InfoXML) def test_bind_info_to_entry(self): - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() ci = self.get_obj() ci.infoxml = Mock() - ci._set_info = Mock() - - self.assertRaises(PluginExecutionError, - ci.bind_info_to_entry, entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, dict(), - entry=entry) - self.assertFalse(ci._set_info.called) - - ci.infoxml.reset_mock() - ci._set_info.reset_mock() - mdata_value = Mock() - def set_mdata(metadata, mdata, entry=None): - mdata['Info'] = {None: mdata_value} + entry = Mock() + metadata = Mock() - ci.infoxml.pnode.Match.side_effect = set_mdata ci.bind_info_to_entry(entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, - dict(Info={None: mdata_value}), - entry=entry) - ci._set_info.assert_called_with(entry, mdata_value) + ci.infoxml.BindEntry.assert_called_with(entry, metadata) def test_handle_event(self): ci = self.get_obj() ci.infoxml = Mock() ci.handle_event(Mock) ci.infoxml.HandleEvent.assert_called_with() - - def test__set_info(self): - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info") - def inner(mock_set_info): - ci = self.get_obj() - entry = Mock() - info = {"foo": "foo", - "__children__": ["one", "two"]} - ci._set_info(entry, info) - self.assertItemsEqual(entry.append.call_args_list, - [call(c) for c in info['__children__']]) - - inner() - TestCfgInfo.test__set_info(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index dc4b11241..2967a23b6 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import * from Bcfg2.Server.Plugin import PluginExecutionError import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator try: - from Bcfg2.Encryption import EVPError + from Bcfg2.Server.Encryption import EVPError HAS_CRYPTO = True except: HAS_CRYPTO = False @@ -22,99 +22,35 @@ while path != "/": break path = os.path.dirname(path) from common import * -from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator -from TestServer.TestPlugin.Testhelpers import TestStructFile +from TestServer.TestPlugins.TestCfg.Test_init import TestXMLCfgCreator -class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): +class TestCfgPrivateKeyCreator(TestXMLCfgCreator): test_obj = CfgPrivateKeyCreator should_monitor = False def get_obj(self, name=None, fam=None): - return TestCfgCreator.get_obj(self, name=name) - - @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event") - @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent") - def test_handle_event(self, mock_HandleEvent, mock_handle_event): - pkc = self.get_obj() - evt = Mock() - pkc.handle_event(evt) - mock_HandleEvent.assert_called_with(pkc, evt) - mock_handle_event.assert_called_with(pkc, evt) - - def test_category(self): - pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp - - self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - - cfp.reset_mock() - cfp.has_section.return_value = True - self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(pkc.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - def test_passphrase(self, mock_get_passphrases): - pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp - - self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - - cfp.reset_mock() - cfp.has_section.return_value = True - self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - - cfp.reset_mock() - cfp.get.return_value = "test" - mock_get_passphrases.return_value = dict(test="foo", test2="bar") - cfp.has_option.return_value = True - self.assertEqual(pkc.passphrase, "foo") - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - cfp.get.assert_called_with("sshkeys", "passphrase") - mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + return TestXMLCfgCreator.get_obj(self, name=name) @patch("shutil.rmtree") @patch("tempfile.mkdtemp") - @patch("subprocess.Popen") - def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree): + def test__gen_keypair(self, mock_mkdtemp, mock_rmtree): pkc = self.get_obj() + pkc.cmd = Mock() pkc.XMLMatch = Mock() mock_mkdtemp.return_value = datastore metadata = Mock() - proc = Mock() - proc.wait.return_value = 0 - proc.communicate.return_value = MagicMock() - mock_Popen.return_value = proc + exc = Mock() + exc.success = True + pkc.cmd.run.return_value = exc spec = lxml.etree.Element("PrivateKey") pkc.XMLMatch.return_value = spec def reset(): pkc.XMLMatch.reset_mock() - mock_Popen.reset_mock() + pkc.cmd.reset_mock() mock_mkdtemp.reset_mock() mock_rmtree.reset_mock() @@ -122,10 +58,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "rsa", "-N", ""]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "rsa", "-N", ""]) reset() lxml.etree.SubElement(spec, "Params", bits="768", type="dsa") @@ -136,73 +71,15 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "dsa", "-b", "768", "-N", "foo"]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "dsa", "-b", "768", "-N", "foo"]) reset() - proc.wait.return_value = 1 + pkc.cmd.run.return_value.success = False self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata) mock_rmtree.assert_called_with(datastore) - def test_get_specificity(self): - pkc = self.get_obj() - pkc.XMLMatch = Mock() - - metadata = Mock() - - def reset(): - pkc.XMLMatch.reset_mock() - metadata.group_in_category.reset_mock() - - category = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.category" - @patch(category, None) - def inner(): - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(host=metadata.hostname)) - inner() - - @patch(category, "foo") - def inner2(): - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(group=metadata.group_in_category.return_value, - prio=50)) - metadata.group_in_category.assert_called_with("foo") - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", - perhost="true") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(host=metadata.hostname)) - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", - category="bar") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(group=metadata.group_in_category.return_value, - prio=50)) - metadata.group_in_category.assert_called_with("bar") - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", - prio="10") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(group=metadata.group_in_category.return_value, - prio=10)) - metadata.group_in_category.assert_called_with("foo") - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") - metadata.group_in_category.return_value = '' - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(host=metadata.hostname)) - metadata.group_in_category.assert_called_with("foo") - - inner2() - @patch("shutil.rmtree") @patch("%s.open" % builtins) def test_create_data(self, mock_open, mock_rmtree): @@ -215,7 +92,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): # the get_specificity() return value is being used # appropriately, we put some dummy data in it and test for # that data - pkc.get_specificity.side_effect = lambda m, s: dict(group="foo") + pkc.get_specificity.side_effect = lambda m: dict(group="foo") pkc._gen_keypair = Mock() privkey = os.path.join(datastore, "privkey") pkc._gen_keypair.return_value = privkey @@ -241,197 +118,32 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): mock_open.return_value.read.side_effect = open_read_rv reset() - passphrase = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.passphrase" - - @patch(passphrase, None) - def inner(): - self.assertEqual(pkc.create_data(entry, metadata), "privatekey") - pkc.XMLMatch.assert_called_with(metadata) - pkc.get_specificity.assert_called_with(metadata, - pkc.XMLMatch.return_value) - pkc._gen_keypair.assert_called_with(metadata, - pkc.XMLMatch.return_value) - self.assertItemsEqual(mock_open.call_args_list, - [call(privkey + ".pub"), call(privkey)]) - pkc.pubkey_creator.get_filename.assert_called_with(group="foo") - pkc.pubkey_creator.write_data.assert_called_with( - "ssh-rsa publickey pubkey.filename\n", group="foo") - pkc.write_data.assert_called_with("privatekey", group="foo") - mock_rmtree.assert_called_with(datastore) - - reset() - self.assertEqual(pkc.create_data(entry, metadata, return_pair=True), - ("ssh-rsa publickey pubkey.filename\n", - "privatekey")) - pkc.XMLMatch.assert_called_with(metadata) - pkc.get_specificity.assert_called_with(metadata, - pkc.XMLMatch.return_value) - pkc._gen_keypair.assert_called_with(metadata, - pkc.XMLMatch.return_value) - self.assertItemsEqual(mock_open.call_args_list, - [call(privkey + ".pub"), call(privkey)]) - pkc.pubkey_creator.get_filename.assert_called_with(group="foo") - pkc.pubkey_creator.write_data.assert_called_with( - "ssh-rsa publickey pubkey.filename\n", - group="foo") - pkc.write_data.assert_called_with("privatekey", group="foo") - mock_rmtree.assert_called_with(datastore) - - inner() - - if HAS_CRYPTO: - @patch(passphrase, "foo") - @patch("Bcfg2.Encryption.ssl_encrypt") - @patch("Bcfg2.Encryption.get_algorithm") - def inner2(mock_get_algorithm, mock_ssl_encrypt): - reset() - mock_ssl_encrypt.return_value = "encryptedprivatekey" - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True - self.assertEqual(pkc.create_data(entry, metadata), - "encryptedprivatekey") - pkc.XMLMatch.assert_called_with(metadata) - pkc.get_specificity.assert_called_with( - metadata, - pkc.XMLMatch.return_value) - pkc._gen_keypair.assert_called_with(metadata, - pkc.XMLMatch.return_value) - self.assertItemsEqual(mock_open.call_args_list, - [call(privkey + ".pub"), call(privkey)]) - pkc.pubkey_creator.get_filename.assert_called_with(group="foo") - pkc.pubkey_creator.write_data.assert_called_with( - "ssh-rsa publickey pubkey.filename\n", group="foo") - pkc.write_data.assert_called_with("encryptedprivatekey", - group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with( - "privatekey", "foo", - algorithm=mock_get_algorithm.return_value) - mock_rmtree.assert_called_with(datastore) - - inner2() - - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" - - pkc = self.get_obj() - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' -<PrivateKey> - <Group name="test"> - <Passphrase encrypted="foo">crypted</Passphrase> - </Group> - <Group name="test" negate="true"> - <Passphrase>plain</Passphrase> - </Group> -</PrivateKey>''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): - pkc = self.get_obj() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) + self.assertEqual(pkc.create_data(entry, metadata), "privatekey") + pkc.XMLMatch.assert_called_with(metadata) + pkc.get_specificity.assert_called_with(metadata) + pkc._gen_keypair.assert_called_with(metadata, + pkc.XMLMatch.return_value) + self.assertItemsEqual(mock_open.call_args_list, + [call(privkey + ".pub"), call(privkey)]) + pkc.pubkey_creator.get_filename.assert_called_with(group="foo") + pkc.pubkey_creator.write_data.assert_called_with( + "ssh-rsa publickey pubkey.filename\n", group="foo") + pkc.write_data.assert_called_with("privatekey", group="foo") + mock_rmtree.assert_called_with(datastore) - # test failure to decrypt element without valid passphrase reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) + self.assertEqual(pkc.create_data(entry, metadata, return_pair=True), + ("ssh-rsa publickey pubkey.filename\n", + "privatekey")) + pkc.XMLMatch.assert_called_with(metadata) + pkc.get_specificity.assert_called_with(metadata) + pkc._gen_keypair.assert_called_with(metadata, + pkc.XMLMatch.return_value) + self.assertItemsEqual(mock_open.call_args_list, + [call(privkey + ".pub"), call(privkey)]) + pkc.pubkey_creator.get_filename.assert_called_with(group="foo") + pkc.pubkey_creator.write_data.assert_called_with( + "ssh-rsa publickey pubkey.filename\n", + group="foo") + pkc.write_data.assert_called_with("privatekey", group="foo") + mock_rmtree.assert_called_with(datastore) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index ea3549c1b..307461918 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -3,7 +3,7 @@ import sys import errno import lxml.etree import Bcfg2.Options -from Bcfg2.Compat import walk_packages +from Bcfg2.Compat import walk_packages, ConfigParser from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugins.Cfg import * from Bcfg2.Server.Plugin import PluginExecutionError, Specificity @@ -19,7 +19,7 @@ while path != "/": path = os.path.dirname(path) from common import * from TestPlugin import TestSpecificData, TestEntrySet, TestGroupSpool, \ - TestPullTarget + TestPullTarget, TestStructFile class TestCfgBaseFileMatcher(TestSpecificData): @@ -152,28 +152,13 @@ class TestCfgInfo(TestCfgBaseFileMatcher): @patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.__init__") def test__init(self, mock__init): ci = self.get_obj("test.txt") - mock__init.assert_called_with(ci, "test.txt", None, None) + mock__init.assert_called_with(ci, "test.txt", None) def test_bind_info_to_entry(self): ci = self.get_obj() self.assertRaises(NotImplementedError, ci.bind_info_to_entry, Mock(), Mock()) - def test__set_info(self): - ci = self.get_obj() - entry = Mock() - entry.attrib = dict() - - info = {"foo": "foo", - "_bar": "bar", - "bar:baz=quux": "quux", - "baz__": "baz", - "__quux": "quux"} - ci._set_info(entry, info) - self.assertItemsEqual(entry.attrib, - dict([(k, v) for k, v in info.items() - if not k.startswith("__")])) - class TestCfgVerifier(TestCfgBaseFileMatcher): test_obj = CfgVerifier @@ -187,6 +172,12 @@ class TestCfgVerifier(TestCfgBaseFileMatcher): class TestCfgCreator(TestCfgBaseFileMatcher): test_obj = CfgCreator path = "/foo/bar/test.txt" + should_monitor = False + + def setUp(self): + TestCfgBaseFileMatcher.setUp(self) + set_setup_default("filemonitor", MagicMock()) + set_setup_default("cfg_passphrase", None) def get_obj(self, name=None): if name is None: @@ -256,62 +247,122 @@ class TestCfgCreator(TestCfgBaseFileMatcher): self.assertRaises(CfgCreationError, cc.write_data, data) +class TestXMLCfgCreator(TestCfgCreator, TestStructFile): + test_obj = XMLCfgCreator + + def setUp(self): + TestCfgCreator.setUp(self) + TestStructFile.setUp(self) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event") + @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent") + def test_handle_event(self, mock_HandleEvent, mock_handle_event): + cc = self.get_obj() + evt = Mock() + cc.handle_event(evt) + mock_HandleEvent.assert_called_with(cc, evt) + mock_handle_event.assert_called_with(cc, evt) + + def test_get_specificity(self): + cc = self.get_obj() + metadata = Mock() + + def reset(): + metadata.group_in_category.reset_mock() + + category = "%s.%s.category" % (self.test_obj.__module__, + self.test_obj.__name__) + @patch(category, None) + def inner(): + cc.xdata = lxml.etree.Element("PrivateKey") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(host=metadata.hostname)) + inner() + + @patch(category, "foo") + def inner2(): + cc.xdata = lxml.etree.Element("PrivateKey") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=50)) + metadata.group_in_category.assert_called_with("foo") + + reset() + cc.xdata = lxml.etree.Element("PrivateKey", perhost="true") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(host=metadata.hostname)) + + reset() + cc.xdata = lxml.etree.Element("PrivateKey", category="bar") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=50)) + metadata.group_in_category.assert_called_with("bar") + + reset() + cc.xdata = lxml.etree.Element("PrivateKey", prio="10") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=10)) + metadata.group_in_category.assert_called_with("foo") + + reset() + cc.xdata = lxml.etree.Element("PrivateKey") + metadata.group_in_category.return_value = '' + self.assertItemsEqual(cc.get_specificity(metadata), + dict(host=metadata.hostname)) + metadata.group_in_category.assert_called_with("foo") + + inner2() + + class TestCfgDefaultInfo(TestCfgInfo): test_obj = CfgDefaultInfo - def get_obj(self, defaults=None): - if defaults is None: - defaults = dict() - return self.test_obj(defaults) + def get_obj(self, *_): + return self.test_obj() - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__") - def test__init(self, mock__init): - defaults = Mock() - cdi = self.get_obj(defaults=defaults) - mock__init.assert_called_with(cdi, '') - self.assertEqual(defaults, cdi.defaults) + def test__init(self): + pass def test_handle_event(self): # this CfgInfo handler doesn't handle any events -- it's not # file-driven, but based on the built-in defaults pass - def test_bind_info_to_entry(self): + @patch("Bcfg2.Server.Plugin.default_path_metadata") + def test_bind_info_to_entry(self, mock_default_path_metadata): cdi = self.get_obj() - cdi._set_info = Mock() - entry = Mock() + entry = lxml.etree.Element("Test", name="test") + mock_default_path_metadata.return_value = \ + dict(owner="root", mode="0600") cdi.bind_info_to_entry(entry, Mock()) - cdi._set_info.assert_called_with(entry, cdi.defaults) + self.assertItemsEqual(entry.attrib, + dict(owner="root", mode="0600", name="test")) class TestCfgEntrySet(TestEntrySet): test_obj = CfgEntrySet + def setUp(self): + TestEntrySet.setUp(self) + set_setup_default("cfg_validation", False) + set_setup_default("cfg_handlers", []) + def test__init(self): pass - def test_handlers(self): - # this is really really difficult to mock out, so we just get - # a list of handlers and make sure that it roughly matches - # what's on the filesystem - expected = [] - for submodule in walk_packages(path=Bcfg2.Server.Plugins.Cfg.__path__, - prefix="Bcfg2.Server.Plugins.Cfg."): - expected.append(submodule[1].rsplit('.', 1)[-1]) - self.assertItemsEqual(expected, [h.__name__ for h in handlers()]) - - @patch("Bcfg2.Server.Plugins.Cfg.handlers") - def test_handle_event(self, mock_handlers): + def test_handle_event(self): eset = self.get_obj() eset.entry_init = Mock() - mock_handlers.return_value = [Mock(), Mock(), Mock()] - for hdlr in mock_handlers.return_value: + Bcfg2.Options.setup.cfg_handlers = [Mock(), Mock(), Mock()] + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.__name__ = "handler" eset.entries = dict() def reset(): eset.entry_init.reset_mock() - for hdlr in mock_handlers.return_value: + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.reset_mock() # test that a bogus deleted event is discarded @@ -321,18 +372,19 @@ class TestCfgEntrySet(TestEntrySet): eset.handle_event(evt) self.assertFalse(eset.entry_init.called) self.assertItemsEqual(eset.entries, dict()) - for hdlr in mock_handlers.return_value: + for hdlr in Bcfg2.Options.setup.cfg_handlers: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) # test creation of a new file for action in ["exists", "created", "changed"]: + print("Testing handling of %s events" % action) evt = Mock() evt.code2str.return_value = action evt.filename = os.path.join(datastore, "test.txt") # test with no handler that handles - for hdlr in mock_handlers.return_value: + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.handles.return_value = False hdlr.ignore.return_value = False @@ -340,16 +392,16 @@ class TestCfgEntrySet(TestEntrySet): eset.handle_event(evt) self.assertFalse(eset.entry_init.called) self.assertItemsEqual(eset.entries, dict()) - for hdlr in mock_handlers.return_value: + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.handles.assert_called_with(evt, basename=eset.path) hdlr.ignore.assert_called_with(evt, basename=eset.path) # test with a handler that handles the entry reset() - mock_handlers.return_value[-1].handles.return_value = True + Bcfg2.Options.setup.cfg_handlers[-1].handles.return_value = True eset.handle_event(evt) - eset.entry_init.assert_called_with(evt, mock_handlers.return_value[-1]) - for hdlr in mock_handlers.return_value: + eset.entry_init.assert_called_with(evt, Bcfg2.Options.setup.cfg_handlers[-1]) + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.handles.assert_called_with(evt, basename=eset.path) if not hdlr.return_value: hdlr.ignore.assert_called_with(evt, basename=eset.path) @@ -357,14 +409,14 @@ class TestCfgEntrySet(TestEntrySet): # test with a handler that ignores the entry before one # that handles it reset() - mock_handlers.return_value[0].ignore.return_value = True + Bcfg2.Options.setup.cfg_handlers[0].ignore.return_value = True eset.handle_event(evt) self.assertFalse(eset.entry_init.called) - mock_handlers.return_value[0].handles.assert_called_with(evt, - basename=eset.path) - mock_handlers.return_value[0].ignore.assert_called_with(evt, - basename=eset.path) - for hdlr in mock_handlers.return_value[1:]: + Bcfg2.Options.setup.cfg_handlers[0].handles.assert_called_with( + evt, basename=eset.path) + Bcfg2.Options.setup.cfg_handlers[0].ignore.assert_called_with( + evt, basename=eset.path) + for hdlr in Bcfg2.Options.setup.cfg_handlers[1:]: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) @@ -376,7 +428,7 @@ class TestCfgEntrySet(TestEntrySet): eset.entries[evt.filename] = Mock() eset.handle_event(evt) self.assertFalse(eset.entry_init.called) - for hdlr in mock_handlers.return_value: + for hdlr in Bcfg2.Options.setup.cfg_handlers: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) eset.entries[evt.filename].handle_event.assert_called_with(evt) @@ -386,7 +438,7 @@ class TestCfgEntrySet(TestEntrySet): evt.code2str.return_value = "deleted" eset.handle_event(evt) self.assertFalse(eset.entry_init.called) - for hdlr in mock_handlers.return_value: + for hdlr in Bcfg2.Options.setup.cfg_handlers: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) self.assertItemsEqual(eset.entries, dict()) @@ -438,15 +490,15 @@ class TestCfgEntrySet(TestEntrySet): @patch("Bcfg2.Server.Plugins.Cfg.u_str") @patch("Bcfg2.Server.Plugins.Cfg.b64encode") def test_bind_entry(self, mock_b64encode, mock_u_str): - Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) - mock_u_str.side_effect = lambda x: x + Bcfg2.Options.setup.cfg_validation = False eset = self.get_obj() eset.bind_info_to_entry = Mock() eset._generate_data = Mock() eset.get_handlers = Mock() eset._validate_data = Mock() + eset.setup = dict(validate=False) def reset(): mock_b64encode.reset_mock() @@ -524,7 +576,7 @@ class TestCfgEntrySet(TestEntrySet): # test successful validation entry = reset() - Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + Bcfg2.Options.setup.cfg_validation = True bound = eset.bind_entry(entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -546,16 +598,16 @@ class TestCfgEntrySet(TestEntrySet): def test_get_handlers(self): eset = self.get_obj() eset.entries['test1.txt'] = CfgInfo("test1.txt") - eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock(), None) + eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock()) eset.entries['test2.txt'].specific.matches.return_value = True eset.entries['test3.txt'] = CfgInfo("test3.txt") - eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock(), None) + eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock()) eset.entries['test4.txt'].specific.matches.return_value = False - eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock(), None) + eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock()) eset.entries['test5.txt'].specific.matches.return_value = True - eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock(), None) + eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock()) eset.entries['test6.txt'].specific.matches.return_value = True - eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock(), None) + eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock()) eset.entries['test7.txt'].specific.matches.return_value = False def reset(): @@ -601,24 +653,24 @@ class TestCfgEntrySet(TestEntrySet): if entry.specific is not None: self.assertFalse(entry.specific.matches.called) - def test_bind_info_to_entry(self): - default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO + @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo") + def test_bind_info_to_entry(self, mock_DefaultInfo): eset = self.get_obj() eset.get_handlers = Mock() eset.get_handlers.return_value = [] - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock() metadata = Mock() def reset(): eset.get_handlers.reset_mock() - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock() + mock_DefaultInfo.reset_mock() return lxml.etree.Element("Path", name="/test.txt") # test with no info handlers entry = reset() eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) self.assertEqual(entry.get("type"), "file") # test with one info handler @@ -627,7 +679,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = [handler] eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) handler.bind_info_to_entry.assert_called_with(entry, metadata) self.assertEqual(entry.get("type"), "file") @@ -637,7 +690,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = handlers eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) # we don't care which handler gets called as long as exactly # one of them does called = 0 @@ -648,8 +702,6 @@ class TestCfgEntrySet(TestEntrySet): self.assertEqual(called, 1) self.assertEqual(entry.get("type"), "file") - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info - def test_create_data(self): eset = self.get_obj() eset.best_matching = Mock() @@ -751,35 +803,16 @@ class TestCfgEntrySet(TestEntrySet): class TestCfg(TestGroupSpool, TestPullTarget): test_obj = Cfg + def setUp(self): + TestGroupSpool.setUp(self) + TestPullTarget.setUp(self) + set_setup_default("cfg_handlers", []) + def get_obj(self, core=None): if core is None: core = Mock() - core.setup = MagicMock() return TestGroupSpool.get_obj(self, core=core) - @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") - @patch("Bcfg2.Server.Plugin.PullTarget.__init__") - def test__init(self, mock_pulltarget_init, mock_groupspool_init): - core = Mock() - core.setup = MagicMock() - cfg = self.test_obj(core, datastore) - mock_pulltarget_init.assert_called_with(cfg) - mock_groupspool_init.assert_called_with(cfg, core, datastore) - core.setup.add_option.assert_called_with("validate", - Bcfg2.Options.CFG_VALIDATION) - core.setup.reparse.assert_called_with() - - core.reset_mock() - core.setup.reset_mock() - mock_pulltarget_init.reset_mock() - mock_groupspool_init.reset_mock() - core.setup.__contains__.return_value = True - cfg = self.test_obj(core, datastore) - mock_pulltarget_init.assert_called_with(cfg) - mock_groupspool_init.assert_called_with(cfg, core, datastore) - self.assertFalse(core.setup.add_option.called) - self.assertFalse(core.setup.reparse.called) - def test_has_generator(self): cfg = self.get_obj() cfg.entries = dict() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py new file mode 100644 index 000000000..537ceb4ff --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py @@ -0,0 +1,60 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Decisions import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestDecision + + +class TestDecisionFile(TestStructFile): + test_obj = DecisionFile + + def test_get_decisions(self): + df = self.get_obj() + metadata = Mock() + + df.xdata = None + self.assertItemsEqual(df.get_decisions(metadata), []) + + df.xdata = lxml.etree.Element("Decisions") + df.XMLMatch = Mock() + df.XMLMatch.return_value = lxml.etree.Element("Decisions") + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Service", name='*') + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Path", + name='/etc/apt/apt.conf') + + self.assertItemsEqual(df.get_decisions(metadata), + [("Service", '*'), + ("Path", '/etc/apt/apt.conf')]) + df.XMLMatch.assert_called_with(metadata) + + +class TestDecisions(TestPlugin, TestDecision): + test_obj = Decisions + + def test_GetDecisions(self): + d = self.get_obj() + d.whitelist = Mock() + d.blacklist = Mock() + metadata = Mock() + + self.assertEqual(d.GetDecisions(metadata, "whitelist"), + d.whitelist.get_decision.return_value) + d.whitelist.get_decision.assert_called_with(metadata) + + self.assertEqual(d.GetDecisions(metadata, "blacklist"), + d.blacklist.get_decision.return_value) + d.blacklist.get_decision.assert_called_with(metadata) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py index 7be3d8e84..9b4a6af88 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py @@ -1,5 +1,6 @@ import os import sys +import copy import lxml.etree from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugins.Defaults import * @@ -62,3 +63,31 @@ class TestDefaults(TestRules, TestGoalValidator): def test__regex_enabled(self): r = self.get_obj() self.assertTrue(r._regex_enabled) + + def _do_test(self, name, groups=None): + if groups is None: + groups = [] + d = self.get_obj() + metadata = Mock(groups=groups) + config = lxml.etree.Element("Configuration") + struct = lxml.etree.SubElement(config, "Bundle", name=name) + entry = copy.deepcopy(self.abstract[name]) + struct.append(entry) + d.validate_goals(metadata, config) + self.assertXMLEqual(entry, self.concrete[name]) + + def _do_test_failure(self, name, groups=None, handles=None): + if groups is None: + groups = [] + d = self.get_obj() + metadata = Mock(groups=groups) + config = lxml.etree.Element("Configuration") + struct = lxml.etree.SubElement(config, "Bundle", name=name) + orig = copy.deepcopy(self.abstract[name]) + entry = copy.deepcopy(self.abstract[name]) + struct.append(entry) + d.validate_goals(metadata, config) + self.assertXMLEqual(entry, orig) + + def test_regex(self): + self._do_test('regex') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index e839914d7..290edb83a 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -6,7 +6,6 @@ import socket import lxml.etree import Bcfg2.Server import Bcfg2.Server.Plugin -from Bcfg2.Server.Plugins.Metadata import * from mock import Mock, MagicMock, patch # add all parent testsuite directories to sys.path to allow (most) @@ -19,9 +18,13 @@ while path != "/": break path = os.path.dirname(path) from common import * +from Bcfg2.Server.Plugins.Metadata import load_django_models from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \ TestClientRunHooks, TestDatabaseBacked +load_django_models() +from Bcfg2.Server.Plugins.Metadata import * + def get_clients_test_tree(): return lxml.etree.XML(''' @@ -88,18 +91,17 @@ def get_groups_test_tree(): </Groups>''').getroottree() -def get_metadata_object(core=None, watch_clients=False, use_db=False): +def get_metadata_object(core=None): if core is None: core = Mock() - core.setup = MagicMock() core.metadata_cache = MagicMock() - core.setup.cfp.getboolean = Mock(return_value=use_db) + set_setup_default("password") @patchIf(not isinstance(os.makedirs, Mock), "os.makedirs", Mock()) @patchIf(not isinstance(lxml.etree.Element, Mock), "lxml.etree.Element", Mock()) def inner(): - return Metadata(core, datastore, watch_clients=watch_clients) + return Metadata(core, datastore) return inner() @@ -108,117 +110,117 @@ class TestMetadataDB(DBModelTestCase): models = [MetadataClientModel] -if HAS_DJANGO or can_skip: - class TestClientVersions(TestDatabaseBacked): - test_clients = dict(client1="1.2.0", - client2="1.2.2", - client3="1.3.0pre1", - client4="1.1.0", - client5=None, - client6=None) - - @skipUnless(HAS_DJANGO, "Django not found") - def setUp(self): - self.test_obj = ClientVersions - syncdb(TestMetadataDB) - for client, version in self.test_clients.items(): - MetadataClientModel(hostname=client, version=version).save() - - def test__contains(self): - v = self.get_obj() - self.assertIn("client1", v) - self.assertIn("client5", v) - self.assertNotIn("client__contains", v) - - def test_keys(self): - v = self.get_obj() - self.assertItemsEqual(self.test_clients.keys(), v.keys()) - - def test__setitem(self): - v = self.get_obj() - - # test setting version of existing client - v["client1"] = "1.2.3" - self.assertIn("client1", v) - self.assertEqual(v['client1'], "1.2.3") - client = MetadataClientModel.objects.get(hostname="client1") - self.assertEqual(client.version, "1.2.3") - - # test adding new client - new = "client__setitem" - v[new] = "1.3.0" - self.assertIn(new, v) - self.assertEqual(v[new], "1.3.0") - client = MetadataClientModel.objects.get(hostname=new) - self.assertEqual(client.version, "1.3.0") - - # test adding new client with no version - new2 = "client__setitem_2" - v[new2] = None - self.assertIn(new2, v) - self.assertEqual(v[new2], None) - client = MetadataClientModel.objects.get(hostname=new2) - self.assertEqual(client.version, None) - - def test__getitem(self): - v = self.get_obj() - - # test getting existing client - self.assertEqual(v['client2'], "1.2.2") - self.assertIsNone(v['client5']) - - # test exception on nonexistent client - expected = KeyError - try: - v['clients__getitem'] - except expected: - pass - except: - err = sys.exc_info()[1] - self.assertFalse(True, "%s raised instead of %s" % - (err.__class__.__name__, - expected.__class__.__name__)) - else: - self.assertFalse(True, - "%s not raised" % expected.__class__.__name__) +class TestClientVersions(TestDatabaseBacked): + test_clients = dict(client1="1.2.0", + client2="1.2.2", + client3="1.3.0pre1", + client4="1.1.0", + client5=None, + client6=None) - def test__len(self): - v = self.get_obj() - self.assertEqual(len(v), MetadataClientModel.objects.count()) + @skipUnless(HAS_DJANGO, "Django not found") + def setUp(self): + TestDatabaseBacked.setUp(self) + self.test_obj = ClientVersions + syncdb(TestMetadataDB) + for client, version in self.test_clients.items(): + MetadataClientModel(hostname=client, version=version).save() + + def test__contains(self): + v = self.get_obj() + self.assertIn("client1", v) + self.assertIn("client5", v) + self.assertNotIn("client__contains", v) + + def test_keys(self): + v = self.get_obj() + self.assertItemsEqual(self.test_clients.keys(), v.keys()) + + def test__setitem(self): + v = self.get_obj() + + # test setting version of existing client + v["client1"] = "1.2.3" + self.assertIn("client1", v) + self.assertEqual(v['client1'], "1.2.3") + client = MetadataClientModel.objects.get(hostname="client1") + self.assertEqual(client.version, "1.2.3") + + # test adding new client + new = "client__setitem" + v[new] = "1.3.0" + self.assertIn(new, v) + self.assertEqual(v[new], "1.3.0") + client = MetadataClientModel.objects.get(hostname=new) + self.assertEqual(client.version, "1.3.0") + + # test adding new client with no version + new2 = "client__setitem_2" + v[new2] = None + self.assertIn(new2, v) + self.assertEqual(v[new2], None) + client = MetadataClientModel.objects.get(hostname=new2) + self.assertEqual(client.version, None) + + def test__getitem(self): + v = self.get_obj() + + # test getting existing client + self.assertEqual(v['client2'], "1.2.2") + self.assertIsNone(v['client5']) + + # test exception on nonexistent client + expected = KeyError + try: + v['clients__getitem'] + except expected: + pass + except: + err = sys.exc_info()[1] + self.assertFalse(True, "%s raised instead of %s" % + (err.__class__.__name__, + expected.__class__.__name__)) + else: + self.assertFalse(True, + "%s not raised" % expected.__class__.__name__) + + def test__len(self): + v = self.get_obj() + self.assertEqual(len(v), MetadataClientModel.objects.count()) - def test__iter(self): - v = self.get_obj() - self.assertItemsEqual([h for h in iter(v)], v.keys()) + def test__iter(self): + v = self.get_obj() + self.assertItemsEqual([h for h in iter(v)], v.keys()) - def test__delitem(self): - v = self.get_obj() + def test__delitem(self): + v = self.get_obj() - # test adding new client - new = "client__delitem" - v[new] = "1.3.0" + # test adding new client + new = "client__delitem" + v[new] = "1.3.0" - del v[new] - self.assertIn(new, v) - self.assertIsNone(v[new]) + del v[new] + self.assertIn(new, v) + self.assertIsNone(v[new]) class TestXMLMetadataConfig(TestXMLFileBacked): test_obj = XMLMetadataConfig path = os.path.join(datastore, 'Metadata', 'clients.xml') - def get_obj(self, basefile="clients.xml", core=None, watch_clients=False): - self.metadata = get_metadata_object(core=core, - watch_clients=watch_clients) + def get_obj(self, basefile="clients.xml", core=None): + self.metadata = get_metadata_object(core=core) @patchIf(not isinstance(lxml.etree.Element, Mock), "lxml.etree.Element", Mock()) def inner(): - return XMLMetadataConfig(self.metadata, watch_clients, basefile) + return XMLMetadataConfig(self.metadata, basefile) return inner() + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test__init(self): xmc = self.get_obj() - self.assertEqual(self.metadata.core.fam, xmc.fam) - self.assertFalse(xmc.fam.AddMonitor.called) + self.assertNotIn(call(xmc.basefile), + xmc.fam.AddMonitor.call_args_list) def test_xdata(self): config = self.get_obj() @@ -262,20 +264,15 @@ class TestXMLMetadataConfig(TestXMLFileBacked): self.assertEqual(config.base_xdata, "<test/>") def test_add_monitor(self): - core = MagicMock() - config = self.get_obj(core=core) + config = self.get_obj() + config.fam = Mock() fname = "test.xml" fpath = os.path.join(self.metadata.data, fname) config.extras = [] config.add_monitor(fpath) - self.assertFalse(core.fam.AddMonitor.called) - self.assertEqual(config.extras, [fpath]) - - config = self.get_obj(core=core, watch_clients=True) - config.add_monitor(fpath) - core.fam.AddMonitor.assert_called_with(fpath, config.metadata) + config.fam.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fpath]) def test_Index(self): @@ -475,11 +472,16 @@ class TestClientMetadata(Bcfg2TestCase): class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): test_obj = Metadata - use_db = False - def get_obj(self, core=None, watch_clients=False): - return get_metadata_object(core=core, watch_clients=watch_clients, - use_db=self.use_db) + def setUp(self): + _TestMetadata.setUp(self) + TestClientRunHooks.setUp(self) + TestDatabaseBacked.setUp(self) + Bcfg2.Options.setup.metadata_db = False + Bcfg2.Options.setup.authentication = "cert+password" + + def get_obj(self, core=None): + return get_metadata_object(core=core) @skipUnless(HAS_DJANGO, "Django not found") def test__use_db(self): @@ -499,33 +501,24 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): client_name = "%s%s" % (prefix, i) return client_name - def test__init(self): - # test with watch_clients=False + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): core = MagicMock() metadata = self.get_obj(core=core) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.ClientRunHooks) - self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig) - self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig) - self.assertIsInstance(metadata.query, MetadataQuery) - self.assertEqual(metadata.states, dict()) - - # test with watch_clients=True - core.fam = MagicMock() - metadata = self.get_obj(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - core.fam.reset_mock() - core.fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) + + mock_get_fam.reset_mock() + fam = Mock() + fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value = fam self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, - self.get_obj, core=core, watch_clients=True) + self.get_obj, core=core) @patch('os.makedirs', Mock()) @patch('%s.open' % builtins) @@ -586,6 +579,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_group(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -618,6 +612,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_update_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -635,6 +630,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_remove_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -650,6 +646,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_bundle(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -673,6 +670,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_remove_bundle(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -688,6 +686,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_client(self): metadata = self.get_obj() metadata.clients_xml.write = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree() metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -722,6 +721,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_update_client(self): metadata = self.get_obj() metadata.clients_xml.write_xml = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree()) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -757,7 +757,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree()) metadata._handle_clients_xml_event(Mock()) - if not self.use_db: + if not Bcfg2.Options.setup.metadata_db: self.assertItemsEqual(metadata.clients, dict([(c.get("name"), c.get("profile")) for c in get_clients_test_tree().findall("//Client")])) @@ -1246,10 +1246,13 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): class TestMetadataBase(TestMetadata): """ base test object for testing Metadata with database enabled """ __test__ = False - use_db = True @skipUnless(HAS_DJANGO, "Django not found") def setUp(self): + _TestMetadata.setUp(self) + TestClientRunHooks.setUp(self) + TestDatabaseBacked.setUp(self) + Bcfg2.Options.setup.metadata_db = True syncdb(TestMetadataDB) def load_clients_data(self, metadata=None, xdata=None): @@ -1269,25 +1272,24 @@ class TestMetadataBase(TestMetadata): return client_name @patch('os.path.exists') - def test__init(self, mock_exists): - core = MagicMock() - core.fam = Mock() + @patch('Bcfg2.Server.FileMonitor.get_fam') + def test__init(self, mock_get_fam, mock_exists): mock_exists.return_value = False - metadata = self.get_obj(core=core, watch_clients=True) + metadata = self.get_obj() self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, - "groups.xml"), - metadata) + mock_get_fam.return_value.AddMonitor.assert_called_with( + os.path.join(metadata.data, "groups.xml"), + metadata) mock_exists.return_value = True - core.fam.reset_mock() - metadata = self.get_obj(core=core, watch_clients=True) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) + mock_get_fam.reset_mock() + metadata = self.get_obj() + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) def test_add_group(self): pass @@ -1351,12 +1353,7 @@ class TestMetadataBase(TestMetadata): class TestMetadata_NoClientsXML(TestMetadataBase): """ test Metadata without a clients.xml. we have to disable or override tests that rely on client options """ - # only run these tests if it's possible to skip tests or if we - # have django. otherwise they'll all get run because our fake - # skipping decorators for python < 2.7 won't work when they - # decorate setUp() - if can_skip or HAS_DJANGO: - __test__ = True + __test__ = True def load_groups_data(self, metadata=None, xdata=None): if metadata is None: @@ -1520,17 +1517,13 @@ class TestMetadata_NoClientsXML(TestMetadataBase): class TestMetadata_ClientsXML(TestMetadataBase): """ test Metadata with a clients.xml. """ - # only run these tests if it's possible to skip tests or if we - # have django. otherwise they'll all get run because our fake - # skipping decorators for python < 2.7 won't work when they - # decorate setUp() - if can_skip or HAS_DJANGO: - __test__ = True + __test__ = True def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_obj() - metadata.core.fam = Mock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() @patchIf(not isinstance(lxml.etree.Element, Mock), "lxml.etree.Element", Mock()) def inner(): @@ -1538,5 +1531,7 @@ class TestMetadata_ClientsXML(TestMetadataBase): inner() metadata = TestMetadata.load_clients_data(self, metadata=metadata, xdata=xdata) - return TestMetadataBase.load_clients_data(self, metadata=metadata, - xdata=xdata) + rv = TestMetadataBase.load_clients_data(self, metadata=metadata, + xdata=xdata) + Bcfg2.Server.FileMonitor._FAM = fam + return rv diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index a87628eaf..4830f9f2f 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -1,7 +1,7 @@ import os import sys -import copy -import time +import shutil +import tempfile import lxml.etree import Bcfg2.version import Bcfg2.Server @@ -18,54 +18,22 @@ while path != "/": break path = os.path.dirname(path) from common import * -from Bcfg2.Server.Plugins.Probes import * +from Bcfg2.Server.Plugins.Probes import load_django_models from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ TestDatabaseBacked -# test data for JSON and YAML tests -test_data = dict(a=1, b=[1, 2, 3], c="test", - d=dict(a=1, b=dict(a=1), c=(1, "2", 3))) - - -class FakeElement(lxml.etree._Element): - getroottree = Mock() - - def __init__(self, el): - self._element = el - - def __getattribute__(self, attr): - el = lxml.etree._Element.__getattribute__(self, - '__dict__')['_element'] - if attr == "getroottree": - return FakeElement.getroottree - elif attr == "_element": - return el - else: - return getattr(el, attr) - - -class StoringElement(object): - OriginalElement = copy.copy(lxml.etree.Element) - - def __init__(self): - self.element = None - self.return_value = None - - def __call__(self, *args, **kwargs): - self.element = self.OriginalElement(*args, **kwargs) - self.return_value = FakeElement(self.element) - return self.return_value +load_django_models() +from Bcfg2.Server.Plugins.Probes import * +if HAS_JSON: + json = json -class StoringSubElement(object): - OriginalSubElement = copy.copy(lxml.etree.SubElement) +if HAS_YAML: + yaml = yaml - def __call__(self, parent, tag, **kwargs): - try: - return self.OriginalSubElement(parent._element, tag, - **kwargs) - except AttributeError: - return self.OriginalSubElement(parent, tag, **kwargs) +# test data for JSON and YAML tests +test_data = dict(a=1, b=[1, 2, 3], c="test", + d=dict(a=1, b=dict(a=1), c=(1, "2", 3))) class FakeList(list): @@ -74,19 +42,8 @@ class FakeList(list): class TestProbesDB(DBModelTestCase): if HAS_DJANGO: - models = [ProbesGroupsModel, ProbesDataModel] - - -class TestClientProbeDataSet(Bcfg2TestCase): - def test__init(self): - ds = ClientProbeDataSet() - self.assertLessEqual(ds.timestamp, time.time()) - self.assertIsInstance(ds, dict) - self.assertNotIn("timestamp", ds) - - ds = ClientProbeDataSet(timestamp=123) - self.assertEqual(ds.timestamp, 123) - self.assertNotIn("timestamp", ds) + models = [ProbesGroupsModel, + ProbesDataModel] class TestProbeData(Bcfg2TestCase): @@ -108,19 +65,22 @@ class TestProbeData(Bcfg2TestCase): def test_xdata(self): xdata = lxml.etree.Element("test") lxml.etree.SubElement(xdata, "test2") - data = ProbeData(lxml.etree.tostring(xdata, - xml_declaration=False).decode('UTF-8')) + data = ProbeData( + lxml.etree.tostring(xdata, + xml_declaration=False).decode('UTF-8')) self.assertIsNotNone(data.xdata) self.assertIsNotNone(data.xdata.find("test2")) - @skipUnless(HAS_JSON, "JSON libraries not found, skipping JSON tests") + @skipUnless(HAS_JSON, + "JSON libraries not found, skipping JSON tests") def test_json(self): jdata = json.dumps(test_data) data = ProbeData(jdata) self.assertIsNotNone(data.json) self.assertItemsEqual(test_data, data.json) - @skipUnless(HAS_YAML, "YAML libraries not found, skipping YAML tests") + @skipUnless(HAS_YAML, + "YAML libraries not found, skipping YAML tests") def test_yaml(self): jdata = yaml.dump(test_data) data = ProbeData(jdata) @@ -134,22 +94,20 @@ class TestProbeSet(TestEntrySet): ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] bogus_names = ["test.py"] - def get_obj(self, path=datastore, fam=None, encoding=None, + def get_obj(self, path=datastore, encoding=None, plugin_name="Probes", basename=None): # get_obj() accepts the basename argument, accepted by the # parent get_obj() method, and just throws it away, since # ProbeSet uses a regex for the "basename" - if fam is None: - fam = Mock() - rv = self.test_obj(path, fam, encoding, plugin_name) + rv = self.test_obj(path, plugin_name) rv.entry_type = MagicMock() return rv - def test__init(self): - fam = Mock() - ps = self.get_obj(fam=fam) + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): + ps = self.get_obj() self.assertEqual(ps.plugin_name, "Probes") - fam.AddMonitor.assert_called_with(datastore, ps) + mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps) TestEntrySet.test__init(self) def test_HandleEvent(self): @@ -242,385 +200,162 @@ group-specific""" assert False, "Strange probe found in get_probe_data() return" -class TestProbes(TestProbing, TestConnector, TestDatabaseBacked): +class TestProbes(Bcfg2TestCase): test_obj = Probes - def get_obj(self, core=None): - return TestDatabaseBacked.get_obj(self, core=core) - - def get_test_probedata(self): - test_xdata = lxml.etree.Element("test") - lxml.etree.SubElement(test_xdata, "test", foo="foo") - rv = dict() - rv["foo.example.com"] = ClientProbeDataSet(timestamp=time.time()) - rv["foo.example.com"]["xml"] = \ - ProbeData(lxml.etree.tostring(test_xdata, - xml_declaration=False).decode('UTF-8')) - rv["foo.example.com"]["text"] = ProbeData("freeform text") - rv["foo.example.com"]["multiline"] = ProbeData("""multiple + test_xdata = lxml.etree.Element("test") + lxml.etree.SubElement(test_xdata, "test", foo="foo") + test_xdoc = lxml.etree.tostring(test_xdata, + xml_declaration=False).decode('UTF-8') + + data = dict() + data['xml'] = "group:group\n" + test_xdoc + data['text'] = "freeform text" + data['multiline'] = """multiple lines of freeform text -""") - rv["bar.example.com"] = ClientProbeDataSet(timestamp=time.time()) - rv["bar.example.com"]["empty"] = ProbeData("") - if HAS_JSON: - rv["bar.example.com"]["json"] = ProbeData(json.dumps(test_data)) - if HAS_YAML: - rv["bar.example.com"]["yaml"] = ProbeData(yaml.dump(test_data)) - return rv - - def get_test_cgroups(self): - return {"foo.example.com": ["group", "group with spaces", - "group-with-dashes"], - "bar.example.com": []} - - def get_probes_object(self, use_db=False, load_data=None): - core = MagicMock() - core.setup.cfp.getboolean = Mock() - core.setup.cfp.getboolean.return_value = use_db - if load_data is None: - load_data = MagicMock() - # we have to patch load_data() in a funny way because - # different versions of Mock have different scopes for - # patching. in some versions, a patch applied to - # get_probes_object() would only apply to that function, while - # in others it would also apply to the calling function (e.g., - # test__init(), which relies on being able to check the calls - # of load_data(), and thus on load_data() being consistently - # mocked) - @patch("%s.%s.load_data" % (self.test_obj.__module__, - self.test_obj.__name__), new=load_data) - def inner(): - return self.get_obj(core) - - return inner() - - def test__init(self): - mock_load_data = Mock() - probes = self.get_probes_object(load_data=mock_load_data) - probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore, - probes.name), - probes.probes) - mock_load_data.assert_any_call() - self.assertEqual(probes.probedata, ClientProbeDataSet()) - self.assertEqual(probes.cgroups, dict()) - - @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", Mock()) - def test__use_db(self): - probes = self.get_probes_object() - self.assertFalse(probes._use_db) - probes.core.setup.cfp.getboolean.assert_called_with("probes", - "use_database", - default=False) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) - @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) - def test_write_data_xml(self): - probes = self.get_probes_object(use_db=False) - probes.write_data("test") - probes._write_data_xml.assert_called_with("test") - self.assertFalse(probes._write_data_db.called) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) - @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) - def test_write_data_db(self): - probes = self.get_probes_object(use_db=True) - probes.write_data("test") - probes._write_data_db.assert_called_with("test") - self.assertFalse(probes._write_data_xml.called) - - def test__write_data_xml(self): - probes = self.get_probes_object(use_db=False) - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - - @patch("lxml.etree.Element") - @patch("lxml.etree.SubElement", StoringSubElement()) - def inner(mock_Element): - mock_Element.side_effect = StoringElement() - probes._write_data_xml(None) - - top = mock_Element.side_effect.return_value - write = top.getroottree.return_value.write - self.assertEqual(write.call_args[0][0], - os.path.join(datastore, probes.name, - "probed.xml")) - - data = top._element - foodata = data.find("Client[@name='foo.example.com']") - self.assertIsNotNone(foodata) - self.assertIsNotNone(foodata.get("timestamp")) - self.assertEqual(len(foodata.findall("Probe")), - len(probes.probedata['foo.example.com'])) - self.assertEqual(len(foodata.findall("Group")), - len(probes.cgroups['foo.example.com'])) - xml = foodata.find("Probe[@name='xml']") - self.assertIsNotNone(xml) - self.assertIsNotNone(xml.get("value")) - xdata = lxml.etree.XML(xml.get("value")) - self.assertIsNotNone(xdata) - self.assertIsNotNone(xdata.find("test")) - self.assertEqual(xdata.find("test").get("foo"), "foo") - text = foodata.find("Probe[@name='text']") - self.assertIsNotNone(text) - self.assertIsNotNone(text.get("value")) - multiline = foodata.find("Probe[@name='multiline']") - self.assertIsNotNone(multiline) - self.assertIsNotNone(multiline.get("value")) - self.assertGreater(len(multiline.get("value").splitlines()), 1) - - bardata = data.find("Client[@name='bar.example.com']") - self.assertIsNotNone(bardata) - self.assertIsNotNone(bardata.get("timestamp")) - self.assertEqual(len(bardata.findall("Probe")), - len(probes.probedata['bar.example.com'])) - self.assertEqual(len(bardata.findall("Group")), - len(probes.cgroups['bar.example.com'])) - empty = bardata.find("Probe[@name='empty']") - self.assertIsNotNone(empty) - self.assertIsNotNone(empty.get("value")) - self.assertEqual(empty.get("value"), "") - if HAS_JSON: - jdata = bardata.find("Probe[@name='json']") - self.assertIsNotNone(jdata) - self.assertIsNotNone(jdata.get("value")) - self.assertItemsEqual(test_data, - json.loads(jdata.get("value"))) - if HAS_YAML: - ydata = bardata.find("Probe[@name='yaml']") - self.assertIsNotNone(ydata) - self.assertIsNotNone(ydata.get("value")) - self.assertItemsEqual(test_data, - yaml.load(ydata.get("value"))) - - inner() - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - def test__write_data_db(self): - syncdb(TestProbesDB) - probes = self.get_probes_object(use_db=True) - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - - for cname in ["foo.example.com", "bar.example.com"]: - client = Mock() - client.hostname = cname - probes._write_data_db(client) - - pdata = ProbesDataModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pdata), len(probes.probedata[cname])) - - for probe in pdata: - self.assertEqual(probe.hostname, client.hostname) - self.assertIsNotNone(probe.data) - if probe.probe == "xml": - xdata = lxml.etree.XML(probe.data) - self.assertIsNotNone(xdata) - self.assertIsNotNone(xdata.find("test")) - self.assertEqual(xdata.find("test").get("foo"), "foo") - elif probe.probe == "text": - pass - elif probe.probe == "multiline": - self.assertGreater(len(probe.data.splitlines()), 1) - elif probe.probe == "empty": - self.assertEqual(probe.data, "") - elif probe.probe == "yaml": - self.assertItemsEqual(test_data, yaml.load(probe.data)) - elif probe.probe == "json": - self.assertItemsEqual(test_data, json.loads(probe.data)) - else: - assert False, "Strange probe found in _write_data_db data" - - pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pgroups), len(probes.cgroups[cname])) - - # test that old probe data is removed properly - cname = 'foo.example.com' - del probes.probedata[cname]['text'] - probes.cgroups[cname].pop() - client = Mock() - client.hostname = cname - probes._write_data_db(client) - - pdata = ProbesDataModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pdata), len(probes.probedata[cname])) - pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pgroups), len(probes.cgroups[cname])) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) - @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) - def test_load_data_xml(self): - probes = self.get_probes_object(use_db=False) - probes.load_data() - probes._load_data_xml.assert_any_call() - self.assertFalse(probes._load_data_db.called) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) - @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) - def test_load_data_db(self): - probes = self.get_probes_object(use_db=True) - probes.load_data() - probes._load_data_db.assert_any_call(client=None) - self.assertFalse(probes._load_data_xml.called) - - @patch("lxml.etree.parse") - def test__load_data_xml(self, mock_parse): - probes = self.get_probes_object(use_db=False) - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - - # to get the value for lxml.etree.parse to parse, we call - # _write_data_xml, mock the lxml.etree._ElementTree.write() - # call, and grab the data that gets "written" to probed.xml - @patch("lxml.etree.Element") - @patch("lxml.etree.SubElement", StoringSubElement()) - def inner(mock_Element): - mock_Element.side_effect = StoringElement() - probes._write_data_xml(None) - top = mock_Element.side_effect.return_value - return top._element - - xdata = inner() - mock_parse.return_value = xdata.getroottree() - probes.probedata = dict() - probes.cgroups = dict() - - probes._load_data_xml() - mock_parse.assert_called_with(os.path.join(datastore, probes.name, - 'probed.xml'), - parser=Bcfg2.Server.XMLParser) - self.assertItemsEqual(probes.probedata, self.get_test_probedata()) - self.assertItemsEqual(probes.cgroups, self.get_test_cgroups()) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - def test__load_data_db(self): - syncdb(TestProbesDB) - probes = self.get_probes_object(use_db=True) - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - for cname in probes.probedata.keys(): - client = Mock() - client.hostname = cname - probes._write_data_db(client) - - probes.probedata = dict() - probes.cgroups = dict() - probes._load_data_db() - self.assertItemsEqual(probes.probedata, self.get_test_probedata()) - # the db backend does not store groups at all if a client has - # no groups set, so we can't just use assertItemsEqual here, - # because loading saved data may _not_ result in the original - # data if some clients had no groups set. - test_cgroups = self.get_test_cgroups() - for cname, groups in test_cgroups.items(): - if cname in probes.cgroups: - self.assertEqual(groups, probes.cgroups[cname]) - else: - self.assertEqual(groups, []) +group:group-with-dashes +group: group:with:colons +""" + data['empty'] = '' + data['almost_empty'] = 'group: other_group' + if HAS_JSON: + data['json'] = json.dumps(test_data) + if HAS_YAML: + data['yaml'] = yaml.dump(test_data) + + def setUp(self): + Bcfg2TestCase.setUp(self) + set_setup_default("probes_db") + self.datastore = None + Bcfg2.Server.Cache.expire("Probes") + + def tearDown(self): + if self.datastore is not None: + shutil.rmtree(self.datastore) + self.datastore = None - @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data") - def test_GetProbes(self, mock_get_probe_data): - probes = self.get_probes_object() - metadata = Mock() - probes.GetProbes(metadata) - mock_get_probe_data.assert_called_with(metadata) - - @patch("Bcfg2.Server.Plugins.Probes.Probes.write_data") - @patch("Bcfg2.Server.Plugins.Probes.Probes.ReceiveDataItem") - def test_ReceiveData(self, mock_ReceiveDataItem, mock_write_data): - # we use a simple (read: bogus) datalist here to make this - # easy to test - datalist = ["a", "b", "c"] - - probes = self.get_probes_object() - probes.core.metadata_cache_mode = 'off' - client = Mock() - client.hostname = "foo.example.com" - probes.ReceiveData(client, datalist) - - cgroups = [] - cprobedata = ClientProbeDataSet() - self.assertItemsEqual(mock_ReceiveDataItem.call_args_list, - [call(client, "a", cgroups, cprobedata), - call(client, "b", cgroups, cprobedata), - call(client, "c", cgroups, cprobedata)]) - mock_write_data.assert_called_with(client) - self.assertFalse(probes.core.metadata_cache.expire.called) - - # change the datalist, ensure that the cache is cleared - probes.cgroups[client.hostname] = datalist - probes.core.metadata_cache_mode = 'aggressive' - probes.ReceiveData(client, ['a', 'b', 'd']) - - mock_write_data.assert_called_with(client) - probes.core.metadata_cache.expire.assert_called_with(client.hostname) - - def test_ReceiveDataItem(self): - probes = self.get_probes_object() - for cname, cdata in self.get_test_probedata().items(): - client = Mock() - client.hostname = cname - cgroups = [] - cprobedata = ClientProbeDataSet() - for pname, pdata in cdata.items(): - dataitem = lxml.etree.Element("Probe", name=pname) - if pname == "text": - # add some groups to the plaintext test to test - # group parsing - data = [pdata] - for group in self.get_test_cgroups()[cname]: - data.append("group:%s" % group) - dataitem.text = "\n".join(data) - else: - dataitem.text = str(pdata) - - probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata) - - probes.cgroups[client.hostname] = cgroups - probes.probedata[client.hostname] = cprobedata - self.assertIn(client.hostname, probes.probedata) - self.assertIn(pname, probes.probedata[cname]) - self.assertEqual(pdata, probes.probedata[cname][pname]) - self.assertIn(client.hostname, probes.cgroups) - self.assertEqual(probes.cgroups[cname], - self.get_test_cgroups()[cname]) - - def test_get_additional_groups(self): - TestConnector.test_get_additional_groups(self) - - probes = self.get_probes_object() - test_cgroups = self.get_test_cgroups() - probes.cgroups = self.get_test_cgroups() - for cname in test_cgroups.keys(): - metadata = Mock() - metadata.hostname = cname - self.assertEqual(test_cgroups[cname], - probes.get_additional_groups(metadata)) - # test a non-existent client - metadata = Mock() - metadata.hostname = "nonexistent" - self.assertEqual(probes.get_additional_groups(metadata), - list()) - - def test_get_additional_data(self): - TestConnector.test_get_additional_data(self) - - probes = self.get_probes_object() - test_probedata = self.get_test_probedata() - probes.probedata = self.get_test_probedata() - for cname in test_probedata.keys(): - metadata = Mock() - metadata.hostname = cname - self.assertEqual(test_probedata[cname], - probes.get_additional_data(metadata)) - # test a non-existent client + def get_obj(self, core=None): + if core is None: + core = Mock() + if Bcfg2.Options.setup.probes_db: + @patch("os.makedirs", Mock()) + def inner(): + return self.test_obj(core, datastore) + return inner() + else: + # actually use a real datastore so we can read and write + # probed.xml + if self.datastore is None: + self.datastore = tempfile.mkdtemp() + return self.test_obj(core, self.datastore) + + def test_GetProbes(self): + p = self.get_obj() + p.probes = Mock() metadata = Mock() - metadata.hostname = "nonexistent" - self.assertEqual(probes.get_additional_data(metadata), - ClientProbeDataSet()) + p.GetProbes(metadata) + p.probes.get_probe_data.assert_called_with(metadata) + + def additionalDataEqual(self, actual, expected): + self.assertItemsEqual( + dict([(k, str(d)) for k, d in actual.items()]), + expected) + + def test_probes_xml(self): + """ Set and retrieve probe data with database disabled """ + Bcfg2.Options.setup.probes_db = False + self._perform_tests() + + @skipUnless(HAS_DJANGO, "Django not found") + def test_probes_db(self): + """ Set and retrieve probe data with database enabled """ + Bcfg2.Options.setup.probes_db = True + self._perform_tests() + + def _perform_tests(self): + p = self.get_obj() + + # first, sanity checks + foo_md = Mock(hostname="foo.example.com") + bar_md = Mock(hostname="bar.example.com") + self.assertItemsEqual(p.get_additional_groups(foo_md), []) + self.assertItemsEqual(p.get_additional_data(foo_md), dict()) + self.assertItemsEqual(p.get_additional_groups(bar_md), []) + self.assertItemsEqual(p.get_additional_data(bar_md), dict()) + + # next, set some initial probe data + foo_datalist = [] + for key in ['xml', 'text', 'multiline']: + pdata = lxml.etree.Element("Probe", name=key) + pdata.text = self.data[key] + foo_datalist.append(pdata) + foo_addl_data = dict(xml=self.test_xdoc, + text="freeform text", + multiline="""multiple +lines +of +freeform +text""") + bar_datalist = [] + for key in ['empty', 'almost_empty', 'json', 'yaml']: + if key in self.data: + pdata = lxml.etree.Element("Probe", name=key) + pdata.text = self.data[key] + bar_datalist.append(pdata) + bar_addl_data = dict(empty="", almost_empty="") + if HAS_JSON: + bar_addl_data['json'] = self.data['json'] + if HAS_YAML: + bar_addl_data['yaml'] = self.data['yaml'] + + p.ReceiveData(foo_md, foo_datalist) + self.assertItemsEqual(p.get_additional_groups(foo_md), + ["group", "group-with-dashes", + "group:with:colons"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + + p.ReceiveData(bar_md, bar_datalist) + self.assertItemsEqual(p.get_additional_groups(foo_md), + ["group", "group-with-dashes", + "group:with:colons"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) + + # instantiate a new Probes object and clear Probes caches to + # imitate a server restart + p = self.get_obj() + Bcfg2.Server.Cache.expire("Probes") + + self.assertItemsEqual(p.get_additional_groups(foo_md), + ["group", "group-with-dashes", + "group:with:colons"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) + + # set new data (and groups) for foo + foo_datalist = [] + pdata = lxml.etree.Element("Probe", name='xml') + pdata.text = self.data['xml'] + foo_datalist.append(pdata) + foo_addl_data = dict(xml=self.test_xdoc) + + p.ReceiveData(foo_md, foo_datalist) + self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) + + # instantiate a new Probes object and clear Probes caches to + # imitate a server restart + p = self.get_obj() + Bcfg2.Server.Cache.expire("Probes") + + self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index 896f5861e..159dc6e66 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -19,12 +19,6 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - -try: import json JSON = "json" except ImportError: @@ -36,12 +30,12 @@ class TestPropertyFile(Bcfg2TestCase): path = os.path.join(datastore, "test") def get_obj(self, path=None): + set_setup_default("writes_enabled", False) if path is None: path = self.path return self.test_obj(path) def test_write(self): - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() pf = self.get_obj() pf.validate_data = Mock() pf._write = Mock() @@ -52,20 +46,16 @@ class TestPropertyFile(Bcfg2TestCase): def reset(): pf.validate_data.reset_mock() pf._write.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() # test writes disabled - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + Bcfg2.Options.setup.writes_enabled = False self.assertRaises(PluginExecutionError, pf.write) self.assertFalse(pf.validate_data.called) self.assertFalse(pf._write.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", - "writes_enabled", - default=True) # test successful write reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + Bcfg2.Options.setup.writes_enabled = True self.assertEqual(pf.write(), pf._write.return_value) pf.validate_data.assert_called_with() pf._write.assert_called_with() @@ -99,96 +89,95 @@ class TestPropertyFile(Bcfg2TestCase): mock_copy.assert_called_with(pf) -if can_skip or HAS_JSON: - class TestJSONPropertyFile(TestFileBacked, TestPropertyFile): - test_obj = JSONPropertyFile - - def get_obj(self, *args, **kwargs): - return TestFileBacked.get_obj(self, *args, **kwargs) - - @skipUnless(HAS_JSON, "JSON libraries not found, skipping") - def setUp(self): - pass - - @patch("%s.loads" % JSON) - def test_Index(self, mock_loads): - pf = self.get_obj() - pf.Index() - mock_loads.assert_called_with(pf.data) - self.assertEqual(pf.json, mock_loads.return_value) - - mock_loads.reset_mock() - mock_loads.side_effect = ValueError - self.assertRaises(PluginExecutionError, pf.Index) - mock_loads.assert_called_with(pf.data) - - @patch("%s.dump" % JSON) - @patch("%s.open" % builtins) - def test__write(self, mock_open, mock_dump): - pf = self.get_obj() - self.assertTrue(pf._write()) - mock_open.assert_called_with(pf.name, 'wb') - mock_dump.assert_called_with(pf.json, mock_open.return_value) - - @patch("%s.dumps" % JSON) - def test_validate_data(self, mock_dumps): - pf = self.get_obj() - pf.validate_data() - mock_dumps.assert_called_with(pf.json) - - mock_dumps.reset_mock() - mock_dumps.side_effect = ValueError - self.assertRaises(PluginExecutionError, pf.validate_data) - mock_dumps.assert_called_with(pf.json) - - -if can_skip or HAS_YAML: - class TestYAMLPropertyFile(TestFileBacked, TestPropertyFile): - test_obj = YAMLPropertyFile - - def get_obj(self, *args, **kwargs): - return TestFileBacked.get_obj(self, *args, **kwargs) - - @skipUnless(HAS_YAML, "YAML libraries not found, skipping") - def setUp(self): - pass - - @patch("yaml.load") - def test_Index(self, mock_load): - pf = self.get_obj() - pf.Index() - mock_load.assert_called_with(pf.data) - self.assertEqual(pf.yaml, mock_load.return_value) - - mock_load.reset_mock() - mock_load.side_effect = yaml.YAMLError - self.assertRaises(PluginExecutionError, pf.Index) - mock_load.assert_called_with(pf.data) - - @patch("yaml.dump") - @patch("%s.open" % builtins) - def test__write(self, mock_open, mock_dump): - pf = self.get_obj() - self.assertTrue(pf._write()) - mock_open.assert_called_with(pf.name, 'wb') - mock_dump.assert_called_with(pf.yaml, mock_open.return_value) - - @patch("yaml.dump") - def test_validate_data(self, mock_dump): - pf = self.get_obj() - pf.validate_data() - mock_dump.assert_called_with(pf.yaml) - - mock_dump.reset_mock() - mock_dump.side_effect = yaml.YAMLError - self.assertRaises(PluginExecutionError, pf.validate_data) - mock_dump.assert_called_with(pf.yaml) +class TestJSONPropertyFile(TestFileBacked, TestPropertyFile): + test_obj = JSONPropertyFile + + @skipUnless(HAS_JSON, "JSON libraries not found, skipping") + def setUp(self): + TestFileBacked.setUp(self) + TestPropertyFile.setUp(self) + + @patch("%s.loads" % JSON) + def test_Index(self, mock_loads): + pf = self.get_obj() + pf.Index() + mock_loads.assert_called_with(pf.data) + self.assertEqual(pf.json, mock_loads.return_value) + + mock_loads.reset_mock() + mock_loads.side_effect = ValueError + self.assertRaises(PluginExecutionError, pf.Index) + mock_loads.assert_called_with(pf.data) + + @patch("%s.dump" % JSON) + @patch("%s.open" % builtins) + def test__write(self, mock_open, mock_dump): + pf = self.get_obj() + self.assertTrue(pf._write()) + mock_open.assert_called_with(pf.name, 'wb') + mock_dump.assert_called_with(pf.json, mock_open.return_value) + + @patch("%s.dumps" % JSON) + def test_validate_data(self, mock_dumps): + pf = self.get_obj() + pf.validate_data() + mock_dumps.assert_called_with(pf.json) + + mock_dumps.reset_mock() + mock_dumps.side_effect = ValueError + self.assertRaises(PluginExecutionError, pf.validate_data) + mock_dumps.assert_called_with(pf.json) + + +class TestYAMLPropertyFile(TestFileBacked, TestPropertyFile): + test_obj = YAMLPropertyFile + + @skipUnless(HAS_YAML, "YAML libraries not found, skipping") + def setUp(self): + TestFileBacked.setUp(self) + TestPropertyFile.setUp(self) + + @patch("yaml.load") + def test_Index(self, mock_load): + pf = self.get_obj() + pf.Index() + mock_load.assert_called_with(pf.data) + self.assertEqual(pf.yaml, mock_load.return_value) + + mock_load.reset_mock() + mock_load.side_effect = yaml.YAMLError + self.assertRaises(PluginExecutionError, pf.Index) + mock_load.assert_called_with(pf.data) + + @patch("yaml.dump") + @patch("%s.open" % builtins) + def test__write(self, mock_open, mock_dump): + pf = self.get_obj() + self.assertTrue(pf._write()) + mock_open.assert_called_with(pf.name, 'wb') + mock_dump.assert_called_with(pf.yaml, mock_open.return_value) + + @patch("yaml.dump") + def test_validate_data(self, mock_dump): + pf = self.get_obj() + pf.validate_data() + mock_dump.assert_called_with(pf.yaml) + + mock_dump.reset_mock() + mock_dump.side_effect = yaml.YAMLError + self.assertRaises(PluginExecutionError, pf.validate_data) + mock_dump.assert_called_with(pf.yaml) class TestXMLPropertyFile(TestPropertyFile, TestStructFile): test_obj = XMLPropertyFile path = TestStructFile.path + def setUp(self): + TestPropertyFile.setUp(self) + TestStructFile.setUp(self) + set_setup_default("automatch", False) + def get_obj(self, *args, **kwargs): return TestStructFile.get_obj(self, *args, **kwargs) @@ -243,178 +232,47 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' -<Properties decrypt="strict"> - <Crypted encrypted="foo"> - crypted - <Plain foo="bar">plain</Plain> - </Crypted> - <Crypted encrypted="bar">crypted</Crypted> - <Plain bar="baz">plain</Plain> - <Plain> - <Crypted encrypted="foo">crypted</Crypted> - </Plain> -</Properties>''' - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): - pf = self.get_obj() - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): - Bcfg2.Server.Plugins.Properties.SETUP = Mock() pf = self.get_obj() + pf.setup = Mock() pf.XMLMatch = Mock() metadata = Mock() def reset(): mock_copy.reset_mock() pf.XMLMatch.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() pf.xdata = lxml.etree.Element("Properties", automatch="true") - for automatch in [True, False]: + for Bcfg2.Options.setup.automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) self.assertFalse(mock_copy.called) pf.xdata = lxml.etree.Element("Properties", automatch="false") - for automatch in [True, False]: + for Bcfg2.Options.setup.automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) pf.xdata = lxml.etree.Element("Properties") reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + Bcfg2.Options.setup.automatch = False self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + Bcfg2.Options.setup.automatch = True self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) self.assertFalse(mock_copy.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py index f018b45dc..45f3671e8 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py @@ -1,9 +1,11 @@ import os import sys +import copy import lxml.etree -import Bcfg2.Server.Plugin +import Bcfg2.Options from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugins.Rules import * +from Bcfg2.Server.Plugin import PluginExecutionError # add all parent testsuite directories to sys.path to allow (most) # relative imports in python 2.4 @@ -15,116 +17,159 @@ while path != "/": break path = os.path.dirname(path) from common import * -from TestPlugin import TestPrioDir +from TestPlugin.Testhelpers import TestPrioDir class TestRules(TestPrioDir): test_obj = Rules - def test_HandlesEntry(self): + abstract = dict( + basic=lxml.etree.Element("Path", name="/etc/basic"), + unhandled=lxml.etree.Element("Path", name="/etc/unhandled"), + priority=lxml.etree.Element("Path", name="/etc/priority"), + content=lxml.etree.Element("Path", name="/etc/text-content"), + duplicate=lxml.etree.Element("SEBoolean", name="duplicate"), + group=lxml.etree.Element("SEPort", name="6789/tcp"), + children=lxml.etree.Element("Path", name="/etc/child-entries"), + regex=lxml.etree.Element("Package", name="regex"), + slash=lxml.etree.Element("Path", name="/etc/trailing/slash"), + no_slash=lxml.etree.Element("Path", name="/etc/no/trailing/slash/")) + + concrete = dict( + basic=lxml.etree.Element("Path", name="/etc/basic", type="directory", + owner="root", group="root", mode="0600"), + priority=lxml.etree.Element("Path", name="/etc/priority", + type="directory", owner="root", + group="root", mode="0600"), + content=lxml.etree.Element("Path", name="/etc/text-content", + type="file", owner="bar", group="bar", + mode="0644"), + duplicate=lxml.etree.Element("SEBoolean", name="duplicate", + value="on"), + group=lxml.etree.Element("SEPort", name="6789/tcp", + selinuxtype="bcfg2_server_t"), + children=lxml.etree.Element("Path", name="/etc/child-entries", + type="directory", owner="root", + group="root", mode="0775"), + regex=lxml.etree.Element("Package", name="regex", type="yum", + version="any"), + slash=lxml.etree.Element("Path", name="/etc/trailing/slash", + type="directory", owner="root", group="root", + mode="0600"), + no_slash=lxml.etree.Element("Path", name="/etc/no/trailing/slash/", + type="directory", owner="root", + group="root", mode="0600")) + + concrete['content'].text = "Text content" + lxml.etree.SubElement(concrete['children'], + "ACL", type="default", scope="user", user="foouser", + perms="rw") + lxml.etree.SubElement(concrete['children'], + "ACL", type="default", scope="group", group="users", + perms="rx") + + in_file = copy.deepcopy(concrete) + in_file['regex'].set("name", ".*") + in_file['slash'].set("name", "/etc/trailing/slash/") + in_file['no_slash'].set("name", "/etc/no/trailing/slash") + + rules1 = lxml.etree.Element("Rules", priority="10") + rules1.append(in_file['basic']) + lxml.etree.SubElement(rules1, "Path", name="/etc/priority", + type="directory", owner="foo", group="foo", + mode="0644") + foogroup = lxml.etree.SubElement(rules1, "Group", name="foogroup") + foogroup.append(in_file['group']) + rules1.append(in_file['content']) + rules1.append(copy.copy(in_file['duplicate'])) + + rules2 = lxml.etree.Element("Rules", priority="20") + rules2.append(in_file['priority']) + rules2.append(in_file['children']) + rules2.append(in_file['no_slash']) + + rules3 = lxml.etree.Element("Rules", priority="10") + rules3.append(in_file['duplicate']) + rules3.append(in_file['regex']) + rules3.append(in_file['slash']) + + rules = {"rules1.xml": rules1, "rules2.xml": rules2, "rules3.xml": rules3} + + def setUp(self): + TestPrioDir.setUp(self) + set_setup_default("lax_decryption", True) + set_setup_default("rules_regex", False) + + def get_child(self, name): + """ Turn one of the XML documents in `rules` into a child + object """ + filename = os.path.join(datastore, self.test_obj.name, name) + rv = self.test_obj.__child__(filename) + rv.data = lxml.etree.tostring(self.rules[name]) + rv.Index() + return rv + + def get_obj(self, core=None): + r = TestPrioDir.get_obj(self, core=core) + r.entries = dict([(n, self.get_child(n)) for n in self.rules.keys()]) + return r + + def _do_test(self, name, groups=None): + if groups is None: + groups = [] r = self.get_obj() - r.Entries = dict(Path={"/etc/foo.conf": Mock(), - "/etc/bar.conf": Mock()}) - r._matches = Mock() - metadata = Mock() - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - self.assertEqual(r.HandlesEntry(entry, metadata), - r._matches.return_value) - r._matches.assert_called_with(entry, metadata, - r.Entries['Path'].keys()) - - r._matches.reset_mock() - entry = lxml.etree.Element("Path", name="/etc/baz.conf") - self.assertEqual(r.HandlesEntry(entry, metadata), - r._matches.return_value) - r._matches.assert_called_with(entry, metadata, - r.Entries['Path'].keys()) - - r._matches.reset_mock() - entry = lxml.etree.Element("Package", name="foo") - self.assertFalse(r.HandlesEntry(entry, metadata)) - - def test_BindEntry(self, method="BindEntry"): + metadata = Mock(groups=groups) + entry = copy.deepcopy(self.abstract[name]) + self.assertTrue(r.HandlesEntry(entry, metadata)) + r.HandleEntry(entry, metadata) + self.assertXMLEqual(entry, self.concrete[name]) + + def _do_test_failure(self, name, groups=None, handles=None): + if groups is None: + groups = [] r = self.get_obj() - r.get_attrs = Mock() - r.get_attrs.return_value = dict(overwrite="new", add="add", - text="text") - entry = lxml.etree.Element("Test", overwrite="old", keep="keep") - metadata = Mock() - - getattr(r, method)(entry, metadata) - r.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(overwrite="old", add="add", keep="keep", - text="text")) - - def test_HandleEntry(self): - self.test_BindEntry(method="HandleEntry") - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches(self, mock_matches): - """ test _matches() behavior regardless of state of _regex_enabled """ - r = self.get_obj() - metadata = Mock() - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] - mock_matches.return_value = True - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - # test special Path cases -- adding and removing trailing slash - mock_matches.reset_mock() - mock_matches.return_value = False - rules = ["/etc/foo/", "/etc/bar"] - entry = lxml.etree.Element("Path", name="/etc/foo") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - mock_matches.reset_mock() - entry = lxml.etree.Element("Path", name="/etc/bar/") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_disabled(self, mock_matches): - """ test failure to match with regex disabled """ - r = self.get_obj() - self.set_regex_enabled(r, False) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] - self.assertFalse(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_enabled(self, mock_matches): - """ test match with regex enabled """ - r = self.get_obj() - self.set_regex_enabled(r, True) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = ["/etc/.*\.conf", "/etc/bar"] - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) - - def set_regex_enabled(self, rules_obj, state): - """ set the state of regex_enabled for this implementation of - Rules """ - if not isinstance(rules_obj.core.setup, MagicMock): - rules_obj.core.setup = MagicMock() - rules_obj.core.setup.cfp.getboolean.return_value = state - - def test__regex_enabled(self): - r = self.get_obj() - r.core.setup = MagicMock() - self.assertEqual(r._regex_enabled, - r.core.setup.cfp.getboolean.return_value) - r.core.setup.cfp.getboolean.assert_called_with("rules", "regex", - default=False) + metadata = Mock(groups=groups) + entry = self.abstract[name] + if handles is not None: + self.assertEqual(handles, r.HandlesEntry(entry, metadata)) + self.assertRaises(PluginExecutionError, + r.HandleEntry, entry, metadata) + + def test_basic(self): + """ Test basic Rules usage """ + self._do_test('basic') + self._do_test_failure('unhandled', handles=False) + + def test_priority(self): + """ Test that Rules respects priority """ + self._do_test('priority') + + def test_duplicate(self): + """ Test that Rules raises exceptions for duplicate entries """ + self._do_test_failure('duplicate') + + def test_content(self): + """ Test that Rules copies text content from concrete entries """ + self._do_test('content') + + def test_group(self): + """ Test that Rules respects <Group/> tags """ + self._do_test('group', groups=['foogroup']) + self._do_test_failure('group', groups=['bargroup'], handles=False) + + def test_children(self): + """ Test that Rules copies child elements from concrete entries """ + self._do_test('children') + + def test_regex(self): + """ Test that Rules handles regular expressions properly """ + Bcfg2.Options.setup.rules_regex = False + self._do_test_failure('regex', handles=False) + Bcfg2.Options.setup.rules_regex = True + self._do_test('regex') + Bcfg2.Options.setup.rules_regex = False + + def test_slash(self): + """ Test that Rules handles trailing slashes on Path entries """ + self._do_test('slash') + self._do_test('no_slash') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py index bf9a3ced3..128d6cae5 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py @@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase): def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=Mock()) + return self.test_obj(path) def test__init(self): hm = self.get_obj() |