From ebe7542db7217c2fac3d7111e80f94caedfb69e2 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 16 Jan 2013 13:28:06 -0500 Subject: added module-level OptionParser to avoid passing it as an argument or global all over --- .../TestCfg/TestCfgAuthorizedKeysGenerator.py | 32 +++--- .../TestPlugins/TestCfg/TestCfgCheetahGenerator.py | 7 +- .../TestCfg/TestCfgEncryptedGenerator.py | 9 +- .../TestPlugins/TestCfg/TestCfgGenshiGenerator.py | 14 +-- .../TestCfg/TestCfgPrivateKeyCreator.py | 126 ++++++++------------- .../TestServer/TestPlugins/TestCfg/Test_init.py | 41 ++++--- .../TestServer/TestPlugins/TestProperties.py | 85 ++++++-------- 7 files changed, 132 insertions(+), 182 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index d655a20cd..b77d52033 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): def test_category(self): akg = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp + akg.setup = Mock() + akg.setup.cfp.has_section.return_value = False + akg.setup.cfp.has_option.return_value = False self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + akg.setup.reset_mock() + akg.setup.cfp.has_section.return_value = True self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(akg.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + + akg.setup.reset_mock() + akg.setup.cfp.has_option.return_value = True + self.assertEqual(akg.category, akg.setup.cfp.get.return_value) + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + akg.setup.cfp.get.assert_called_with("sshkeys", "category") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index fc5d5e53d..31227329c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip: ccg.data = "data" entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock() + ccg.setup = MagicMock() self.assertEqual(ccg.get_data(entry, metadata), mock_Template.return_value.respond.return_value) - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo") + ccg.setup.__getitem__.assert_called_with("repo") mock_Template.assert_called_with("data".decode(ccg.encoding), compilerSettings=ccg.settings) tmpl = mock_Template.return_value @@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip: self.assertEqual(tmpl.name, entry.get("name")) self.assertEqual(tmpl.path, entry.get("name")) self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value) + self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 71a7410da..f8e9b1990 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -26,16 +26,13 @@ if can_skip or HAS_CRYPTO: def setUp(self): pass - @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt, mock_get_algorithm): + def test_handle_event(self, mock_decrypt): @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") def inner(mock_handle_event): def reset(): mock_decrypt.reset_mock() - mock_get_algorithm.reset_mock() mock_handle_event.reset_mock() def get_event_data(obj, event): @@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO: ceg = self.get_obj() ceg.handle_event(event) mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted", - setup=SETUP, - algorithm=mock_get_algorithm.return_value) + mock_decrypt.assert_called_with("encrypted") self.assertEqual(ceg.data, "plaintext") reset() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 385f8df77..88f3cf3f7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -43,6 +43,7 @@ if can_skip or HAS_GENSHI: def test_get_data(self): cgg = self.get_obj() cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() cgg.template = Mock() fltr = Mock() cgg.template.generate.return_value = fltr @@ -51,24 +52,23 @@ if can_skip or HAS_GENSHI: entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock() def reset(): cgg.template.reset_mock() cgg._handle_genshi_exception.reset_mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock() + cgg.setup.reset_mock() template_vars = dict( name=entry.get("name"), metadata=metadata, path=cgg.name, source_path=cgg.name, - repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value) + repo=cgg.setup.__getitem__.return_value) self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -81,7 +81,7 @@ if can_skip or HAS_GENSHI: self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) self.assertEqual(stream.render.call_args_list, [call("text", encoding=cgg.encoding, @@ -93,7 +93,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(UndefinedError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -104,7 +104,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(ValueError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index dc4b11241..4c8ab8b43 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -44,56 +44,56 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): def test_category(self): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(pkc.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + pkc.setup.reset_mock() + pkc.setup.cfp.has_option.return_value = True + self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") - cfp.reset_mock() - cfp.get.return_value = "test" + pkc.setup.reset_mock() + pkc.setup.cfp.get.return_value = "test" mock_get_passphrases.return_value = dict(test="foo", test2="bar") - cfp.has_option.return_value = True + pkc.setup.cfp.has_option.return_value = True self.assertEqual(pkc.passphrase, "foo") - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - cfp.get.assert_called_with("sshkeys", "passphrase") - mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") + pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase") + mock_get_passphrases.assert_called_with() @patch("shutil.rmtree") @patch("tempfile.mkdtemp") @@ -282,8 +282,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") @patch("Bcfg2.Encryption.ssl_encrypt") - @patch("Bcfg2.Encryption.get_algorithm") - def inner2(mock_get_algorithm, mock_ssl_encrypt): + def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True @@ -302,9 +301,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): "ssh-rsa publickey pubkey.filename\n", group="foo") pkc.write_data.assert_called_with("encryptedprivatekey", group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with( - "privatekey", "foo", - algorithm=mock_get_algorithm.return_value) + mock_ssl_encrypt.assert_called_with("privatekey", "foo") mock_rmtree.assert_called_with(datastore) inner2() @@ -317,10 +314,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") def test_Index_crypto(self): - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" - pkc = self.get_obj() + pkc.setup = Mock() + pkc.setup.cfp.get.return_value = "strict" pkc._decrypt = Mock() pkc._decrypt.return_value = 'plaintext' pkc.data = ''' @@ -348,7 +344,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): self.assertRaises(PluginExecutionError, pkc.Index) # test failed decryption, lax - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" + pkc.setup.cfp.get.return_value = "lax" pkc._decrypt.reset_mock() pkc.Index() self.assertItemsEqual( @@ -358,17 +354,13 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pkc = self.get_obj() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -382,29 +374,19 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -412,26 +394,14 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 2e758774e..ee0b0be9d 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -439,8 +439,6 @@ class TestCfgEntrySet(TestEntrySet): @patch("Bcfg2.Server.Plugins.Cfg.u_str") @patch("Bcfg2.Server.Plugins.Cfg.b64encode") def test_bind_entry(self, mock_b64encode, mock_u_str): - Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) - mock_u_str.side_effect = lambda x: x eset = self.get_obj() @@ -448,6 +446,7 @@ class TestCfgEntrySet(TestEntrySet): eset._generate_data = Mock() eset.get_handlers = Mock() eset._validate_data = Mock() + eset.setup = dict(validate=False) def reset(): mock_b64encode.reset_mock() @@ -523,7 +522,7 @@ class TestCfgEntrySet(TestEntrySet): # test successful validation entry = reset() - Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + eset.setup['validate'] = True bound = eset.bind_entry(entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -753,31 +752,39 @@ class TestCfg(TestGroupSpool, TestPullTarget): def get_obj(self, core=None): if core is None: core = Mock() - core.setup = MagicMock() return TestGroupSpool.get_obj(self, core=core) + @patch("Bcfg2.Options.get_option_parser") @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") @patch("Bcfg2.Server.Plugin.PullTarget.__init__") - def test__init(self, mock_pulltarget_init, mock_groupspool_init): + def test__init(self, mock_pulltarget_init, mock_groupspool_init, + mock_get_option_parser): + setup = MagicMock() + setup.__contains__.return_value = False + mock_get_option_parser.return_value = setup + + def reset(): + core.reset_mock() + setup.reset_mock() + mock_pulltarget_init.reset_mock() + mock_groupspool_init.reset_mock() + core = Mock() - core.setup = MagicMock() cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - core.setup.add_option.assert_called_with("validate", - Bcfg2.Options.CFG_VALIDATION) - core.setup.reparse.assert_called_with() - - core.reset_mock() - core.setup.reset_mock() - mock_pulltarget_init.reset_mock() - mock_groupspool_init.reset_mock() - core.setup.__contains__.return_value = True + setup.add_option.assert_called_with( + "validate", + Bcfg2.Options.CFG_VALIDATION) + mock_get_option_parser.return_value.reparse.assert_called_with() + + reset() + setup.__contains__.return_value = True cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - self.assertFalse(core.setup.add_option.called) - self.assertFalse(core.setup.reparse.called) + self.assertFalse(setup.add_option.called) + self.assertFalse(setup.reparse.called) def test_has_generator(self): cfg = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index d66780a20..b0ca77a78 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -41,10 +41,10 @@ class TestPropertyFile(Bcfg2TestCase): return self.test_obj(path) def test_write(self): - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() pf = self.get_obj() pf.validate_data = Mock() pf._write = Mock() + pf.setup = Mock() xstr = u("\n") pf.xdata = lxml.etree.XML(xstr) @@ -52,20 +52,20 @@ class TestPropertyFile(Bcfg2TestCase): def reset(): pf.validate_data.reset_mock() pf._write.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() # test writes disabled - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertRaises(PluginExecutionError, pf.write) self.assertFalse(pf.validate_data.called) self.assertFalse(pf._write.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", + pf.setup.cfp.getboolean.assert_called_with("properties", "writes_enabled", default=True) # test successful write reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.write(), pf._write.return_value) pf.validate_data.assert_called_with() pf._write.assert_called_with() @@ -301,17 +301,13 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pf = self.get_obj() - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -325,29 +321,19 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -355,77 +341,73 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) @patch("copy.copy") def test_get_additional_data(self, mock_copy): - Bcfg2.Server.Plugins.Properties.SETUP = Mock() pf = self.get_obj() + pf.setup = Mock() pf.XMLMatch = Mock() metadata = Mock() def reset(): mock_copy.reset_mock() pf.XMLMatch.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() pf.xdata = lxml.etree.Element("Properties", automatch="true") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) pf.xdata = lxml.etree.Element("Properties", automatch="false") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) pf.xdata = lxml.etree.Element("Properties") reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) @@ -449,7 +431,6 @@ class TestProperties(TestPlugin, TestConnector): core = Mock() p = self.get_obj(core=core) self.assertIsInstance(p.store, PropDirectoryBacked) - self.assertEqual(Bcfg2.Server.Plugins.Properties.SETUP, core.setup) @patch("copy.copy") def test_get_additional_data(self, mock_copy): -- cgit v1.2.3-1-g7c22 From 72a80f89361145f1560ccc248f357a9de82eded6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 17 Jan 2013 08:01:44 -0500 Subject: abstracted encryption support from Properties/CfgPrivateKeyCreator to StructFile --- .../TestCfg/TestCfgPrivateKeyCreator.py | 99 ------------------ .../TestServer/TestPlugins/TestProperties.py | 116 --------------------- 2 files changed, 215 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index 4c8ab8b43..bf34d4c3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -306,102 +306,3 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): inner2() - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pkc = self.get_obj() - pkc.setup = Mock() - pkc.setup.cfp.get.return_value = "strict" - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' - - - crypted - - - plain - -''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - pkc.setup.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pkc = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index b0ca77a78..b63d08524 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -18,12 +18,6 @@ from common import * from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked -try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - try: import json JSON = "json" @@ -243,116 +237,6 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - # extra test: crypto is not available, but properties file is - # encrypted - has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False - try: - self.assertRaises(PluginExecutionError, pf.Index) - finally: - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' - - - crypted - plain - - crypted - plain - - crypted - -''' - - print "HAS_CRYPTO: %s" % HAS_CRYPTO - print "Properties HAS_CRYPTO: %s" % Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pf = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): pf = self.get_obj() -- cgit v1.2.3-1-g7c22 From 45c7bbf24ae3c6530f33ebb33c062818ad44816d Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 30 Oct 2012 11:35:46 -0400 Subject: added a module-level FAM object to avoid passing it as an argument a billion times --- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 53 ++++++++++++---------- .../Testlib/TestServer/TestPlugins/TestProbes.py | 22 +++++---- 2 files changed, 41 insertions(+), 34 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index f627e4465..b1db34462 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -6,6 +6,7 @@ import socket import lxml.etree import Bcfg2.Server import Bcfg2.Server.Plugin +from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Metadata import * from mock import Mock, MagicMock, patch @@ -206,8 +207,7 @@ class TestXMLMetadataConfig(TestXMLFileBacked): def test__init(self): xmc = self.get_obj() - self.assertEqual(self.metadata.core.fam, xmc.fam) - self.assertFalse(xmc.fam.AddMonitor.called) + self.assertFalse(_FAM.AddMonitor.called) def test_xdata(self): config = self.get_obj() @@ -259,12 +259,12 @@ class TestXMLMetadataConfig(TestXMLFileBacked): config.extras = [] config.add_monitor(fpath) - self.assertFalse(core.fam.AddMonitor.called) + self.assertFalse(_FAM.AddMonitor.called) self.assertEqual(config.extras, [fpath]) config = self.get_obj(core=core, watch_clients=True) config.add_monitor(fpath) - core.fam.AddMonitor.assert_called_with(fpath, config.metadata) + _FAM.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fpath]) def test_Index(self): @@ -501,20 +501,22 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): self.assertEqual(metadata.states, dict()) # test with watch_clients=True - core.fam = MagicMock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() metadata = self.get_obj(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - core.fam.reset_mock() - core.fam.AddMonitor = Mock(side_effect=IOError) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, + "groups.xml"), + metadata) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, + "clients.xml"), + metadata) + + Bcfg2.Server.FileMonitor._FAM.reset_mock() + Bcfg2.Server.FileMonitor._FAM.AddMonitor = Mock(side_effect=IOError) self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, self.get_obj, core=core, watch_clients=True) + Bcfg2.Server.FileMonitor._FAM = fam @patch('os.makedirs', Mock()) @patch('%s.open' % builtins) @@ -1260,17 +1262,17 @@ class TestMetadataBase(TestMetadata): @patch('os.path.exists') def test__init(self, mock_exists): - core = MagicMock() - core.fam = Mock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() mock_exists.return_value = False - metadata = self.get_obj(core=core, watch_clients=True) + metadata = self.get_obj(watch_clients=True) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, - "groups.xml"), - metadata) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_once_with( + os.path.join(metadata.data, "groups.xml"), + metadata) mock_exists.return_value = True - core.fam.reset_mock() + Bcfg2.Server.FileMonitor._FAM.reset_mock() metadata = self.get_obj(core=core, watch_clients=True) core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, "groups.xml"), @@ -1516,9 +1518,12 @@ class TestMetadata_ClientsXML(TestMetadataBase): def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_obj() - metadata.core.fam = Mock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() metadata.clients_xml = metadata._handle_file("clients.xml") metadata = TestMetadata.load_clients_data(self, metadata=metadata, xdata=xdata) - return TestMetadataBase.load_clients_data(self, metadata=metadata, - xdata=xdata) + rv = TestMetadataBase.load_clients_data(self, metadata=metadata, + xdata=xdata) + Bcfg2.Server.FileMonitor._FAM = fam + return rv diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 899fb24a0..2e1d6df51 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -16,6 +16,7 @@ while path != "/": break path = os.path.dirname(path) from common import * +from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Probes import * from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ TestDatabaseBacked @@ -91,22 +92,23 @@ class TestProbeSet(TestEntrySet): ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] bogus_names = ["test.py"] - def get_obj(self, path=datastore, fam=None, encoding=None, + def get_obj(self, path=datastore, encoding=None, plugin_name="Probes", basename=None): # get_obj() accepts the basename argument, accepted by the # parent get_obj() method, and just throws it away, since # ProbeSet uses a regex for the "basename" - if fam is None: - fam = Mock() - rv = self.test_obj(path, fam, encoding, plugin_name) + rv = self.test_obj(path, encoding, plugin_name) rv.entry_type = MagicMock() return rv def test__init(self): - fam = Mock() - ps = self.get_obj(fam=fam) + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = Mock() + ps = self.get_obj() self.assertEqual(ps.plugin_name, "Probes") - fam.AddMonitor.assert_called_with(datastore, ps) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_with(datastore, + ps) + Bcfg2.Server.FileMonitor._FAM = fam TestEntrySet.test__init(self) def test_HandleEvent(self): @@ -256,9 +258,9 @@ text def test__init(self): mock_load_data = Mock() probes = self.get_probes_object(load_data=mock_load_data) - probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore, - probes.name), - probes.probes) + _FAM.AddMonitor.assert_called_with(os.path.join(datastore, + probes.name), + probes.probes) mock_load_data.assert_any_call() self.assertEqual(probes.probedata, ClientProbeDataSet()) self.assertEqual(probes.cgroups, dict()) -- cgit v1.2.3-1-g7c22 From cebd0d7ad54995c37f68586a14540ad64d99d762 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 31 Oct 2012 11:46:51 -0400 Subject: fixed unit tests --- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 71 ++++++++++++---------- .../Testlib/TestServer/TestPlugins/TestProbes.py | 13 +--- 2 files changed, 42 insertions(+), 42 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index b1db34462..0afa5220d 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -6,7 +6,6 @@ import socket import lxml.etree import Bcfg2.Server import Bcfg2.Server.Plugin -from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Metadata import * from mock import Mock, MagicMock, patch @@ -205,9 +204,10 @@ class TestXMLMetadataConfig(TestXMLFileBacked): watch_clients=watch_clients) return XMLMetadataConfig(self.metadata, watch_clients, basefile) + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test__init(self): xmc = self.get_obj() - self.assertFalse(_FAM.AddMonitor.called) + self.assertFalse(xmc.fam.AddMonitor.called) def test_xdata(self): config = self.get_obj() @@ -251,20 +251,21 @@ class TestXMLMetadataConfig(TestXMLFileBacked): self.assertEqual(config.base_xdata, "") def test_add_monitor(self): - core = MagicMock() - config = self.get_obj(core=core) + config = self.get_obj() + config.fam = Mock() fname = "test.xml" fpath = os.path.join(self.metadata.data, fname) config.extras = [] config.add_monitor(fpath) - self.assertFalse(_FAM.AddMonitor.called) + self.assertFalse(config.fam.AddMonitor.called) self.assertEqual(config.extras, [fpath]) - config = self.get_obj(core=core, watch_clients=True) + config = self.get_obj(watch_clients=True) + config.fam = Mock() config.add_monitor(fpath) - _FAM.AddMonitor.assert_called_with(fpath, config.metadata) + config.fam.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fpath]) def test_Index(self): @@ -488,7 +489,8 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): client_name = "%s%s" % (prefix, i) return client_name - def test__init(self): + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): # test with watch_clients=False core = MagicMock() metadata = self.get_obj(core=core) @@ -501,22 +503,21 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): self.assertEqual(metadata.states, dict()) # test with watch_clients=True - fam = Bcfg2.Server.FileMonitor._FAM - Bcfg2.Server.FileMonitor._FAM = MagicMock() metadata = self.get_obj(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - Bcfg2.Server.FileMonitor._FAM.reset_mock() - Bcfg2.Server.FileMonitor._FAM.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) + + mock_get_fam.reset_mock() + fam = Mock() + fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value = fam self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, self.get_obj, core=core, watch_clients=True) - Bcfg2.Server.FileMonitor._FAM = fam @patch('os.makedirs', Mock()) @patch('%s.open' % builtins) @@ -577,6 +578,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_add_group(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -609,6 +611,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_update_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -626,6 +629,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_remove_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -641,6 +645,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_add_bundle(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -664,6 +669,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_remove_bundle(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -679,6 +685,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_add_client(self): metadata = self.get_obj() metadata.clients_xml.write = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = lxml.etree.XML('').getroottree() metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -713,6 +720,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_update_client(self): metadata = self.get_obj() metadata.clients_xml.write_xml = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree()) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -1261,25 +1269,24 @@ class TestMetadataBase(TestMetadata): return client_name @patch('os.path.exists') - def test__init(self, mock_exists): - fam = Bcfg2.Server.FileMonitor._FAM - Bcfg2.Server.FileMonitor._FAM = MagicMock() + @patch('Bcfg2.Server.FileMonitor.get_fam') + def test__init(self, mock_get_fam, mock_exists): mock_exists.return_value = False metadata = self.get_obj(watch_clients=True) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_once_with( + mock_get_fam.return_value.AddMonitor.assert_called_with( os.path.join(metadata.data, "groups.xml"), metadata) mock_exists.return_value = True - Bcfg2.Server.FileMonitor._FAM.reset_mock() - metadata = self.get_obj(core=core, watch_clients=True) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) + mock_get_fam.reset_mock() + metadata = self.get_obj(watch_clients=True) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) def test_add_group(self): pass diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 2e1d6df51..958dba4ff 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -16,7 +16,6 @@ while path != "/": break path = os.path.dirname(path) from common import * -from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Probes import * from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ TestDatabaseBacked @@ -101,14 +100,11 @@ class TestProbeSet(TestEntrySet): rv.entry_type = MagicMock() return rv - def test__init(self): - fam = Bcfg2.Server.FileMonitor._FAM - Bcfg2.Server.FileMonitor._FAM = Mock() + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): ps = self.get_obj() self.assertEqual(ps.plugin_name, "Probes") - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_with(datastore, - ps) - Bcfg2.Server.FileMonitor._FAM = fam + mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps) TestEntrySet.test__init(self) def test_HandleEvent(self): @@ -258,9 +254,6 @@ text def test__init(self): mock_load_data = Mock() probes = self.get_probes_object(load_data=mock_load_data) - _FAM.AddMonitor.assert_called_with(os.path.join(datastore, - probes.name), - probes.probes) mock_load_data.assert_any_call() self.assertEqual(probes.probedata, ClientProbeDataSet()) self.assertEqual(probes.cgroups, dict()) -- cgit v1.2.3-1-g7c22 From 09e6b3eac584c0e3558c062610da1824bb5eb9b2 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 16 Jan 2013 08:00:31 -0500 Subject: fixed unit tests --- testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py index 43d594482..4db92b7c4 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py @@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase): def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=Mock()) + return self.test_obj(path) def test__init(self): hm = self.get_obj() -- cgit v1.2.3-1-g7c22 From 9be9cfec322518f764be9766b27d24132fc6a66f Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 16 Jan 2013 13:28:06 -0500 Subject: added module-level OptionParser to avoid passing it as an argument or global all over --- .../TestCfg/TestCfgAuthorizedKeysGenerator.py | 32 +++--- .../TestPlugins/TestCfg/TestCfgCheetahGenerator.py | 7 +- .../TestCfg/TestCfgEncryptedGenerator.py | 9 +- .../TestPlugins/TestCfg/TestCfgGenshiGenerator.py | 14 +-- .../TestCfg/TestCfgPrivateKeyCreator.py | 126 ++++++++------------- .../TestServer/TestPlugins/TestCfg/Test_init.py | 41 ++++--- .../TestServer/TestPlugins/TestProperties.py | 85 ++++++-------- 7 files changed, 132 insertions(+), 182 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index d655a20cd..b77d52033 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): def test_category(self): akg = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp + akg.setup = Mock() + akg.setup.cfp.has_section.return_value = False + akg.setup.cfp.has_option.return_value = False self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + akg.setup.reset_mock() + akg.setup.cfp.has_section.return_value = True self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(akg.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + + akg.setup.reset_mock() + akg.setup.cfp.has_option.return_value = True + self.assertEqual(akg.category, akg.setup.cfp.get.return_value) + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + akg.setup.cfp.get.assert_called_with("sshkeys", "category") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index fc5d5e53d..31227329c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip: ccg.data = "data" entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock() + ccg.setup = MagicMock() self.assertEqual(ccg.get_data(entry, metadata), mock_Template.return_value.respond.return_value) - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo") + ccg.setup.__getitem__.assert_called_with("repo") mock_Template.assert_called_with("data".decode(ccg.encoding), compilerSettings=ccg.settings) tmpl = mock_Template.return_value @@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip: self.assertEqual(tmpl.name, entry.get("name")) self.assertEqual(tmpl.path, entry.get("name")) self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value) + self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 71a7410da..f8e9b1990 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -26,16 +26,13 @@ if can_skip or HAS_CRYPTO: def setUp(self): pass - @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt, mock_get_algorithm): + def test_handle_event(self, mock_decrypt): @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") def inner(mock_handle_event): def reset(): mock_decrypt.reset_mock() - mock_get_algorithm.reset_mock() mock_handle_event.reset_mock() def get_event_data(obj, event): @@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO: ceg = self.get_obj() ceg.handle_event(event) mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted", - setup=SETUP, - algorithm=mock_get_algorithm.return_value) + mock_decrypt.assert_called_with("encrypted") self.assertEqual(ceg.data, "plaintext") reset() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 385f8df77..88f3cf3f7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -43,6 +43,7 @@ if can_skip or HAS_GENSHI: def test_get_data(self): cgg = self.get_obj() cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() cgg.template = Mock() fltr = Mock() cgg.template.generate.return_value = fltr @@ -51,24 +52,23 @@ if can_skip or HAS_GENSHI: entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock() def reset(): cgg.template.reset_mock() cgg._handle_genshi_exception.reset_mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock() + cgg.setup.reset_mock() template_vars = dict( name=entry.get("name"), metadata=metadata, path=cgg.name, source_path=cgg.name, - repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value) + repo=cgg.setup.__getitem__.return_value) self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -81,7 +81,7 @@ if can_skip or HAS_GENSHI: self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) self.assertEqual(stream.render.call_args_list, [call("text", encoding=cgg.encoding, @@ -93,7 +93,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(UndefinedError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -104,7 +104,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(ValueError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index dc4b11241..4c8ab8b43 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -44,56 +44,56 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): def test_category(self): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(pkc.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + pkc.setup.reset_mock() + pkc.setup.cfp.has_option.return_value = True + self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") - cfp.reset_mock() - cfp.get.return_value = "test" + pkc.setup.reset_mock() + pkc.setup.cfp.get.return_value = "test" mock_get_passphrases.return_value = dict(test="foo", test2="bar") - cfp.has_option.return_value = True + pkc.setup.cfp.has_option.return_value = True self.assertEqual(pkc.passphrase, "foo") - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - cfp.get.assert_called_with("sshkeys", "passphrase") - mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") + pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase") + mock_get_passphrases.assert_called_with() @patch("shutil.rmtree") @patch("tempfile.mkdtemp") @@ -282,8 +282,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") @patch("Bcfg2.Encryption.ssl_encrypt") - @patch("Bcfg2.Encryption.get_algorithm") - def inner2(mock_get_algorithm, mock_ssl_encrypt): + def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True @@ -302,9 +301,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): "ssh-rsa publickey pubkey.filename\n", group="foo") pkc.write_data.assert_called_with("encryptedprivatekey", group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with( - "privatekey", "foo", - algorithm=mock_get_algorithm.return_value) + mock_ssl_encrypt.assert_called_with("privatekey", "foo") mock_rmtree.assert_called_with(datastore) inner2() @@ -317,10 +314,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") def test_Index_crypto(self): - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" - pkc = self.get_obj() + pkc.setup = Mock() + pkc.setup.cfp.get.return_value = "strict" pkc._decrypt = Mock() pkc._decrypt.return_value = 'plaintext' pkc.data = ''' @@ -348,7 +344,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): self.assertRaises(PluginExecutionError, pkc.Index) # test failed decryption, lax - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" + pkc.setup.cfp.get.return_value = "lax" pkc._decrypt.reset_mock() pkc.Index() self.assertItemsEqual( @@ -358,17 +354,13 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pkc = self.get_obj() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -382,29 +374,19 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -412,26 +394,14 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 2e758774e..ee0b0be9d 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -439,8 +439,6 @@ class TestCfgEntrySet(TestEntrySet): @patch("Bcfg2.Server.Plugins.Cfg.u_str") @patch("Bcfg2.Server.Plugins.Cfg.b64encode") def test_bind_entry(self, mock_b64encode, mock_u_str): - Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) - mock_u_str.side_effect = lambda x: x eset = self.get_obj() @@ -448,6 +446,7 @@ class TestCfgEntrySet(TestEntrySet): eset._generate_data = Mock() eset.get_handlers = Mock() eset._validate_data = Mock() + eset.setup = dict(validate=False) def reset(): mock_b64encode.reset_mock() @@ -523,7 +522,7 @@ class TestCfgEntrySet(TestEntrySet): # test successful validation entry = reset() - Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + eset.setup['validate'] = True bound = eset.bind_entry(entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -753,31 +752,39 @@ class TestCfg(TestGroupSpool, TestPullTarget): def get_obj(self, core=None): if core is None: core = Mock() - core.setup = MagicMock() return TestGroupSpool.get_obj(self, core=core) + @patch("Bcfg2.Options.get_option_parser") @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") @patch("Bcfg2.Server.Plugin.PullTarget.__init__") - def test__init(self, mock_pulltarget_init, mock_groupspool_init): + def test__init(self, mock_pulltarget_init, mock_groupspool_init, + mock_get_option_parser): + setup = MagicMock() + setup.__contains__.return_value = False + mock_get_option_parser.return_value = setup + + def reset(): + core.reset_mock() + setup.reset_mock() + mock_pulltarget_init.reset_mock() + mock_groupspool_init.reset_mock() + core = Mock() - core.setup = MagicMock() cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - core.setup.add_option.assert_called_with("validate", - Bcfg2.Options.CFG_VALIDATION) - core.setup.reparse.assert_called_with() - - core.reset_mock() - core.setup.reset_mock() - mock_pulltarget_init.reset_mock() - mock_groupspool_init.reset_mock() - core.setup.__contains__.return_value = True + setup.add_option.assert_called_with( + "validate", + Bcfg2.Options.CFG_VALIDATION) + mock_get_option_parser.return_value.reparse.assert_called_with() + + reset() + setup.__contains__.return_value = True cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - self.assertFalse(core.setup.add_option.called) - self.assertFalse(core.setup.reparse.called) + self.assertFalse(setup.add_option.called) + self.assertFalse(setup.reparse.called) def test_has_generator(self): cfg = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index d66780a20..b0ca77a78 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -41,10 +41,10 @@ class TestPropertyFile(Bcfg2TestCase): return self.test_obj(path) def test_write(self): - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() pf = self.get_obj() pf.validate_data = Mock() pf._write = Mock() + pf.setup = Mock() xstr = u("\n") pf.xdata = lxml.etree.XML(xstr) @@ -52,20 +52,20 @@ class TestPropertyFile(Bcfg2TestCase): def reset(): pf.validate_data.reset_mock() pf._write.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() # test writes disabled - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertRaises(PluginExecutionError, pf.write) self.assertFalse(pf.validate_data.called) self.assertFalse(pf._write.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", + pf.setup.cfp.getboolean.assert_called_with("properties", "writes_enabled", default=True) # test successful write reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.write(), pf._write.return_value) pf.validate_data.assert_called_with() pf._write.assert_called_with() @@ -301,17 +301,13 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pf = self.get_obj() - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -325,29 +321,19 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -355,77 +341,73 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) @patch("copy.copy") def test_get_additional_data(self, mock_copy): - Bcfg2.Server.Plugins.Properties.SETUP = Mock() pf = self.get_obj() + pf.setup = Mock() pf.XMLMatch = Mock() metadata = Mock() def reset(): mock_copy.reset_mock() pf.XMLMatch.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() pf.xdata = lxml.etree.Element("Properties", automatch="true") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) pf.xdata = lxml.etree.Element("Properties", automatch="false") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) pf.xdata = lxml.etree.Element("Properties") reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) @@ -449,7 +431,6 @@ class TestProperties(TestPlugin, TestConnector): core = Mock() p = self.get_obj(core=core) self.assertIsInstance(p.store, PropDirectoryBacked) - self.assertEqual(Bcfg2.Server.Plugins.Properties.SETUP, core.setup) @patch("copy.copy") def test_get_additional_data(self, mock_copy): -- cgit v1.2.3-1-g7c22 From 1a031fc131d950dd49dc425ac1726337d8e93910 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 17 Jan 2013 08:01:44 -0500 Subject: abstracted encryption support from Properties/CfgPrivateKeyCreator to StructFile --- .../TestCfg/TestCfgPrivateKeyCreator.py | 99 ------------------ .../TestServer/TestPlugins/TestProperties.py | 116 --------------------- 2 files changed, 215 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index 4c8ab8b43..bf34d4c3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -306,102 +306,3 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): inner2() - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pkc = self.get_obj() - pkc.setup = Mock() - pkc.setup.cfp.get.return_value = "strict" - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' - - - crypted - - - plain - -''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - pkc.setup.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pkc = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index b0ca77a78..b63d08524 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -18,12 +18,6 @@ from common import * from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked -try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - try: import json JSON = "json" @@ -243,116 +237,6 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - # extra test: crypto is not available, but properties file is - # encrypted - has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False - try: - self.assertRaises(PluginExecutionError, pf.Index) - finally: - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' - - - crypted - plain - - crypted - plain - - crypted - -''' - - print "HAS_CRYPTO: %s" % HAS_CRYPTO - print "Properties HAS_CRYPTO: %s" % Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pf = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): pf = self.get_obj() -- cgit v1.2.3-1-g7c22 From 22029e107420ff21cf9f1811bf4bb6dc2aba1dde Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 22 Jan 2013 11:16:19 -0500 Subject: made genshi a requirement --- .../TestCfg/TestCfgEncryptedGenshiGenerator.py | 12 +- .../TestPlugins/TestCfg/TestCfgGenshiGenerator.py | 202 ++++++++++----------- 2 files changed, 97 insertions(+), 117 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py index b447a9bb8..330779c99 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py @@ -14,20 +14,14 @@ while path != "/": path = os.path.dirname(path) from common import * -try: - from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ - TestCfgGenshiGenerator - HAS_GENSHI = True -except ImportError: - TestCfgGenshiGenerator = object - HAS_GENSHI = False +from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ + TestCfgGenshiGenerator -if can_skip or (HAS_CRYPTO and HAS_GENSHI): +if can_skip or HAS_CRYPTO: class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): test_obj = CfgEncryptedGenshiGenerator @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") def setUp(self): pass diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 88f3cf3f7..154d6a8db 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -18,111 +18,97 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if can_skip or HAS_GENSHI: - class TestCfgGenshiGenerator(TestCfgGenerator): - test_obj = CfgGenshiGenerator - - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") - def setUp(self): - pass - - def test_removecomment(self): - data = [(None, "test", 1), - (None, "test2", 2)] - stream = [(genshi.core.COMMENT, "test", 0), - data[0], - (genshi.core.COMMENT, "test3", 0), - data[1]] - self.assertItemsEqual(list(removecomment(stream)), data) - - def test__init(self): - TestCfgGenerator.test__init(self) - cgg = self.get_obj() - self.assertIsInstance(cgg.loader, cgg.__loader_cls__) - - def test_get_data(self): - cgg = self.get_obj() - cgg._handle_genshi_exception = Mock() - cgg.setup = MagicMock() - cgg.template = Mock() - fltr = Mock() - cgg.template.generate.return_value = fltr - stream = Mock() - fltr.filter.return_value = stream - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() - - - def reset(): - cgg.template.reset_mock() - cgg._handle_genshi_exception.reset_mock() - cgg.setup.reset_mock() - - template_vars = dict( - name=entry.get("name"), - metadata=metadata, - path=cgg.name, - source_path=cgg.name, - repo=cgg.setup.__getitem__.return_value) - - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - def render(fmt, **kwargs): - stream.render.side_effect = None - raise TypeError - stream.render.side_effect = render - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - self.assertEqual(stream.render.call_args_list, - [call("text", encoding=cgg.encoding, - strip_whitespace=False), - call("text", encoding=cgg.encoding)]) - - reset() - stream.render.side_effect = UndefinedError("test") - self.assertRaises(UndefinedError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - stream.render.side_effect = ValueError - cgg._handle_genshi_exception.side_effect = ValueError - self.assertRaises(ValueError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - self.assertTrue(cgg._handle_genshi_exception.called) - - def test_handle_event(self): - cgg = self.get_obj() - cgg.loader = Mock() - event = Mock() - cgg.handle_event(event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) - - cgg.loader.reset_mock() - cgg.loader.load.side_effect = OSError - self.assertRaises(PluginExecutionError, - cgg.handle_event, event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) +class TestCfgGenshiGenerator(TestCfgGenerator): + test_obj = CfgGenshiGenerator + + def test__init(self): + TestCfgGenerator.test__init(self) + cgg = self.get_obj() + self.assertIsInstance(cgg.loader, cgg.__loader_cls__) + + def test_get_data(self): + cgg = self.get_obj() + cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() + cgg.template = Mock() + fltr = Mock() + cgg.template.generate.return_value = fltr + stream = Mock() + fltr.filter.return_value = stream + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + + def reset(): + cgg.template.reset_mock() + cgg._handle_genshi_exception.reset_mock() + cgg.setup.reset_mock() + + template_vars = dict( + name=entry.get("name"), + metadata=metadata, + path=cgg.name, + source_path=cgg.name, + repo=cgg.setup.__getitem__.return_value) + + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + def render(fmt, **kwargs): + stream.render.side_effect = None + raise TypeError + stream.render.side_effect = render + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + self.assertEqual(stream.render.call_args_list, + [call("text", encoding=cgg.encoding, + strip_whitespace=False), + call("text", encoding=cgg.encoding)]) + + reset() + stream.render.side_effect = UndefinedError("test") + self.assertRaises(UndefinedError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + stream.render.side_effect = ValueError + cgg._handle_genshi_exception.side_effect = ValueError + self.assertRaises(ValueError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + self.assertTrue(cgg._handle_genshi_exception.called) + + def test_handle_event(self): + cgg = self.get_obj() + cgg.loader = Mock() + event = Mock() + cgg.handle_event(event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) + + cgg.loader.reset_mock() + cgg.loader.load.side_effect = OSError + self.assertRaises(PluginExecutionError, + cgg.handle_event, event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) -- cgit v1.2.3-1-g7c22 From c3c449f509e5f849212e853b410b7b74341d3e1e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 4 Feb 2013 16:21:03 -0500 Subject: testsuite: added unit tests for Bundler --- .../Testlib/TestServer/TestPlugins/TestBundler.py | 72 ++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py new file mode 100644 index 000000000..bc6a21b84 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -0,0 +1,72 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Bundler import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestStructure, \ + TestXMLDirectoryBacked + + +class TestBundleFile(TestStructFile): + test_obj = BundleFile + path = os.path.join(datastore, "test", "test1.xml") + + def test_bundle_name(self): + cases = [("foo.xml", "foo"), + ("foo.bar.xml", "foo.bar"), + ("foo-bar-baz.xml", "foo-bar-baz"), + ("foo....xml", "foo..."), + ("foo.genshi", "foo")] + bf = self.get_obj() + for fname, bname in cases: + bf.name = fname + self.assertEqual(bf.bundle_name, bname) + + +class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): + test_obj = Bundler + + def get_obj(self, core=None): + @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, + self.test_obj.__name__), + Mock()) + def inner(): + return TestPlugin.get_obj(self, core=core) + return inner() + + @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent") + def test_HandleEvent(self, mock_HandleEvent): + b = self.get_obj() + b.bundles = dict(foo=Mock(), bar=Mock()) + b.entries = {"foo.xml": BundleFile("foo.xml"), + "baz.xml": BundleFile("baz.xml")} + event = Mock() + b.HandleEvent(event) + mock_HandleEvent.assert_called_with(b, event) + self.assertItemsEqual(b.bundles, + dict(foo=b.entries['foo.xml'], + baz=b.entries['baz.xml'])) + + def test_BuildStructures(self): + b = self.get_obj() + b.bundles = dict(foo=Mock(), bar=Mock(), baz=Mock()) + metadata = Mock() + metadata.bundles = ["foo", "baz"] + + self.assertItemsEqual(b.BuildStructures(metadata), + [b.bundles[n].XMLMatch.return_value + for n in metadata.bundles]) + for bname in metadata.bundles: + b.bundles[bname].XMLMatch.assert_called_with(metadata) -- cgit v1.2.3-1-g7c22 From 5d237f71575a109c10d5aad8d70dc5dda00a2d96 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 09:00:04 -0500 Subject: testsuite: wrote unit tests for Decisions --- .../Testlib/TestServer/TestPlugins/TestBundler.py | 1 - .../TestServer/TestPlugins/TestDecisions.py | 60 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py index bc6a21b84..f64d66d11 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -3,7 +3,6 @@ import sys import lxml.etree from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugins.Bundler import * -from Bcfg2.Server.Plugin import PluginExecutionError # add all parent testsuite directories to sys.path to allow (most) # relative imports in python 2.4 diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py new file mode 100644 index 000000000..537ceb4ff --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py @@ -0,0 +1,60 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Decisions import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestDecision + + +class TestDecisionFile(TestStructFile): + test_obj = DecisionFile + + def test_get_decisions(self): + df = self.get_obj() + metadata = Mock() + + df.xdata = None + self.assertItemsEqual(df.get_decisions(metadata), []) + + df.xdata = lxml.etree.Element("Decisions") + df.XMLMatch = Mock() + df.XMLMatch.return_value = lxml.etree.Element("Decisions") + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Service", name='*') + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Path", + name='/etc/apt/apt.conf') + + self.assertItemsEqual(df.get_decisions(metadata), + [("Service", '*'), + ("Path", '/etc/apt/apt.conf')]) + df.XMLMatch.assert_called_with(metadata) + + +class TestDecisions(TestPlugin, TestDecision): + test_obj = Decisions + + def test_GetDecisions(self): + d = self.get_obj() + d.whitelist = Mock() + d.blacklist = Mock() + metadata = Mock() + + self.assertEqual(d.GetDecisions(metadata, "whitelist"), + d.whitelist.get_decision.return_value) + d.whitelist.get_decision.assert_called_with(metadata) + + self.assertEqual(d.GetDecisions(metadata, "blacklist"), + d.blacklist.get_decision.return_value) + d.blacklist.get_decision.assert_called_with(metadata) -- cgit v1.2.3-1-g7c22 From 25cb6db5ccb0c8e8302c220a90344a95baf3909b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 14:04:09 -0500 Subject: moved some libraries in Bcfg2/ into more specific (Server/ or Client/) places --- .../TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index bf34d4c3c..2982825a9 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import * from Bcfg2.Server.Plugin import PluginExecutionError import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator try: - from Bcfg2.Encryption import EVPError + from Bcfg2.Server.Encryption import EVPError HAS_CRYPTO = True except: HAS_CRYPTO = False @@ -66,7 +66,8 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, + "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() pkc.setup = Mock() @@ -281,7 +282,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") - @patch("Bcfg2.Encryption.ssl_encrypt") + @patch("Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.ssl_encrypt") def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" -- cgit v1.2.3-1-g7c22 From 2169edc1bba82076db776b75db89b79d6f2f4786 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 6 Feb 2013 15:45:20 -0500 Subject: converted InfoXML objects from XMLSrc to StructFile --- .../TestPlugins/TestCfg/TestCfgInfoXML.py | 37 +--------- .../TestServer/TestPlugins/TestCfg/Test_init.py | 58 ++++++--------- .../Testlib/TestServer/TestPlugins/TestRules.py | 82 +++++----------------- 3 files changed, 42 insertions(+), 135 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py index 839e9c3b8..7e7cb5e3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py @@ -27,47 +27,16 @@ class TestCfgInfoXML(TestCfgInfo): self.assertIsInstance(ci.infoxml, InfoXML) def test_bind_info_to_entry(self): - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() ci = self.get_obj() ci.infoxml = Mock() - ci._set_info = Mock() - - self.assertRaises(PluginExecutionError, - ci.bind_info_to_entry, entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, dict(), - entry=entry) - self.assertFalse(ci._set_info.called) - - ci.infoxml.reset_mock() - ci._set_info.reset_mock() - mdata_value = Mock() - def set_mdata(metadata, mdata, entry=None): - mdata['Info'] = {None: mdata_value} + entry = Mock() + metadata = Mock() - ci.infoxml.pnode.Match.side_effect = set_mdata ci.bind_info_to_entry(entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, - dict(Info={None: mdata_value}), - entry=entry) - ci._set_info.assert_called_with(entry, mdata_value) + ci.infoxml.BindEntry.assert_called_with(entry, metadata) def test_handle_event(self): ci = self.get_obj() ci.infoxml = Mock() ci.handle_event(Mock) ci.infoxml.HandleEvent.assert_called_with() - - def test__set_info(self): - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info") - def inner(mock_set_info): - ci = self.get_obj() - entry = Mock() - info = {"foo": "foo", - "__children__": ["one", "two"]} - ci._set_info(entry, info) - self.assertItemsEqual(entry.append.call_args_list, - [call(c) for c in info['__children__']]) - - inner() - TestCfgInfo.test__set_info(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index ee0b0be9d..ab383e4f3 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -159,21 +159,6 @@ class TestCfgInfo(TestCfgBaseFileMatcher): self.assertRaises(NotImplementedError, ci.bind_info_to_entry, Mock(), Mock()) - def test__set_info(self): - ci = self.get_obj() - entry = Mock() - entry.attrib = dict() - - info = {"foo": "foo", - "_bar": "bar", - "bar:baz=quux": "quux", - "baz__": "baz", - "__quux": "quux"} - ci._set_info(entry, info) - self.assertItemsEqual(entry.attrib, - dict([(k, v) for k, v in info.items() - if not k.startswith("__")])) - class TestCfgVerifier(TestCfgBaseFileMatcher): test_obj = CfgVerifier @@ -259,29 +244,26 @@ class TestCfgCreator(TestCfgBaseFileMatcher): class TestCfgDefaultInfo(TestCfgInfo): test_obj = CfgDefaultInfo - def get_obj(self, defaults=None): - if defaults is None: - defaults = dict() - return self.test_obj(defaults) + def get_obj(self, *_): + return self.test_obj() - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__") - def test__init(self, mock__init): - defaults = Mock() - cdi = self.get_obj(defaults=defaults) - mock__init.assert_called_with(cdi, '') - self.assertEqual(defaults, cdi.defaults) + def test__init(self): + pass def test_handle_event(self): # this CfgInfo handler doesn't handle any events -- it's not # file-driven, but based on the built-in defaults pass - def test_bind_info_to_entry(self): + @patch("Bcfg2.Server.Plugin.default_path_metadata") + def test_bind_info_to_entry(self, mock_default_path_metadata): cdi = self.get_obj() - cdi._set_info = Mock() - entry = Mock() + entry = lxml.etree.Element("Test", name="test") + mock_default_path_metadata.return_value = \ + dict(owner="root", mode="0600") cdi.bind_info_to_entry(entry, Mock()) - cdi._set_info.assert_called_with(entry, cdi.defaults) + self.assertItemsEqual(entry.attrib, + dict(owner="root", mode="0600", name="test")) class TestCfgEntrySet(TestEntrySet): @@ -599,24 +581,24 @@ class TestCfgEntrySet(TestEntrySet): if entry.specific is not None: self.assertFalse(entry.specific.matches.called) - def test_bind_info_to_entry(self): - default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO + @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo") + def test_bind_info_to_entry(self, mock_DefaultInfo): eset = self.get_obj() eset.get_handlers = Mock() eset.get_handlers.return_value = [] - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock() metadata = Mock() def reset(): eset.get_handlers.reset_mock() - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock() + mock_DefaultInfo.reset_mock() return lxml.etree.Element("Path", name="/test.txt") # test with no info handlers entry = reset() eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) self.assertEqual(entry.get("type"), "file") # test with one info handler @@ -625,7 +607,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = [handler] eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) handler.bind_info_to_entry.assert_called_with(entry, metadata) self.assertEqual(entry.get("type"), "file") @@ -635,7 +618,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = handlers eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) # we don't care which handler gets called as long as exactly # one of them does called = 0 @@ -646,8 +630,6 @@ class TestCfgEntrySet(TestEntrySet): self.assertEqual(called, 1) self.assertEqual(entry.get("type"), "file") - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info - def test_create_data(self): eset = self.get_obj() eset.best_matching = Mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py index f018b45dc..7083fff06 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py @@ -45,81 +45,37 @@ class TestRules(TestPrioDir): entry = lxml.etree.Element("Package", name="foo") self.assertFalse(r.HandlesEntry(entry, metadata)) - def test_BindEntry(self, method="BindEntry"): - r = self.get_obj() - r.get_attrs = Mock() - r.get_attrs.return_value = dict(overwrite="new", add="add", - text="text") - entry = lxml.etree.Element("Test", overwrite="old", keep="keep") - metadata = Mock() - - getattr(r, method)(entry, metadata) - r.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(overwrite="old", add="add", keep="keep", - text="text")) - - def test_HandleEntry(self): - self.test_BindEntry(method="HandleEntry") - @patch("Bcfg2.Server.Plugin.PrioDir._matches") def test__matches(self, mock_matches): - """ test _matches() behavior regardless of state of _regex_enabled """ r = self.get_obj() metadata = Mock() + # test parent _matches() returning True entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] + candidate = lxml.etree.Element("Path", name="/etc/bar.conf") mock_matches.return_value = True - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) - # test special Path cases -- adding and removing trailing slash + # test all conditions returning False mock_matches.reset_mock() mock_matches.return_value = False - rules = ["/etc/foo/", "/etc/bar"] - entry = lxml.etree.Element("Path", name="/etc/foo") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertFalse(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + # test special Path cases -- adding and removing trailing slash mock_matches.reset_mock() - entry = lxml.etree.Element("Path", name="/etc/bar/") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_disabled(self, mock_matches): - """ test failure to match with regex disabled """ - r = self.get_obj() - self.set_regex_enabled(r, False) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] - self.assertFalse(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_enabled(self, mock_matches): - """ test match with regex enabled """ - r = self.get_obj() - self.set_regex_enabled(r, True) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = ["/etc/.*\.conf", "/etc/bar"] - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) - - def set_regex_enabled(self, rules_obj, state): - """ set the state of regex_enabled for this implementation of - Rules """ - if not isinstance(rules_obj.core.setup, MagicMock): - rules_obj.core.setup = MagicMock() - rules_obj.core.setup.cfp.getboolean.return_value = state + withslash = lxml.etree.Element("Path", name="/etc/foo") + withoutslash = lxml.etree.Element("Path", name="/etc/foo/") + self.assertTrue(r._matches(withslash, metadata, withoutslash)) + self.assertTrue(r._matches(withoutslash, metadata, withslash)) + + if r._regex_enabled: + mock_matches.reset_mock() + candidate = lxml.etree.Element("Path", name="/etc/.*\.conf") + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) def test__regex_enabled(self): r = self.get_obj() -- cgit v1.2.3-1-g7c22 From 0dab7e284017e4559019ac1e7b861ab7ccdadf5c Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Fri, 8 Feb 2013 10:29:03 -0500 Subject: Bundler: improved XInclude support, added inter-bundle dependencies --- .../Testlib/TestServer/TestPlugins/TestBundler.py | 48 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 7 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py index f64d66d11..f5250ed85 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -60,12 +60,46 @@ class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): def test_BuildStructures(self): b = self.get_obj() - b.bundles = dict(foo=Mock(), bar=Mock(), baz=Mock()) + b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(), + has_dep=Mock(), is_dep=Mock()) + expected = dict() + + b.bundles['error'].XMLMatch.side_effect = TemplateError(None) + + xinclude = lxml.etree.Element("Bundle") + lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"), + "Path", name="/test") + b.bundles['xinclude'].XMLMatch.return_value = xinclude + expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude") + lxml.etree.SubElement(expected['xinclude'], "Path", name="/test") + + has_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(has_dep, "Bundle", name="is_dep") + lxml.etree.SubElement(has_dep, "Package", name="foo") + b.bundles['has_dep'].XMLMatch.return_value = has_dep + expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep") + lxml.etree.SubElement(expected['has_dep'], "Package", name="foo") + + is_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(is_dep, "Package", name="bar") + b.bundles['is_dep'].XMLMatch.return_value = is_dep + expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep") + lxml.etree.SubElement(expected['is_dep'], "Package", name="bar") + metadata = Mock() - metadata.bundles = ["foo", "baz"] + metadata.bundles = ["error", "xinclude", "has_dep"] + + rv = b.BuildStructures(metadata) + self.assertEqual(len(rv), 3) + for bundle in rv: + name = bundle.get("name") + self.assertIsNotNone(name, + "Bundle %s was not built" % name) + self.assertIn(name, expected, + "Unexpected bundle %s was built" % name) + self.assertXMLEqual(bundle, expected[name], + "Bundle %s was not built correctly" % name) + b.bundles[name].XMLMatch.assert_called_with(metadata) - self.assertItemsEqual(b.BuildStructures(metadata), - [b.bundles[n].XMLMatch.return_value - for n in metadata.bundles]) - for bname in metadata.bundles: - b.bundles[bname].XMLMatch.assert_called_with(metadata) + b.bundles['error'].XMLMatch.assert_called_with(metadata) + self.assertFalse(b.bundles['skip'].XMLMatch.called) -- cgit v1.2.3-1-g7c22 From 398be2c5cb613d9506e0c115510c1b55881ca64e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Fri, 8 Feb 2013 13:43:40 -0500 Subject: Bundler: added support for independent bundles --- .../Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py index f5250ed85..cfb379c40 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -61,7 +61,7 @@ class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): def test_BuildStructures(self): b = self.get_obj() b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(), - has_dep=Mock(), is_dep=Mock()) + has_dep=Mock(), is_dep=Mock(), indep=Mock()) expected = dict() b.bundles['error'].XMLMatch.side_effect = TemplateError(None) @@ -86,11 +86,17 @@ class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep") lxml.etree.SubElement(expected['is_dep'], "Package", name="bar") + indep = lxml.etree.Element("Bundle", independent="true") + lxml.etree.SubElement(indep, "Service", name="baz") + b.bundles['indep'].XMLMatch.return_value = indep + expected['indep'] = lxml.etree.Element("Independent", name="indep") + lxml.etree.SubElement(expected['indep'], "Service", name="baz") + metadata = Mock() - metadata.bundles = ["error", "xinclude", "has_dep"] + metadata.bundles = ["error", "xinclude", "has_dep", "indep"] rv = b.BuildStructures(metadata) - self.assertEqual(len(rv), 3) + self.assertEqual(len(rv), 4) for bundle in rv: name = bundle.get("name") self.assertIsNotNone(name, -- cgit v1.2.3-1-g7c22 From 5363e6d9a53146333da0d109aae170befc1b9481 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 12 Feb 2013 07:48:33 -0500 Subject: Added client ACLs: * IP and CIDR-based ACLs * Metadata (group/hostname)-based ACLs * Documentation * Unit tests --- .../Testlib/TestServer/TestPlugins/TestACL.py | 222 +++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py new file mode 100644 index 000000000..e457ca7c1 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -0,0 +1,222 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.ACL import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \ + TestClientACLs + + +class TestFunctions(Bcfg2TestCase): + def test_rmi_names_equal(self): + good_cases = [('*', 'foo'), + ('foo', 'foo'), + ('foo.*', 'foo.bar'), + ('*.*', 'foo.bar'), + ('foo.bar', 'foo.bar'), + ('*.bar', 'foo.bar'), + ('foo.*.bar', 'foo.baz.bar')] + bad_cases = [('foo', 'bar'), + ('*', 'foo.bar'), + ('*.*', 'foo'), + ('*.*', 'foo.bar.baz'), + ('foo.*', 'bar.foo'), + ('*.bar', 'bar.foo'), + ('foo.*', 'foobar')] + for first, second in good_cases: + self.assertTrue(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly False" % + (first, second)) + self.assertTrue(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly False" % + (second, first)) + for first, second in bad_cases: + self.assertFalse(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly True" % + (first, second)) + self.assertFalse(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly True" % + (second, first)) + + def test_ip_matches(self): + good_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + bad_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + for ip, entry in good_cases: + self.assertTrue(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly False" % + (ip, lxml.etree.tostring(entry))) + for ip, entry in bad_cases: + self.assertFalse(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly True" % + (ip, lxml.etree.tostring(entry))) + + +class TestIPACLFile(TestXMLFileBacked): + test_obj = IPACLFile + + @patch("Bcfg2.Server.Plugins.ACL.ip_matches") + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches): + af = self.get_obj() + ip = "10.0.0.8" + rmi = "ACL.test" + + def reset(): + mock_rmi_names_equal.reset_mock() + mock_ip_matches.reset_mock() + + # test default defer with no entries + af.entries = [] + self.assertIsNone(af.check_acl(ip, rmi)) + + # test explicit allow, deny, and defer + entries = dict(Allow=lxml.etree.Element("Allow", method=rmi), + Deny=lxml.etree.Element("Deny", method=rmi), + Defer=lxml.etree.Element("Defer", method=rmi)) + af.entries = list(entries.values()) + + def get_ip_matches(tag): + def ip_matches(ip, entry): + return entry.tag == tag + + return ip_matches + + mock_rmi_names_equal.return_value = True + + reset() + mock_ip_matches.side_effect = get_ip_matches("Allow") + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Allow']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Deny") + self.assertFalse(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Deny']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Defer") + self.assertIsNone(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Defer']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + # test matching RMI names + reset() + mock_ip_matches.side_effect = lambda i, e: True + mock_rmi_names_equal.side_effect = lambda a, b: a == b + rmi = "ACL.test2" + matching = lxml.etree.Element("Allow", method=rmi) + af.entries.append(matching) + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, matching) + self.assertTrue( + call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or + call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list) + + # test implicit allow for localhost, defer for others + reset() + mock_ip_matches.side_effect = lambda i, e: False + self.assertIsNone(af.check_acl(ip, rmi)) + + reset() + self.assertTrue(af.check_acl("127.0.0.1", rmi)) + + +class TestMetadataACLFile(TestStructFile): + test_obj = MetadataACLFile + + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal): + af = self.get_obj() + af.Match = Mock() + metadata = Mock() + mock_rmi_names_equal.side_effect = lambda a, b: a == b + + def reset(): + af.Match.reset_mock() + mock_rmi_names_equal.reset_mock() + + # test default allow + af.entries = [] + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + + # test explicit allow and deny + reset() + af.entries = [lxml.etree.Element("Allow", method='ACL.test'), + lxml.etree.Element("Deny", method='ACL.test2')] + af.Match.return_value = af.entries + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test', 'ACL.test'), + mock_rmi_names_equal.call_args_list) + + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test2')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test2', 'ACL.test2'), + mock_rmi_names_equal.call_args_list) + + # test default deny for non-localhost + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + # test default allow for localhost + reset() + metadata.hostname = 'localhost' + self.assertTrue(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + +class TestACL(TestPlugin, TestClientACLs): + test_obj = ACL + + def test_check_acl_ip(self): + acl = self.get_obj() + acl.ip_acls = Mock() + self.assertEqual(acl.check_acl_ip("192.168.1.10", "ACL.test"), + acl.ip_acls.check_acl.return_value) + acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") + + def test_check_acl_metadata(self): + acl = self.get_obj() + acl.metadata_acls = Mock() + metadata = Mock() + self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"), + acl.metadata_acls.check_acl.return_value) + acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test") -- cgit v1.2.3-1-g7c22 From 088ca5fee4cc99f9143f18a880cdec6712326e1e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 12 Feb 2013 09:47:04 -0500 Subject: fixed unit tests --- testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py index e457ca7c1..86a960701 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -209,7 +209,8 @@ class TestACL(TestPlugin, TestClientACLs): def test_check_acl_ip(self): acl = self.get_obj() acl.ip_acls = Mock() - self.assertEqual(acl.check_acl_ip("192.168.1.10", "ACL.test"), + self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"), + "ACL.test"), acl.ip_acls.check_acl.return_value) acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") -- cgit v1.2.3-1-g7c22 From e989e62b24479fac99e607d9e4fb3a292bd20418 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 11:36:49 -0500 Subject: added support for wildcard XInclude in XMLFileBacked --- testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py | 1 + 1 file changed, 1 insertion(+) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 0afa5220d..9f1e73541 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -198,6 +198,7 @@ if HAS_DJANGO or can_skip: class TestXMLMetadataConfig(TestXMLFileBacked): test_obj = XMLMetadataConfig + path = os.path.join(datastore, 'Metadata', 'clients.xml') def get_obj(self, basefile="clients.xml", core=None, watch_clients=False): self.metadata = get_metadata_object(core=core, -- cgit v1.2.3-1-g7c22 From 9bec4d6bbab599bee72256c7e09fe214cb849a1b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 12:13:20 -0500 Subject: abstracted similar digit range classes in POSIXUsers/GroupPatterns into Bcfg2.Utils --- .../TestServer/TestPlugins/TestGroupPatterns.py | 21 --------------------- 1 file changed, 21 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py index a7a6b3d6e..a9346156c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py @@ -18,27 +18,6 @@ from common import * from TestPlugin import TestXMLFileBacked, TestPlugin, TestConnector -class TestPackedDigitRange(Bcfg2TestCase): - def test_includes(self): - # tuples of (range description, numbers that are included, - # numebrs that are excluded) - tests = [("1-3", [1, "2", 3], [4]), - ("1", [1], [0, "2"]), - ("10-11", [10, 11], [0, 1]), - ("9-9", [9], [8, 10]), - ("0-100", [0, 10, 99, 100], []), - ("1,3,5", [1, 3, 5], [0, 2, 4, 6]), - ("1-5,7", [1, 3, 5, 7], [0, 6, 8]), - ("1-5,7,9-11", [1, 3, 5, 7, 9, 11], [0, 6, 8, 12]), - ("852-855,321-497,763", [852, 855, 321, 400, 497, 763], [])] - for rng, inc, exc in tests: - r = PackedDigitRange(rng) - for test in inc: - self.assertTrue(r.includes(test)) - for test in exc: - self.assertFalse(r.includes(test)) - - class TestPatternMap(Bcfg2TestCase): def test_ranges(self): """ test processing NameRange patterns """ -- cgit v1.2.3-1-g7c22 From fd67a2735ada342251cb6baaa4e678532566e975 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 7 Feb 2013 10:00:37 -0500 Subject: moved common file locking code into Bcfg2.Utils --- testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 9f1e73541..2697df292 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -328,7 +328,7 @@ class TestXMLMetadataConfig(TestXMLFileBacked): "clients.xml"), "") - @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False)) + @patch('Bcfg2.Utils.locked', Mock(return_value=False)) @patch('fcntl.lockf', Mock()) @patch('os.open') @patch('os.fdopen') -- cgit v1.2.3-1-g7c22 From dc22e36574d4d4cdbde282906ef3e1d3c7fe7c94 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 7 Feb 2013 10:14:31 -0500 Subject: Metadata: process default client bootstrap mode properly --- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 2697df292..221eb8a3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -20,7 +20,7 @@ while path != "/": path = os.path.dirname(path) from common import * from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \ - TestStatistics, TestDatabaseBacked + TestClientRunHooks, TestDatabaseBacked def get_clients_test_tree(): @@ -464,7 +464,7 @@ class TestClientMetadata(Bcfg2TestCase): self.assertFalse(cm.inGroup("group3")) -class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): +class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): test_obj = Metadata use_db = False @@ -497,7 +497,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): metadata = self.get_obj(core=core) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.ClientRunHooks) self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig) self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig) self.assertIsInstance(metadata.query, MetadataQuery) @@ -1227,17 +1227,16 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") - def test_process_statistics(self, mock_update_client): + def test_end_statistics(self, mock_update_client): metadata = self.load_clients_data(metadata=self.load_groups_data()) md = Mock() md.hostname = "client6" - metadata.process_statistics(md, None) - mock_update_client.assert_called_with(md.hostname, - dict(auth='cert')) + metadata.end_statistics(md) + mock_update_client.assert_called_with(md.hostname, dict(auth='cert')) mock_update_client.reset_mock() md.hostname = "client5" - metadata.process_statistics(md, None) + metadata.end_statistics(md) self.assertFalse(mock_update_client.called) def test_viz(self): @@ -1513,6 +1512,10 @@ class TestMetadata_NoClientsXML(TestMetadataBase): def test_handle_clients_xml_event(self): pass + def test_end_statistics(self): + # bootstrap mode, which is what is being tested here, doesn't + # work without clients.xml + pass class TestMetadata_ClientsXML(TestMetadataBase): """ test Metadata with a clients.xml. """ -- cgit v1.2.3-1-g7c22 From 3d06f311274d6b942ee89d8cdb13b2ecc99af1b0 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 14 Mar 2013 13:05:08 -0400 Subject: use Executor class for better subprocess calling on server --- .../TestCfg/TestCfgExternalCommandVerifier.py | 31 ++++++++++------------ .../TestCfg/TestCfgPrivateKeyCreator.py | 29 +++++++++----------- 2 files changed, 27 insertions(+), 33 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py index 0f369113b..7ceedb7c2 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py @@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier class TestCfgExternalCommandVerifier(TestCfgVerifier): test_obj = CfgExternalCommandVerifier - @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen") - def test_verify_entry(self, mock_Popen): - proc = Mock() - mock_Popen.return_value = proc - proc.wait.return_value = 0 - proc.communicate.return_value = ("stdout", "stderr") + def test_verify_entry(self): entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() ecv = self.get_obj() ecv.cmd = ["/bin/bash", "-x", "foo"] + ecv.exc = Mock() + ecv.exc.run.return_value = Mock() + ecv.exc.run.return_value.success = True + ecv.verify_entry(entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") - mock_Popen.reset_mock() - proc.wait.return_value = 13 + ecv.exc.reset_mock() + ecv.exc.run.return_value.success = False self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") + + ecv.exc.reset_mock() - mock_Popen.reset_mock() - mock_Popen.side_effect = OSError + ecv.exc.reset_mock() + ecv.exc.run.side_effect = OSError self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") @patch("os.access") def test_handle_event(self, mock_access): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index 2982825a9..6cfd2f666 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -98,24 +98,23 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @patch("shutil.rmtree") @patch("tempfile.mkdtemp") - @patch("subprocess.Popen") - def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree): + def test__gen_keypair(self, mock_mkdtemp, mock_rmtree): pkc = self.get_obj() + pkc.cmd = Mock() pkc.XMLMatch = Mock() mock_mkdtemp.return_value = datastore metadata = Mock() - proc = Mock() - proc.wait.return_value = 0 - proc.communicate.return_value = MagicMock() - mock_Popen.return_value = proc + exc = Mock() + exc.success = True + pkc.cmd.run.return_value = exc spec = lxml.etree.Element("PrivateKey") pkc.XMLMatch.return_value = spec def reset(): pkc.XMLMatch.reset_mock() - mock_Popen.reset_mock() + pkc.cmd.reset_mock() mock_mkdtemp.reset_mock() mock_rmtree.reset_mock() @@ -123,10 +122,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "rsa", "-N", ""]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "rsa", "-N", ""]) reset() lxml.etree.SubElement(spec, "Params", bits="768", type="dsa") @@ -137,13 +135,12 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "dsa", "-b", "768", "-N", "foo"]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "dsa", "-b", "768", "-N", "foo"]) reset() - proc.wait.return_value = 1 + pkc.cmd.run.return_value.success = False self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata) mock_rmtree.assert_called_with(datastore) -- cgit v1.2.3-1-g7c22