diff options
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg')
7 files changed, 224 insertions, 245 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index e5cef8fa2..f41ae8a46 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -27,12 +27,12 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): TestCfgGenerator.setUp(self) TestStructFile.setUp(self) - def get_obj(self, name=None, core=None, fam=None): + @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.get_cfg") + def get_obj(self, mock_get_cfg, name=None, core=None, fam=None): if name is None: name = self.path - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG = Mock() if core is not None: - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG.core = core + mock_get_cfg.return_value.core = core return self.test_obj(name) @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") @@ -111,17 +111,18 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): reset() host = "baz.example.com" spec = lxml.etree.Element("AuthorizedKeys") - lxml.etree.SubElement( - lxml.etree.SubElement(spec, - "Allow", - attrib={"from": pubkey, "host": host}), - "Params", foo="foo", bar="bar=bar") + allow = lxml.etree.SubElement(spec, "Allow", + attrib={"from": pubkey, "host": host}) + lxml.etree.SubElement(allow, "Option", name="foo", value="foo") + lxml.etree.SubElement(allow, "Option", name="bar") + lxml.etree.SubElement(allow, "Option", name="baz", value="baz=baz") akg.XMLMatch.return_value = spec params, actual_host, actual_pubkey = akg.get_data(entry, metadata).split() self.assertEqual(actual_host, host) self.assertEqual(actual_pubkey, pubkey) - self.assertItemsEqual(params.split(","), ["foo=foo", "bar=bar=bar"]) + self.assertItemsEqual(params.split(","), ["foo=foo", "bar", + "baz=baz=baz"]) akg.XMLMatch.assert_called_with(metadata) akg.core.build_metadata.assert_called_with(host) self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey) @@ -131,10 +132,10 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): spec = lxml.etree.Element("AuthorizedKeys") text = lxml.etree.SubElement(spec, "Allow") text.text = "ssh-rsa publickey /foo/bar\n" - lxml.etree.SubElement(text, "Params", foo="foo") + lxml.etree.SubElement(text, "Option", name="foo") akg.XMLMatch.return_value = spec self.assertEqual(akg.get_data(entry, metadata), - "foo=foo %s" % text.text.strip()) + "foo %s" % text.text.strip()) akg.XMLMatch.assert_called_with(metadata) self.assertFalse(akg.core.build_metadata.called) self.assertFalse(akg.core.Bind.called) @@ -143,7 +144,7 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey}) akg.XMLMatch.return_value = spec self.assertItemsEqual(akg.get_data(entry, metadata).splitlines(), - ["foo=foo %s" % text.text.strip(), + ["foo %s" % text.text.strip(), "profile %s" % pubkey]) akg.XMLMatch.assert_called_with(metadata) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index e1ffa7272..2285fb458 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -17,31 +17,30 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if HAS_CHEETAH or can_skip: - class TestCfgCheetahGenerator(TestCfgGenerator): - test_obj = CfgCheetahGenerator +class TestCfgCheetahGenerator(TestCfgGenerator): + test_obj = CfgCheetahGenerator - @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") - def setUp(self): - TestCfgGenerator.setUp(self) - set_setup_default("repository", datastore) + @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") + def setUp(self): + TestCfgGenerator.setUp(self) + set_setup_default("repository", datastore) - @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template") - def test_get_data(self, mock_Template): - ccg = self.get_obj() - ccg.data = "data" - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() + @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template") + def test_get_data(self, mock_Template): + ccg = self.get_obj() + ccg.data = "data" + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() - self.assertEqual(ccg.get_data(entry, metadata), - mock_Template.return_value.respond.return_value) - mock_Template.assert_called_with( - "data".decode(Bcfg2.Options.setup.encoding), - compilerSettings=ccg.settings) - tmpl = mock_Template.return_value - tmpl.respond.assert_called_with() - self.assertEqual(tmpl.metadata, metadata) - self.assertEqual(tmpl.name, entry.get("name")) - self.assertEqual(tmpl.path, entry.get("name")) - self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, datastore) + self.assertEqual(ccg.get_data(entry, metadata), + mock_Template.return_value.respond.return_value) + mock_Template.assert_called_with( + "data".decode(Bcfg2.Options.setup.encoding), + compilerSettings=ccg.settings) + tmpl = mock_Template.return_value + tmpl.respond.assert_called_with() + self.assertEqual(tmpl.metadata, metadata) + self.assertEqual(tmpl.name, entry.get("name")) + self.assertEqual(tmpl.path, entry.get("name")) + self.assertEqual(tmpl.source_path, ccg.name) + self.assertEqual(tmpl.repo, datastore) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py index 46062569d..4c987551b 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py @@ -30,18 +30,17 @@ except ImportError: HAS_CRYPTO = False -if can_skip or (HAS_CRYPTO and HAS_CHEETAH): - class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator, - TestCfgEncryptedGenerator): - test_obj = CfgEncryptedCheetahGenerator +class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator, + TestCfgEncryptedGenerator): + test_obj = CfgEncryptedCheetahGenerator - @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") - def setUp(self): - pass + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") + def setUp(self): + pass - def test_handle_event(self): - TestCfgEncryptedGenerator.test_handle_event(self) + def test_handle_event(self): + TestCfgEncryptedGenerator.test_handle_event(self) - def test_get_data(self): - TestCfgCheetahGenerator.test_get_data(self) + def test_get_data(self): + TestCfgCheetahGenerator.test_get_data(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 5409cf863..873ebd837 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -18,54 +18,53 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if can_skip or HAS_CRYPTO: - class TestCfgEncryptedGenerator(TestCfgGenerator): - test_obj = CfgEncryptedGenerator +class TestCfgEncryptedGenerator(TestCfgGenerator): + test_obj = CfgEncryptedGenerator - @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - def setUp(self): - TestCfgGenerator.setUp(self) + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + def setUp(self): + TestCfgGenerator.setUp(self) - @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt): - @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") - def inner(mock_handle_event): - def reset(): - mock_decrypt.reset_mock() - mock_handle_event.reset_mock() + @patchIf(HAS_CRYPTO, + "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") + def test_handle_event(self, mock_decrypt): + @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") + def inner(mock_handle_event): + def reset(): + mock_decrypt.reset_mock() + mock_handle_event.reset_mock() - def get_event_data(obj, event): - obj.data = "encrypted" + def get_event_data(obj, event): + obj.data = "encrypted" - mock_handle_event.side_effect = get_event_data - mock_decrypt.side_effect = lambda d, **kw: "plaintext" - event = Mock() - ceg = self.get_obj() - ceg.handle_event(event) - mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted") - self.assertEqual(ceg.data, "plaintext") + mock_handle_event.side_effect = get_event_data + mock_decrypt.side_effect = lambda d, **kw: "plaintext" + event = Mock() + ceg = self.get_obj() + ceg.handle_event(event) + mock_handle_event.assert_called_with(ceg, event) + mock_decrypt.assert_called_with("encrypted") + self.assertEqual(ceg.data, "plaintext") - reset() - mock_decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, - ceg.handle_event, event) - inner() + reset() + mock_decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, + ceg.handle_event, event) + inner() - # to perform the tests from the parent test object, we - # make bruteforce_decrypt just return whatever data was - # given to it - mock_decrypt.side_effect = lambda d, **kw: d - TestCfgGenerator.test_handle_event(self) + # to perform the tests from the parent test object, we + # make bruteforce_decrypt just return whatever data was + # given to it + mock_decrypt.side_effect = lambda d, **kw: d + TestCfgGenerator.test_handle_event(self) - def test_get_data(self): - ceg = self.get_obj() - ceg.data = None - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() + def test_get_data(self): + ceg = self.get_obj() + ceg.data = None + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() - self.assertRaises(PluginExecutionError, - ceg.get_data, entry, metadata) + self.assertRaises(PluginExecutionError, + ceg.get_data, entry, metadata) - TestCfgGenerator.test_get_data(self) + TestCfgGenerator.test_get_data(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py index 25d2fb83b..0b74e4a60 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py @@ -18,10 +18,9 @@ from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ TestCfgGenshiGenerator -if can_skip or HAS_CRYPTO: - class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): - test_obj = CfgEncryptedGenshiGenerator +class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): + test_obj = CfgEncryptedGenshiGenerator - @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - def setUp(self): - TestCfgGenshiGenerator.setUp(self) + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + def setUp(self): + TestCfgGenshiGenerator.setUp(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index c4961db1c..2967a23b6 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -22,25 +22,15 @@ while path != "/": break path = os.path.dirname(path) from common import * -from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator -from TestServer.TestPlugin.Testhelpers import TestStructFile +from TestServer.TestPlugins.TestCfg.Test_init import TestXMLCfgCreator -class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): +class TestCfgPrivateKeyCreator(TestXMLCfgCreator): test_obj = CfgPrivateKeyCreator should_monitor = False def get_obj(self, name=None, fam=None): - return TestCfgCreator.get_obj(self, name=name) - - @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event") - @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent") - def test_handle_event(self, mock_HandleEvent, mock_handle_event): - pkc = self.get_obj() - evt = Mock() - pkc.handle_event(evt) - mock_HandleEvent.assert_called_with(pkc, evt) - mock_handle_event.assert_called_with(pkc, evt) + return TestXMLCfgCreator.get_obj(self, name=name) @patch("shutil.rmtree") @patch("tempfile.mkdtemp") @@ -90,57 +80,6 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata) mock_rmtree.assert_called_with(datastore) - def test_get_specificity(self): - pkc = self.get_obj() - pkc.XMLMatch = Mock() - - metadata = Mock() - - def reset(): - pkc.XMLMatch.reset_mock() - metadata.group_in_category.reset_mock() - - Bcfg2.Options.setup.sshkeys_category = None - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(host=metadata.hostname)) - - Bcfg2.Options.setup.sshkeys_category = "foo" - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(group=metadata.group_in_category.return_value, - prio=50)) - metadata.group_in_category.assert_called_with("foo") - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", - perhost="true") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(host=metadata.hostname)) - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", - category="bar") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(group=metadata.group_in_category.return_value, - prio=50)) - metadata.group_in_category.assert_called_with("bar") - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", - prio="10") - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(group=metadata.group_in_category.return_value, - prio=10)) - metadata.group_in_category.assert_called_with("foo") - - reset() - pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") - metadata.group_in_category.return_value = '' - self.assertItemsEqual(pkc.get_specificity(metadata), - dict(host=metadata.hostname)) - metadata.group_in_category.assert_called_with("foo") - @patch("shutil.rmtree") @patch("%s.open" % builtins) def test_create_data(self, mock_open, mock_rmtree): @@ -153,7 +92,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): # the get_specificity() return value is being used # appropriately, we put some dummy data in it and test for # that data - pkc.get_specificity.side_effect = lambda m, s: dict(group="foo") + pkc.get_specificity.side_effect = lambda m: dict(group="foo") pkc._gen_keypair = Mock() privkey = os.path.join(datastore, "privkey") pkc._gen_keypair.return_value = privkey @@ -179,67 +118,32 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): mock_open.return_value.read.side_effect = open_read_rv reset() - passphrase = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.passphrase" - - @patch(passphrase, None) - def inner(): - self.assertEqual(pkc.create_data(entry, metadata), "privatekey") - pkc.XMLMatch.assert_called_with(metadata) - pkc.get_specificity.assert_called_with(metadata, - pkc.XMLMatch.return_value) - pkc._gen_keypair.assert_called_with(metadata, - pkc.XMLMatch.return_value) - self.assertItemsEqual(mock_open.call_args_list, - [call(privkey + ".pub"), call(privkey)]) - pkc.pubkey_creator.get_filename.assert_called_with(group="foo") - pkc.pubkey_creator.write_data.assert_called_with( - "ssh-rsa publickey pubkey.filename\n", group="foo") - pkc.write_data.assert_called_with("privatekey", group="foo") - mock_rmtree.assert_called_with(datastore) - - reset() - self.assertEqual(pkc.create_data(entry, metadata, return_pair=True), - ("ssh-rsa publickey pubkey.filename\n", - "privatekey")) - pkc.XMLMatch.assert_called_with(metadata) - pkc.get_specificity.assert_called_with(metadata, - pkc.XMLMatch.return_value) - pkc._gen_keypair.assert_called_with(metadata, - pkc.XMLMatch.return_value) - self.assertItemsEqual(mock_open.call_args_list, - [call(privkey + ".pub"), call(privkey)]) - pkc.pubkey_creator.get_filename.assert_called_with(group="foo") - pkc.pubkey_creator.write_data.assert_called_with( - "ssh-rsa publickey pubkey.filename\n", - group="foo") - pkc.write_data.assert_called_with("privatekey", group="foo") - mock_rmtree.assert_called_with(datastore) - - inner() - - if HAS_CRYPTO: - @patch(passphrase, "foo") - @patch("Bcfg2.Server.Encryption.ssl_encrypt") - def inner2(mock_ssl_encrypt): - reset() - mock_ssl_encrypt.return_value = "encryptedprivatekey" - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True - self.assertEqual(pkc.create_data(entry, metadata), - "encryptedprivatekey") - pkc.XMLMatch.assert_called_with(metadata) - pkc.get_specificity.assert_called_with( - metadata, - pkc.XMLMatch.return_value) - pkc._gen_keypair.assert_called_with(metadata, - pkc.XMLMatch.return_value) - self.assertItemsEqual(mock_open.call_args_list, - [call(privkey + ".pub"), call(privkey)]) - pkc.pubkey_creator.get_filename.assert_called_with(group="foo") - pkc.pubkey_creator.write_data.assert_called_with( - "ssh-rsa publickey pubkey.filename\n", group="foo") - pkc.write_data.assert_called_with("encryptedprivatekey", - group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with("privatekey", "foo") - mock_rmtree.assert_called_with(datastore) - - inner2() + self.assertEqual(pkc.create_data(entry, metadata), "privatekey") + pkc.XMLMatch.assert_called_with(metadata) + pkc.get_specificity.assert_called_with(metadata) + pkc._gen_keypair.assert_called_with(metadata, + pkc.XMLMatch.return_value) + self.assertItemsEqual(mock_open.call_args_list, + [call(privkey + ".pub"), call(privkey)]) + pkc.pubkey_creator.get_filename.assert_called_with(group="foo") + pkc.pubkey_creator.write_data.assert_called_with( + "ssh-rsa publickey pubkey.filename\n", group="foo") + pkc.write_data.assert_called_with("privatekey", group="foo") + mock_rmtree.assert_called_with(datastore) + + reset() + self.assertEqual(pkc.create_data(entry, metadata, return_pair=True), + ("ssh-rsa publickey pubkey.filename\n", + "privatekey")) + pkc.XMLMatch.assert_called_with(metadata) + pkc.get_specificity.assert_called_with(metadata) + pkc._gen_keypair.assert_called_with(metadata, + pkc.XMLMatch.return_value) + self.assertItemsEqual(mock_open.call_args_list, + [call(privkey + ".pub"), call(privkey)]) + pkc.pubkey_creator.get_filename.assert_called_with(group="foo") + pkc.pubkey_creator.write_data.assert_called_with( + "ssh-rsa publickey pubkey.filename\n", + group="foo") + pkc.write_data.assert_called_with("privatekey", group="foo") + mock_rmtree.assert_called_with(datastore) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 72be50299..307461918 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -3,7 +3,7 @@ import sys import errno import lxml.etree import Bcfg2.Options -from Bcfg2.Compat import walk_packages +from Bcfg2.Compat import walk_packages, ConfigParser from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugins.Cfg import * from Bcfg2.Server.Plugin import PluginExecutionError, Specificity @@ -19,7 +19,7 @@ while path != "/": path = os.path.dirname(path) from common import * from TestPlugin import TestSpecificData, TestEntrySet, TestGroupSpool, \ - TestPullTarget + TestPullTarget, TestStructFile class TestCfgBaseFileMatcher(TestSpecificData): @@ -172,10 +172,12 @@ class TestCfgVerifier(TestCfgBaseFileMatcher): class TestCfgCreator(TestCfgBaseFileMatcher): test_obj = CfgCreator path = "/foo/bar/test.txt" + should_monitor = False def setUp(self): TestCfgBaseFileMatcher.setUp(self) set_setup_default("filemonitor", MagicMock()) + set_setup_default("cfg_passphrase", None) def get_obj(self, name=None): if name is None: @@ -245,6 +247,75 @@ class TestCfgCreator(TestCfgBaseFileMatcher): self.assertRaises(CfgCreationError, cc.write_data, data) +class TestXMLCfgCreator(TestCfgCreator, TestStructFile): + test_obj = XMLCfgCreator + + def setUp(self): + TestCfgCreator.setUp(self) + TestStructFile.setUp(self) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event") + @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent") + def test_handle_event(self, mock_HandleEvent, mock_handle_event): + cc = self.get_obj() + evt = Mock() + cc.handle_event(evt) + mock_HandleEvent.assert_called_with(cc, evt) + mock_handle_event.assert_called_with(cc, evt) + + def test_get_specificity(self): + cc = self.get_obj() + metadata = Mock() + + def reset(): + metadata.group_in_category.reset_mock() + + category = "%s.%s.category" % (self.test_obj.__module__, + self.test_obj.__name__) + @patch(category, None) + def inner(): + cc.xdata = lxml.etree.Element("PrivateKey") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(host=metadata.hostname)) + inner() + + @patch(category, "foo") + def inner2(): + cc.xdata = lxml.etree.Element("PrivateKey") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=50)) + metadata.group_in_category.assert_called_with("foo") + + reset() + cc.xdata = lxml.etree.Element("PrivateKey", perhost="true") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(host=metadata.hostname)) + + reset() + cc.xdata = lxml.etree.Element("PrivateKey", category="bar") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=50)) + metadata.group_in_category.assert_called_with("bar") + + reset() + cc.xdata = lxml.etree.Element("PrivateKey", prio="10") + self.assertItemsEqual(cc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=10)) + metadata.group_in_category.assert_called_with("foo") + + reset() + cc.xdata = lxml.etree.Element("PrivateKey") + metadata.group_in_category.return_value = '' + self.assertItemsEqual(cc.get_specificity(metadata), + dict(host=metadata.hostname)) + metadata.group_in_category.assert_called_with("foo") + + inner2() + + class TestCfgDefaultInfo(TestCfgInfo): test_obj = CfgDefaultInfo @@ -276,6 +347,7 @@ class TestCfgEntrySet(TestEntrySet): def setUp(self): TestEntrySet.setUp(self) set_setup_default("cfg_validation", False) + set_setup_default("cfg_handlers", []) def test__init(self): pass @@ -283,14 +355,14 @@ class TestCfgEntrySet(TestEntrySet): def test_handle_event(self): eset = self.get_obj() eset.entry_init = Mock() - eset._handlers = [Mock(), Mock(), Mock()] - for hdlr in eset._handlers: + Bcfg2.Options.setup.cfg_handlers = [Mock(), Mock(), Mock()] + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.__name__ = "handler" eset.entries = dict() def reset(): eset.entry_init.reset_mock() - for hdlr in eset._handlers: + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.reset_mock() # test that a bogus deleted event is discarded @@ -300,18 +372,19 @@ class TestCfgEntrySet(TestEntrySet): eset.handle_event(evt) self.assertFalse(eset.entry_init.called) self.assertItemsEqual(eset.entries, dict()) - for hdlr in eset._handlers: + for hdlr in Bcfg2.Options.setup.cfg_handlers: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) # test creation of a new file for action in ["exists", "created", "changed"]: + print("Testing handling of %s events" % action) evt = Mock() evt.code2str.return_value = action evt.filename = os.path.join(datastore, "test.txt") # test with no handler that handles - for hdlr in eset._handlers: + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.handles.return_value = False hdlr.ignore.return_value = False @@ -319,16 +392,16 @@ class TestCfgEntrySet(TestEntrySet): eset.handle_event(evt) self.assertFalse(eset.entry_init.called) self.assertItemsEqual(eset.entries, dict()) - for hdlr in eset._handlers: + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.handles.assert_called_with(evt, basename=eset.path) hdlr.ignore.assert_called_with(evt, basename=eset.path) # test with a handler that handles the entry reset() - eset._handlers[-1].handles.return_value = True + Bcfg2.Options.setup.cfg_handlers[-1].handles.return_value = True eset.handle_event(evt) - eset.entry_init.assert_called_with(evt, eset._handlers[-1]) - for hdlr in eset._handlers: + eset.entry_init.assert_called_with(evt, Bcfg2.Options.setup.cfg_handlers[-1]) + for hdlr in Bcfg2.Options.setup.cfg_handlers: hdlr.handles.assert_called_with(evt, basename=eset.path) if not hdlr.return_value: hdlr.ignore.assert_called_with(evt, basename=eset.path) @@ -336,14 +409,14 @@ class TestCfgEntrySet(TestEntrySet): # test with a handler that ignores the entry before one # that handles it reset() - eset._handlers[0].ignore.return_value = True + Bcfg2.Options.setup.cfg_handlers[0].ignore.return_value = True eset.handle_event(evt) self.assertFalse(eset.entry_init.called) - eset._handlers[0].handles.assert_called_with(evt, - basename=eset.path) - eset._handlers[0].ignore.assert_called_with(evt, - basename=eset.path) - for hdlr in eset._handlers[1:]: + Bcfg2.Options.setup.cfg_handlers[0].handles.assert_called_with( + evt, basename=eset.path) + Bcfg2.Options.setup.cfg_handlers[0].ignore.assert_called_with( + evt, basename=eset.path) + for hdlr in Bcfg2.Options.setup.cfg_handlers[1:]: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) @@ -355,7 +428,7 @@ class TestCfgEntrySet(TestEntrySet): eset.entries[evt.filename] = Mock() eset.handle_event(evt) self.assertFalse(eset.entry_init.called) - for hdlr in eset._handlers: + for hdlr in Bcfg2.Options.setup.cfg_handlers: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) eset.entries[evt.filename].handle_event.assert_called_with(evt) @@ -365,7 +438,7 @@ class TestCfgEntrySet(TestEntrySet): evt.code2str.return_value = "deleted" eset.handle_event(evt) self.assertFalse(eset.entry_init.called) - for hdlr in eset._handlers: + for hdlr in Bcfg2.Options.setup.cfg_handlers: self.assertFalse(hdlr.handles.called) self.assertFalse(hdlr.ignore.called) self.assertItemsEqual(eset.entries, dict()) @@ -730,6 +803,11 @@ class TestCfgEntrySet(TestEntrySet): class TestCfg(TestGroupSpool, TestPullTarget): test_obj = Cfg + def setUp(self): + TestGroupSpool.setUp(self) + TestPullTarget.setUp(self) + set_setup_default("cfg_handlers", []) + def get_obj(self, core=None): if core is None: core = Mock() |