From ebe7542db7217c2fac3d7111e80f94caedfb69e2 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 16 Jan 2013 13:28:06 -0500 Subject: added module-level OptionParser to avoid passing it as an argument or global all over --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 12 +- .../TestCfg/TestCfgAuthorizedKeysGenerator.py | 32 +++--- .../TestPlugins/TestCfg/TestCfgCheetahGenerator.py | 7 +- .../TestCfg/TestCfgEncryptedGenerator.py | 9 +- .../TestPlugins/TestCfg/TestCfgGenshiGenerator.py | 14 +-- .../TestCfg/TestCfgPrivateKeyCreator.py | 126 ++++++++------------- .../TestServer/TestPlugins/TestCfg/Test_init.py | 41 ++++--- .../TestServer/TestPlugins/TestProperties.py | 85 ++++++-------- 8 files changed, 139 insertions(+), 187 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 6dbdc7667..75cc41a34 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1716,16 +1716,17 @@ class TestEntrySet(TestDebuggable): "important: true", "bogus: line"] mock_open.return_value.readlines.return_value = idata + eset.metadata = default_path_metadata() eset.update_metadata(event) - expected = DEFAULT_FILE_METADATA.copy() + expected = default_path_metadata() expected['owner'] = 'owner' expected['group'] = 'GROUP' expected['mode'] = '0775' expected['important'] = 'true' - self.assertItemsEqual(eset.metadata, - expected) + self.assertItemsEqual(eset.metadata, expected) - def test_reset_metadata(self): + @patch("Bcfg2.Server.Plugin.helpers.default_path_metadata") + def test_reset_metadata(self, mock_default_path_metadata): eset = self.get_obj() # test info.xml @@ -1740,7 +1741,8 @@ class TestEntrySet(TestDebuggable): event.filename = fname eset.metadata = Mock() eset.reset_metadata(event) - self.assertItemsEqual(eset.metadata, DEFAULT_FILE_METADATA) + self.assertEqual(eset.metadata, + mock_default_path_metadata.return_value) @patch("Bcfg2.Server.Plugin.helpers.bind_info") def test_bind_info_to_entry(self, mock_bind_info): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index d655a20cd..b77d52033 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): def test_category(self): akg = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp + akg.setup = Mock() + akg.setup.cfp.has_section.return_value = False + akg.setup.cfp.has_option.return_value = False self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + akg.setup.reset_mock() + akg.setup.cfp.has_section.return_value = True self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(akg.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + + akg.setup.reset_mock() + akg.setup.cfp.has_option.return_value = True + self.assertEqual(akg.category, akg.setup.cfp.get.return_value) + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + akg.setup.cfp.get.assert_called_with("sshkeys", "category") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index fc5d5e53d..31227329c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip: ccg.data = "data" entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock() + ccg.setup = MagicMock() self.assertEqual(ccg.get_data(entry, metadata), mock_Template.return_value.respond.return_value) - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo") + ccg.setup.__getitem__.assert_called_with("repo") mock_Template.assert_called_with("data".decode(ccg.encoding), compilerSettings=ccg.settings) tmpl = mock_Template.return_value @@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip: self.assertEqual(tmpl.name, entry.get("name")) self.assertEqual(tmpl.path, entry.get("name")) self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value) + self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 71a7410da..f8e9b1990 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -26,16 +26,13 @@ if can_skip or HAS_CRYPTO: def setUp(self): pass - @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt, mock_get_algorithm): + def test_handle_event(self, mock_decrypt): @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") def inner(mock_handle_event): def reset(): mock_decrypt.reset_mock() - mock_get_algorithm.reset_mock() mock_handle_event.reset_mock() def get_event_data(obj, event): @@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO: ceg = self.get_obj() ceg.handle_event(event) mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted", - setup=SETUP, - algorithm=mock_get_algorithm.return_value) + mock_decrypt.assert_called_with("encrypted") self.assertEqual(ceg.data, "plaintext") reset() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 385f8df77..88f3cf3f7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -43,6 +43,7 @@ if can_skip or HAS_GENSHI: def test_get_data(self): cgg = self.get_obj() cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() cgg.template = Mock() fltr = Mock() cgg.template.generate.return_value = fltr @@ -51,24 +52,23 @@ if can_skip or HAS_GENSHI: entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock() def reset(): cgg.template.reset_mock() cgg._handle_genshi_exception.reset_mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock() + cgg.setup.reset_mock() template_vars = dict( name=entry.get("name"), metadata=metadata, path=cgg.name, source_path=cgg.name, - repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value) + repo=cgg.setup.__getitem__.return_value) self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -81,7 +81,7 @@ if can_skip or HAS_GENSHI: self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) self.assertEqual(stream.render.call_args_list, [call("text", encoding=cgg.encoding, @@ -93,7 +93,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(UndefinedError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -104,7 +104,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(ValueError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index dc4b11241..4c8ab8b43 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -44,56 +44,56 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): def test_category(self): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(pkc.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + pkc.setup.reset_mock() + pkc.setup.cfp.has_option.return_value = True + self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") - cfp.reset_mock() - cfp.get.return_value = "test" + pkc.setup.reset_mock() + pkc.setup.cfp.get.return_value = "test" mock_get_passphrases.return_value = dict(test="foo", test2="bar") - cfp.has_option.return_value = True + pkc.setup.cfp.has_option.return_value = True self.assertEqual(pkc.passphrase, "foo") - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - cfp.get.assert_called_with("sshkeys", "passphrase") - mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") + pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase") + mock_get_passphrases.assert_called_with() @patch("shutil.rmtree") @patch("tempfile.mkdtemp") @@ -282,8 +282,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") @patch("Bcfg2.Encryption.ssl_encrypt") - @patch("Bcfg2.Encryption.get_algorithm") - def inner2(mock_get_algorithm, mock_ssl_encrypt): + def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True @@ -302,9 +301,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): "ssh-rsa publickey pubkey.filename\n", group="foo") pkc.write_data.assert_called_with("encryptedprivatekey", group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with( - "privatekey", "foo", - algorithm=mock_get_algorithm.return_value) + mock_ssl_encrypt.assert_called_with("privatekey", "foo") mock_rmtree.assert_called_with(datastore) inner2() @@ -317,10 +314,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") def test_Index_crypto(self): - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" - pkc = self.get_obj() + pkc.setup = Mock() + pkc.setup.cfp.get.return_value = "strict" pkc._decrypt = Mock() pkc._decrypt.return_value = 'plaintext' pkc.data = ''' @@ -348,7 +344,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): self.assertRaises(PluginExecutionError, pkc.Index) # test failed decryption, lax - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" + pkc.setup.cfp.get.return_value = "lax" pkc._decrypt.reset_mock() pkc.Index() self.assertItemsEqual( @@ -358,17 +354,13 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pkc = self.get_obj() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -382,29 +374,19 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -412,26 +394,14 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 2e758774e..ee0b0be9d 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -439,8 +439,6 @@ class TestCfgEntrySet(TestEntrySet): @patch("Bcfg2.Server.Plugins.Cfg.u_str") @patch("Bcfg2.Server.Plugins.Cfg.b64encode") def test_bind_entry(self, mock_b64encode, mock_u_str): - Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) - mock_u_str.side_effect = lambda x: x eset = self.get_obj() @@ -448,6 +446,7 @@ class TestCfgEntrySet(TestEntrySet): eset._generate_data = Mock() eset.get_handlers = Mock() eset._validate_data = Mock() + eset.setup = dict(validate=False) def reset(): mock_b64encode.reset_mock() @@ -523,7 +522,7 @@ class TestCfgEntrySet(TestEntrySet): # test successful validation entry = reset() - Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + eset.setup['validate'] = True bound = eset.bind_entry(entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -753,31 +752,39 @@ class TestCfg(TestGroupSpool, TestPullTarget): def get_obj(self, core=None): if core is None: core = Mock() - core.setup = MagicMock() return TestGroupSpool.get_obj(self, core=core) + @patch("Bcfg2.Options.get_option_parser") @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") @patch("Bcfg2.Server.Plugin.PullTarget.__init__") - def test__init(self, mock_pulltarget_init, mock_groupspool_init): + def test__init(self, mock_pulltarget_init, mock_groupspool_init, + mock_get_option_parser): + setup = MagicMock() + setup.__contains__.return_value = False + mock_get_option_parser.return_value = setup + + def reset(): + core.reset_mock() + setup.reset_mock() + mock_pulltarget_init.reset_mock() + mock_groupspool_init.reset_mock() + core = Mock() - core.setup = MagicMock() cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - core.setup.add_option.assert_called_with("validate", - Bcfg2.Options.CFG_VALIDATION) - core.setup.reparse.assert_called_with() - - core.reset_mock() - core.setup.reset_mock() - mock_pulltarget_init.reset_mock() - mock_groupspool_init.reset_mock() - core.setup.__contains__.return_value = True + setup.add_option.assert_called_with( + "validate", + Bcfg2.Options.CFG_VALIDATION) + mock_get_option_parser.return_value.reparse.assert_called_with() + + reset() + setup.__contains__.return_value = True cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - self.assertFalse(core.setup.add_option.called) - self.assertFalse(core.setup.reparse.called) + self.assertFalse(setup.add_option.called) + self.assertFalse(setup.reparse.called) def test_has_generator(self): cfg = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index d66780a20..b0ca77a78 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -41,10 +41,10 @@ class TestPropertyFile(Bcfg2TestCase): return self.test_obj(path) def test_write(self): - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() pf = self.get_obj() pf.validate_data = Mock() pf._write = Mock() + pf.setup = Mock() xstr = u("\n") pf.xdata = lxml.etree.XML(xstr) @@ -52,20 +52,20 @@ class TestPropertyFile(Bcfg2TestCase): def reset(): pf.validate_data.reset_mock() pf._write.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() # test writes disabled - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertRaises(PluginExecutionError, pf.write) self.assertFalse(pf.validate_data.called) self.assertFalse(pf._write.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", + pf.setup.cfp.getboolean.assert_called_with("properties", "writes_enabled", default=True) # test successful write reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.write(), pf._write.return_value) pf.validate_data.assert_called_with() pf._write.assert_called_with() @@ -301,17 +301,13 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pf = self.get_obj() - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -325,29 +321,19 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -355,77 +341,73 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) @patch("copy.copy") def test_get_additional_data(self, mock_copy): - Bcfg2.Server.Plugins.Properties.SETUP = Mock() pf = self.get_obj() + pf.setup = Mock() pf.XMLMatch = Mock() metadata = Mock() def reset(): mock_copy.reset_mock() pf.XMLMatch.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() pf.xdata = lxml.etree.Element("Properties", automatch="true") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) pf.xdata = lxml.etree.Element("Properties", automatch="false") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) pf.xdata = lxml.etree.Element("Properties") reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) @@ -449,7 +431,6 @@ class TestProperties(TestPlugin, TestConnector): core = Mock() p = self.get_obj(core=core) self.assertIsInstance(p.store, PropDirectoryBacked) - self.assertEqual(Bcfg2.Server.Plugins.Properties.SETUP, core.setup) @patch("copy.copy") def test_get_additional_data(self, mock_copy): -- cgit v1.2.3-1-g7c22 From 72a80f89361145f1560ccc248f357a9de82eded6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 17 Jan 2013 08:01:44 -0500 Subject: abstracted encryption support from Properties/CfgPrivateKeyCreator to StructFile --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 109 ++++++++++++++++++- .../TestCfg/TestCfgPrivateKeyCreator.py | 99 ------------------ .../TestServer/TestPlugins/TestProperties.py | 116 --------------------- 3 files changed, 108 insertions(+), 216 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 75cc41a34..3d4df3e0b 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1,5 +1,4 @@ import os -import re import sys import copy import lxml.etree @@ -21,6 +20,12 @@ from common import * from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable from TestServer.TestPlugin.Testinterfaces import TestGenerator +try: + from Bcfg2.Encryption import EVPError + HAS_CRYPTO = True +except: + HAS_CRYPTO = False + def tostring(el): return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8') @@ -674,6 +679,108 @@ class TestStructFile(TestXMLFileBacked): children[4] = standalone return (xdata, groups, subgroups, children, subchildren, standalone) + def test_Index(self): + has_crypto = Bcfg2.Server.Plugin.helpers.HAS_CRYPTO + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = False + TestXMLFileBacked.test_Index(self) + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = has_crypto + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_Index_crypto(self): + if not self.test_obj.encryption: + return skip("Encryption disabled on %s objects, skipping" % + self.test_obj.__name__) + + sf = self.get_obj() + sf.setup = Mock() + sf.setup.cfp.get.return_value = "strict" + sf._decrypt = Mock() + sf._decrypt.return_value = 'plaintext' + sf.data = ''' + + + crypted + + + plain + +''' + + # test successful decryption + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + for el in sf.xdata.xpath("//*[@encrypted]"): + self.assertEqual(el.text, sf._decrypt.return_value) + + # test failed decryption, strict + sf._decrypt.reset_mock() + sf._decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, sf.Index) + + # test failed decryption, lax + sf.setup.cfp.get.return_value = "lax" + sf._decrypt.reset_mock() + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): + sf = self.get_obj() + + def reset(): + mock_bruteforce.reset_mock() + mock_get_passphrases.reset_mock() + mock_ssl.reset_mock() + + # test element without text contents + self.assertIsNone(sf._decrypt(lxml.etree.Element("Test"))) + self.assertFalse(mock_bruteforce.called) + self.assertFalse(mock_get_passphrases.called) + self.assertFalse(mock_ssl.called) + + # test element with a passphrase in the config file + reset() + el = lxml.etree.Element("Test", encrypted="foo") + el.text = "crypted" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") + mock_ssl.return_value = "decrypted with ssl" + self.assertEqual(sf._decrypt(el), mock_ssl.return_value) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test failure to decrypt element with a passphrase in the config + reset() + mock_ssl.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test element without valid passphrase + reset() + el.set("encrypted", "true") + mock_bruteforce.return_value = "decrypted with bruteforce" + self.assertEqual(sf._decrypt(el), mock_bruteforce.return_value) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + + # test failure to decrypt element without valid passphrase + reset() + mock_bruteforce.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + def test_include_element(self): sf = self.get_obj() metadata = Mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index 4c8ab8b43..bf34d4c3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -306,102 +306,3 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): inner2() - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pkc = self.get_obj() - pkc.setup = Mock() - pkc.setup.cfp.get.return_value = "strict" - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' - - - crypted - - - plain - -''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - pkc.setup.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pkc = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index b0ca77a78..b63d08524 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -18,12 +18,6 @@ from common import * from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked -try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - try: import json JSON = "json" @@ -243,116 +237,6 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - # extra test: crypto is not available, but properties file is - # encrypted - has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False - try: - self.assertRaises(PluginExecutionError, pf.Index) - finally: - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' - - - crypted - plain - - crypted - plain - - crypted - -''' - - print "HAS_CRYPTO: %s" % HAS_CRYPTO - print "Properties HAS_CRYPTO: %s" % Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pf = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): pf = self.get_obj() -- cgit v1.2.3-1-g7c22 From a6b269011b2f1e84c650239065e56778591f087e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 30 Oct 2012 11:03:10 -0400 Subject: removed support for info/:info files --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 55 ++++++---------------- 1 file changed, 14 insertions(+), 41 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 3d4df3e0b..290bd7d5c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1636,25 +1636,25 @@ class TestEntrySet(TestDebuggable): eset.reset_metadata.reset_mock() eset.entry_init.reset_mock() - for fname in ["info", "info.xml", ":info"]: - for evt in ["exists", "created", "changed"]: - reset() - event = Mock() - event.code2str.return_value = evt - event.filename = fname - eset.handle_event(event) - eset.update_metadata.assert_called_with(event) - self.assertFalse(eset.entry_init.called) - self.assertFalse(eset.reset_metadata.called) - + fname = "info.xml" + for evt in ["exists", "created", "changed"]: reset() event = Mock() - event.code2str.return_value = "deleted" + event.code2str.return_value = evt event.filename = fname eset.handle_event(event) - eset.reset_metadata.assert_called_with(event) + eset.update_metadata.assert_called_with(event) self.assertFalse(eset.entry_init.called) - self.assertFalse(eset.update_metadata.called) + self.assertFalse(eset.reset_metadata.called) + + reset() + event = Mock() + event.code2str.return_value = "deleted" + event.filename = fname + eset.handle_event(event) + eset.reset_metadata.assert_called_with(event) + self.assertFalse(eset.entry_init.called) + self.assertFalse(eset.update_metadata.called) for evt in ["exists", "created", "changed"]: reset() @@ -1813,25 +1813,6 @@ class TestEntrySet(TestDebuggable): self.assertFalse(mock_InfoXML.called) eset.infoxml.HandleEvent.assert_called_with(event) - for fname in [':info', 'info']: - event = Mock() - event.filename = fname - - idata = ["owner:owner", - "group: GROUP", - "mode: 775", - "important: true", - "bogus: line"] - mock_open.return_value.readlines.return_value = idata - eset.metadata = default_path_metadata() - eset.update_metadata(event) - expected = default_path_metadata() - expected['owner'] = 'owner' - expected['group'] = 'GROUP' - expected['mode'] = '0775' - expected['important'] = 'true' - self.assertItemsEqual(eset.metadata, expected) - @patch("Bcfg2.Server.Plugin.helpers.default_path_metadata") def test_reset_metadata(self, mock_default_path_metadata): eset = self.get_obj() @@ -1843,14 +1824,6 @@ class TestEntrySet(TestDebuggable): eset.reset_metadata(event) self.assertIsNone(eset.infoxml) - for fname in [':info', 'info']: - event = Mock() - event.filename = fname - eset.metadata = Mock() - eset.reset_metadata(event) - self.assertEqual(eset.metadata, - mock_default_path_metadata.return_value) - @patch("Bcfg2.Server.Plugin.helpers.bind_info") def test_bind_info_to_entry(self, mock_bind_info): # There's a strange scoping issue in py3k that prevents this -- cgit v1.2.3-1-g7c22 From 45c7bbf24ae3c6530f33ebb33c062818ad44816d Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 30 Oct 2012 11:35:46 -0400 Subject: added a module-level FAM object to avoid passing it as an argument a billion times --- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 53 ++++++++++++---------- .../Testlib/TestServer/TestPlugins/TestProbes.py | 22 +++++---- 2 files changed, 41 insertions(+), 34 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index f627e4465..b1db34462 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -6,6 +6,7 @@ import socket import lxml.etree import Bcfg2.Server import Bcfg2.Server.Plugin +from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Metadata import * from mock import Mock, MagicMock, patch @@ -206,8 +207,7 @@ class TestXMLMetadataConfig(TestXMLFileBacked): def test__init(self): xmc = self.get_obj() - self.assertEqual(self.metadata.core.fam, xmc.fam) - self.assertFalse(xmc.fam.AddMonitor.called) + self.assertFalse(_FAM.AddMonitor.called) def test_xdata(self): config = self.get_obj() @@ -259,12 +259,12 @@ class TestXMLMetadataConfig(TestXMLFileBacked): config.extras = [] config.add_monitor(fpath) - self.assertFalse(core.fam.AddMonitor.called) + self.assertFalse(_FAM.AddMonitor.called) self.assertEqual(config.extras, [fpath]) config = self.get_obj(core=core, watch_clients=True) config.add_monitor(fpath) - core.fam.AddMonitor.assert_called_with(fpath, config.metadata) + _FAM.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fpath]) def test_Index(self): @@ -501,20 +501,22 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): self.assertEqual(metadata.states, dict()) # test with watch_clients=True - core.fam = MagicMock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() metadata = self.get_obj(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - core.fam.reset_mock() - core.fam.AddMonitor = Mock(side_effect=IOError) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, + "groups.xml"), + metadata) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, + "clients.xml"), + metadata) + + Bcfg2.Server.FileMonitor._FAM.reset_mock() + Bcfg2.Server.FileMonitor._FAM.AddMonitor = Mock(side_effect=IOError) self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, self.get_obj, core=core, watch_clients=True) + Bcfg2.Server.FileMonitor._FAM = fam @patch('os.makedirs', Mock()) @patch('%s.open' % builtins) @@ -1260,17 +1262,17 @@ class TestMetadataBase(TestMetadata): @patch('os.path.exists') def test__init(self, mock_exists): - core = MagicMock() - core.fam = Mock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() mock_exists.return_value = False - metadata = self.get_obj(core=core, watch_clients=True) + metadata = self.get_obj(watch_clients=True) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, - "groups.xml"), - metadata) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_once_with( + os.path.join(metadata.data, "groups.xml"), + metadata) mock_exists.return_value = True - core.fam.reset_mock() + Bcfg2.Server.FileMonitor._FAM.reset_mock() metadata = self.get_obj(core=core, watch_clients=True) core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, "groups.xml"), @@ -1516,9 +1518,12 @@ class TestMetadata_ClientsXML(TestMetadataBase): def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_obj() - metadata.core.fam = Mock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() metadata.clients_xml = metadata._handle_file("clients.xml") metadata = TestMetadata.load_clients_data(self, metadata=metadata, xdata=xdata) - return TestMetadataBase.load_clients_data(self, metadata=metadata, - xdata=xdata) + rv = TestMetadataBase.load_clients_data(self, metadata=metadata, + xdata=xdata) + Bcfg2.Server.FileMonitor._FAM = fam + return rv diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 899fb24a0..2e1d6df51 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -16,6 +16,7 @@ while path != "/": break path = os.path.dirname(path) from common import * +from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Probes import * from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ TestDatabaseBacked @@ -91,22 +92,23 @@ class TestProbeSet(TestEntrySet): ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] bogus_names = ["test.py"] - def get_obj(self, path=datastore, fam=None, encoding=None, + def get_obj(self, path=datastore, encoding=None, plugin_name="Probes", basename=None): # get_obj() accepts the basename argument, accepted by the # parent get_obj() method, and just throws it away, since # ProbeSet uses a regex for the "basename" - if fam is None: - fam = Mock() - rv = self.test_obj(path, fam, encoding, plugin_name) + rv = self.test_obj(path, encoding, plugin_name) rv.entry_type = MagicMock() return rv def test__init(self): - fam = Mock() - ps = self.get_obj(fam=fam) + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = Mock() + ps = self.get_obj() self.assertEqual(ps.plugin_name, "Probes") - fam.AddMonitor.assert_called_with(datastore, ps) + Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_with(datastore, + ps) + Bcfg2.Server.FileMonitor._FAM = fam TestEntrySet.test__init(self) def test_HandleEvent(self): @@ -256,9 +258,9 @@ text def test__init(self): mock_load_data = Mock() probes = self.get_probes_object(load_data=mock_load_data) - probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore, - probes.name), - probes.probes) + _FAM.AddMonitor.assert_called_with(os.path.join(datastore, + probes.name), + probes.probes) mock_load_data.assert_any_call() self.assertEqual(probes.probedata, ClientProbeDataSet()) self.assertEqual(probes.cgroups, dict()) -- cgit v1.2.3-1-g7c22 From cebd0d7ad54995c37f68586a14540ad64d99d762 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 31 Oct 2012 11:46:51 -0400 Subject: fixed unit tests --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 54 +++++++--------- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 71 ++++++++++++---------- .../Testlib/TestServer/TestPlugins/TestProbes.py | 13 +--- 3 files changed, 65 insertions(+), 73 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 290bd7d5c..7845d5926 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -114,10 +114,10 @@ class TestFileBacked(Bcfg2TestCase): test_obj = FileBacked path = os.path.join(datastore, "test") - def get_obj(self, path=None, fam=None): + def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=fam) + return self.test_obj(path) @patch("%s.open" % builtins) def test_HandleEvent(self, mock_open): @@ -163,24 +163,20 @@ class TestDirectoryBacked(Bcfg2TestCase): """ ensure that the child object has the correct interface """ self.assertTrue(hasattr(self.test_obj.__child__, "HandleEvent")) - def get_obj(self, fam=None): - if fam is None: - fam = Mock() - + def get_obj(self): @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, self.test_obj.__name__), Mock()) def inner(): return self.test_obj(os.path.join(datastore, - self.test_obj.__name__), - fam) + self.test_obj.__name__)) return inner() def test__init(self): @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, self.test_obj.__name__)) def inner(mock_add_monitor): - db = self.test_obj(datastore, Mock()) + db = self.test_obj(datastore) mock_add_monitor.assert_called_with('') inner() @@ -249,10 +245,9 @@ class TestDirectoryBacked(Bcfg2TestCase): db.fam = Mock() class MockChild(Mock): - def __init__(self, path, fam, **kwargs): + def __init__(self, path, **kwargs): Mock.__init__(self, **kwargs) self.path = path - self.fam = fam self.HandleEvent = Mock() db.__child__ = MockChild @@ -262,7 +257,6 @@ class TestDirectoryBacked(Bcfg2TestCase): self.assertIn(path, db.entries) self.assertEqual(db.entries[path].path, os.path.join(db.data, path)) - self.assertEqual(db.entries[path].fam, db.fam) db.entries[path].HandleEvent.assert_called_with(event) @patch("os.path.isdir") @@ -400,27 +394,27 @@ class TestXMLFileBacked(TestFileBacked): should_monitor = None path = os.path.join(datastore, "test", "test1.xml") - def get_obj(self, path=None, fam=None, should_monitor=False): + def get_obj(self, path=None, should_monitor=False): if path is None: path = self.path - return self.test_obj(path, fam=fam, should_monitor=should_monitor) + return self.test_obj(path, should_monitor=should_monitor) - def test__init(self): - fam = Mock() + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): xfb = self.get_obj() if self.should_monitor is True: - self.assertIsNotNone(xfb.fam) + self.assertEqual(xfb.fam, mock_get_fam.return_value) else: self.assertIsNone(xfb.fam) if self.should_monitor is not True: - xfb = self.get_obj(fam=fam) - self.assertFalse(fam.AddMonitor.called) + xfb = self.get_obj() + self.assertFalse(xfb.fam.AddMonitor.called) if self.should_monitor is not False: fam.reset_mock() - xfb = self.get_obj(fam=fam, should_monitor=True) - fam.AddMonitor.assert_called_with(self.path, xfb) + xfb = self.get_obj(should_monitor=True) + xfb.fam.AddMonitor.assert_called_with(self.path, xfb) @patch("os.path.exists") @patch("lxml.etree.parse") @@ -568,6 +562,7 @@ class TestXMLFileBacked(TestFileBacked): test3 = lxml.etree.Element("Test", name="test3") replacements = {"/test/test2.xml": test2, "/test/test_dir/test3.xml": test3} + def xinclude(): for el in xfb.xdata.findall('//%sinclude' % Bcfg2.Server.XI_NAMESPACE): @@ -585,25 +580,25 @@ class TestXMLFileBacked(TestFileBacked): self.assertItemsEqual([tostring(e) for e in xfb.entries], [tostring(e) for e in children]) + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test_add_monitor(self): xfb = self.get_obj() xfb.add_monitor("/test/test2.xml") self.assertIn("/test/test2.xml", xfb.extras) - fam = Mock() if self.should_monitor is not True: fam.reset_mock() - xfb = self.get_obj(fam=fam) - fam.reset_mock() + xfb = self.get_obj() + xfb.fam = Mock() xfb.add_monitor("/test/test3.xml") - self.assertFalse(fam.AddMonitor.called) + self.assertFalse(xfb.fam.AddMonitor.called) self.assertIn("/test/test3.xml", xfb.extras) if self.should_monitor is not False: - fam.reset_mock() - xfb = self.get_obj(fam=fam, should_monitor=True) + xfb = self.get_obj(should_monitor=True) + xfb.fam = Mock() xfb.add_monitor("/test/test4.xml") - fam.AddMonitor.assert_called_with("/test/test4.xml", xfb) + xfb.fam.AddMonitor.assert_called_with("/test/test4.xml", xfb) self.assertIn("/test/test4.xml", xfb.extras) @@ -2071,6 +2066,3 @@ class TestGroupSpool(TestPlugin, TestGenerator): gs.event_id.assert_called_with(event) self.assertNotIn("/baz/quux", gs.entries) self.assertNotIn("/baz/quux", gs.Entries[gs.entry_type]) - - - diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index b1db34462..0afa5220d 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -6,7 +6,6 @@ import socket import lxml.etree import Bcfg2.Server import Bcfg2.Server.Plugin -from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Metadata import * from mock import Mock, MagicMock, patch @@ -205,9 +204,10 @@ class TestXMLMetadataConfig(TestXMLFileBacked): watch_clients=watch_clients) return XMLMetadataConfig(self.metadata, watch_clients, basefile) + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test__init(self): xmc = self.get_obj() - self.assertFalse(_FAM.AddMonitor.called) + self.assertFalse(xmc.fam.AddMonitor.called) def test_xdata(self): config = self.get_obj() @@ -251,20 +251,21 @@ class TestXMLMetadataConfig(TestXMLFileBacked): self.assertEqual(config.base_xdata, "") def test_add_monitor(self): - core = MagicMock() - config = self.get_obj(core=core) + config = self.get_obj() + config.fam = Mock() fname = "test.xml" fpath = os.path.join(self.metadata.data, fname) config.extras = [] config.add_monitor(fpath) - self.assertFalse(_FAM.AddMonitor.called) + self.assertFalse(config.fam.AddMonitor.called) self.assertEqual(config.extras, [fpath]) - config = self.get_obj(core=core, watch_clients=True) + config = self.get_obj(watch_clients=True) + config.fam = Mock() config.add_monitor(fpath) - _FAM.AddMonitor.assert_called_with(fpath, config.metadata) + config.fam.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fpath]) def test_Index(self): @@ -488,7 +489,8 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): client_name = "%s%s" % (prefix, i) return client_name - def test__init(self): + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): # test with watch_clients=False core = MagicMock() metadata = self.get_obj(core=core) @@ -501,22 +503,21 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): self.assertEqual(metadata.states, dict()) # test with watch_clients=True - fam = Bcfg2.Server.FileMonitor._FAM - Bcfg2.Server.FileMonitor._FAM = MagicMock() metadata = self.get_obj(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - Bcfg2.Server.FileMonitor._FAM.reset_mock() - Bcfg2.Server.FileMonitor._FAM.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) + + mock_get_fam.reset_mock() + fam = Mock() + fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value = fam self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, self.get_obj, core=core, watch_clients=True) - Bcfg2.Server.FileMonitor._FAM = fam @patch('os.makedirs', Mock()) @patch('%s.open' % builtins) @@ -577,6 +578,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_add_group(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -609,6 +611,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_update_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -626,6 +629,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_remove_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -641,6 +645,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_add_bundle(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -664,6 +669,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_remove_bundle(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -679,6 +685,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_add_client(self): metadata = self.get_obj() metadata.clients_xml.write = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = lxml.etree.XML('').getroottree() metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -713,6 +720,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): def test_update_client(self): metadata = self.get_obj() metadata.clients_xml.write_xml = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree()) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -1261,25 +1269,24 @@ class TestMetadataBase(TestMetadata): return client_name @patch('os.path.exists') - def test__init(self, mock_exists): - fam = Bcfg2.Server.FileMonitor._FAM - Bcfg2.Server.FileMonitor._FAM = MagicMock() + @patch('Bcfg2.Server.FileMonitor.get_fam') + def test__init(self, mock_get_fam, mock_exists): mock_exists.return_value = False metadata = self.get_obj(watch_clients=True) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_once_with( + mock_get_fam.return_value.AddMonitor.assert_called_with( os.path.join(metadata.data, "groups.xml"), metadata) mock_exists.return_value = True - Bcfg2.Server.FileMonitor._FAM.reset_mock() - metadata = self.get_obj(core=core, watch_clients=True) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) + mock_get_fam.reset_mock() + metadata = self.get_obj(watch_clients=True) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) def test_add_group(self): pass diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 2e1d6df51..958dba4ff 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -16,7 +16,6 @@ while path != "/": break path = os.path.dirname(path) from common import * -from Bcfg2.Server.FileMonitor import _FAM from Bcfg2.Server.Plugins.Probes import * from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ TestDatabaseBacked @@ -101,14 +100,11 @@ class TestProbeSet(TestEntrySet): rv.entry_type = MagicMock() return rv - def test__init(self): - fam = Bcfg2.Server.FileMonitor._FAM - Bcfg2.Server.FileMonitor._FAM = Mock() + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): ps = self.get_obj() self.assertEqual(ps.plugin_name, "Probes") - Bcfg2.Server.FileMonitor._FAM.AddMonitor.assert_called_with(datastore, - ps) - Bcfg2.Server.FileMonitor._FAM = fam + mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps) TestEntrySet.test__init(self) def test_HandleEvent(self): @@ -258,9 +254,6 @@ text def test__init(self): mock_load_data = Mock() probes = self.get_probes_object(load_data=mock_load_data) - _FAM.AddMonitor.assert_called_with(os.path.join(datastore, - probes.name), - probes.probes) mock_load_data.assert_any_call() self.assertEqual(probes.probedata, ClientProbeDataSet()) self.assertEqual(probes.cgroups, dict()) -- cgit v1.2.3-1-g7c22 From 09e6b3eac584c0e3558c062610da1824bb5eb9b2 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 16 Jan 2013 08:00:31 -0500 Subject: fixed unit tests --- testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py | 7 +------ .../Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 7845d5926..fc9b5610c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -402,17 +402,13 @@ class TestXMLFileBacked(TestFileBacked): @patch("Bcfg2.Server.FileMonitor.get_fam") def test__init(self, mock_get_fam): xfb = self.get_obj() - if self.should_monitor is True: - self.assertEqual(xfb.fam, mock_get_fam.return_value) - else: - self.assertIsNone(xfb.fam) + self.assertEqual(xfb.fam, mock_get_fam.return_value) if self.should_monitor is not True: xfb = self.get_obj() self.assertFalse(xfb.fam.AddMonitor.called) if self.should_monitor is not False: - fam.reset_mock() xfb = self.get_obj(should_monitor=True) xfb.fam.AddMonitor.assert_called_with(self.path, xfb) @@ -587,7 +583,6 @@ class TestXMLFileBacked(TestFileBacked): self.assertIn("/test/test2.xml", xfb.extras) if self.should_monitor is not True: - fam.reset_mock() xfb = self.get_obj() xfb.fam = Mock() xfb.add_monitor("/test/test3.xml") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py index 43d594482..4db92b7c4 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py @@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase): def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=Mock()) + return self.test_obj(path) def test__init(self): hm = self.get_obj() -- cgit v1.2.3-1-g7c22 From 9be9cfec322518f764be9766b27d24132fc6a66f Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 16 Jan 2013 13:28:06 -0500 Subject: added module-level OptionParser to avoid passing it as an argument or global all over --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 12 +- .../TestCfg/TestCfgAuthorizedKeysGenerator.py | 32 +++--- .../TestPlugins/TestCfg/TestCfgCheetahGenerator.py | 7 +- .../TestCfg/TestCfgEncryptedGenerator.py | 9 +- .../TestPlugins/TestCfg/TestCfgGenshiGenerator.py | 14 +-- .../TestCfg/TestCfgPrivateKeyCreator.py | 126 ++++++++------------- .../TestServer/TestPlugins/TestCfg/Test_init.py | 41 ++++--- .../TestServer/TestPlugins/TestProperties.py | 85 ++++++-------- 8 files changed, 139 insertions(+), 187 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 6dbdc7667..75cc41a34 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1716,16 +1716,17 @@ class TestEntrySet(TestDebuggable): "important: true", "bogus: line"] mock_open.return_value.readlines.return_value = idata + eset.metadata = default_path_metadata() eset.update_metadata(event) - expected = DEFAULT_FILE_METADATA.copy() + expected = default_path_metadata() expected['owner'] = 'owner' expected['group'] = 'GROUP' expected['mode'] = '0775' expected['important'] = 'true' - self.assertItemsEqual(eset.metadata, - expected) + self.assertItemsEqual(eset.metadata, expected) - def test_reset_metadata(self): + @patch("Bcfg2.Server.Plugin.helpers.default_path_metadata") + def test_reset_metadata(self, mock_default_path_metadata): eset = self.get_obj() # test info.xml @@ -1740,7 +1741,8 @@ class TestEntrySet(TestDebuggable): event.filename = fname eset.metadata = Mock() eset.reset_metadata(event) - self.assertItemsEqual(eset.metadata, DEFAULT_FILE_METADATA) + self.assertEqual(eset.metadata, + mock_default_path_metadata.return_value) @patch("Bcfg2.Server.Plugin.helpers.bind_info") def test_bind_info_to_entry(self, mock_bind_info): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index d655a20cd..b77d52033 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): def test_category(self): akg = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp + akg.setup = Mock() + akg.setup.cfp.has_section.return_value = False + akg.setup.cfp.has_option.return_value = False self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + akg.setup.reset_mock() + akg.setup.cfp.has_section.return_value = True self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(akg.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + + akg.setup.reset_mock() + akg.setup.cfp.has_option.return_value = True + self.assertEqual(akg.category, akg.setup.cfp.get.return_value) + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + akg.setup.cfp.get.assert_called_with("sshkeys", "category") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index fc5d5e53d..31227329c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip: ccg.data = "data" entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock() + ccg.setup = MagicMock() self.assertEqual(ccg.get_data(entry, metadata), mock_Template.return_value.respond.return_value) - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo") + ccg.setup.__getitem__.assert_called_with("repo") mock_Template.assert_called_with("data".decode(ccg.encoding), compilerSettings=ccg.settings) tmpl = mock_Template.return_value @@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip: self.assertEqual(tmpl.name, entry.get("name")) self.assertEqual(tmpl.path, entry.get("name")) self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value) + self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 71a7410da..f8e9b1990 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -26,16 +26,13 @@ if can_skip or HAS_CRYPTO: def setUp(self): pass - @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt, mock_get_algorithm): + def test_handle_event(self, mock_decrypt): @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") def inner(mock_handle_event): def reset(): mock_decrypt.reset_mock() - mock_get_algorithm.reset_mock() mock_handle_event.reset_mock() def get_event_data(obj, event): @@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO: ceg = self.get_obj() ceg.handle_event(event) mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted", - setup=SETUP, - algorithm=mock_get_algorithm.return_value) + mock_decrypt.assert_called_with("encrypted") self.assertEqual(ceg.data, "plaintext") reset() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 385f8df77..88f3cf3f7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -43,6 +43,7 @@ if can_skip or HAS_GENSHI: def test_get_data(self): cgg = self.get_obj() cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() cgg.template = Mock() fltr = Mock() cgg.template.generate.return_value = fltr @@ -51,24 +52,23 @@ if can_skip or HAS_GENSHI: entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock() def reset(): cgg.template.reset_mock() cgg._handle_genshi_exception.reset_mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock() + cgg.setup.reset_mock() template_vars = dict( name=entry.get("name"), metadata=metadata, path=cgg.name, source_path=cgg.name, - repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value) + repo=cgg.setup.__getitem__.return_value) self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -81,7 +81,7 @@ if can_skip or HAS_GENSHI: self.assertEqual(cgg.get_data(entry, metadata), stream.render.return_value) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) self.assertEqual(stream.render.call_args_list, [call("text", encoding=cgg.encoding, @@ -93,7 +93,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(UndefinedError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) @@ -104,7 +104,7 @@ if can_skip or HAS_GENSHI: self.assertRaises(ValueError, cgg.get_data, entry, metadata) cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") + cgg.setup.__getitem__.assert_called_with("repo") fltr.filter.assert_called_with(removecomment) stream.render.assert_called_with("text", encoding=cgg.encoding, strip_whitespace=False) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index dc4b11241..4c8ab8b43 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -44,56 +44,56 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): def test_category(self): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(pkc.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + pkc.setup.reset_mock() + pkc.setup.cfp.has_option.return_value = True + self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") - cfp.reset_mock() - cfp.get.return_value = "test" + pkc.setup.reset_mock() + pkc.setup.cfp.get.return_value = "test" mock_get_passphrases.return_value = dict(test="foo", test2="bar") - cfp.has_option.return_value = True + pkc.setup.cfp.has_option.return_value = True self.assertEqual(pkc.passphrase, "foo") - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - cfp.get.assert_called_with("sshkeys", "passphrase") - mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") + pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase") + mock_get_passphrases.assert_called_with() @patch("shutil.rmtree") @patch("tempfile.mkdtemp") @@ -282,8 +282,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") @patch("Bcfg2.Encryption.ssl_encrypt") - @patch("Bcfg2.Encryption.get_algorithm") - def inner2(mock_get_algorithm, mock_ssl_encrypt): + def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True @@ -302,9 +301,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): "ssh-rsa publickey pubkey.filename\n", group="foo") pkc.write_data.assert_called_with("encryptedprivatekey", group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with( - "privatekey", "foo", - algorithm=mock_get_algorithm.return_value) + mock_ssl_encrypt.assert_called_with("privatekey", "foo") mock_rmtree.assert_called_with(datastore) inner2() @@ -317,10 +314,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") def test_Index_crypto(self): - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" - pkc = self.get_obj() + pkc.setup = Mock() + pkc.setup.cfp.get.return_value = "strict" pkc._decrypt = Mock() pkc._decrypt.return_value = 'plaintext' pkc.data = ''' @@ -348,7 +344,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): self.assertRaises(PluginExecutionError, pkc.Index) # test failed decryption, lax - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" + pkc.setup.cfp.get.return_value = "lax" pkc._decrypt.reset_mock() pkc.Index() self.assertItemsEqual( @@ -358,17 +354,13 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pkc = self.get_obj() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -382,29 +374,19 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -412,26 +394,14 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 2e758774e..ee0b0be9d 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -439,8 +439,6 @@ class TestCfgEntrySet(TestEntrySet): @patch("Bcfg2.Server.Plugins.Cfg.u_str") @patch("Bcfg2.Server.Plugins.Cfg.b64encode") def test_bind_entry(self, mock_b64encode, mock_u_str): - Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) - mock_u_str.side_effect = lambda x: x eset = self.get_obj() @@ -448,6 +446,7 @@ class TestCfgEntrySet(TestEntrySet): eset._generate_data = Mock() eset.get_handlers = Mock() eset._validate_data = Mock() + eset.setup = dict(validate=False) def reset(): mock_b64encode.reset_mock() @@ -523,7 +522,7 @@ class TestCfgEntrySet(TestEntrySet): # test successful validation entry = reset() - Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + eset.setup['validate'] = True bound = eset.bind_entry(entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -753,31 +752,39 @@ class TestCfg(TestGroupSpool, TestPullTarget): def get_obj(self, core=None): if core is None: core = Mock() - core.setup = MagicMock() return TestGroupSpool.get_obj(self, core=core) + @patch("Bcfg2.Options.get_option_parser") @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") @patch("Bcfg2.Server.Plugin.PullTarget.__init__") - def test__init(self, mock_pulltarget_init, mock_groupspool_init): + def test__init(self, mock_pulltarget_init, mock_groupspool_init, + mock_get_option_parser): + setup = MagicMock() + setup.__contains__.return_value = False + mock_get_option_parser.return_value = setup + + def reset(): + core.reset_mock() + setup.reset_mock() + mock_pulltarget_init.reset_mock() + mock_groupspool_init.reset_mock() + core = Mock() - core.setup = MagicMock() cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - core.setup.add_option.assert_called_with("validate", - Bcfg2.Options.CFG_VALIDATION) - core.setup.reparse.assert_called_with() - - core.reset_mock() - core.setup.reset_mock() - mock_pulltarget_init.reset_mock() - mock_groupspool_init.reset_mock() - core.setup.__contains__.return_value = True + setup.add_option.assert_called_with( + "validate", + Bcfg2.Options.CFG_VALIDATION) + mock_get_option_parser.return_value.reparse.assert_called_with() + + reset() + setup.__contains__.return_value = True cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - self.assertFalse(core.setup.add_option.called) - self.assertFalse(core.setup.reparse.called) + self.assertFalse(setup.add_option.called) + self.assertFalse(setup.reparse.called) def test_has_generator(self): cfg = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index d66780a20..b0ca77a78 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -41,10 +41,10 @@ class TestPropertyFile(Bcfg2TestCase): return self.test_obj(path) def test_write(self): - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() pf = self.get_obj() pf.validate_data = Mock() pf._write = Mock() + pf.setup = Mock() xstr = u("\n") pf.xdata = lxml.etree.XML(xstr) @@ -52,20 +52,20 @@ class TestPropertyFile(Bcfg2TestCase): def reset(): pf.validate_data.reset_mock() pf._write.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() # test writes disabled - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertRaises(PluginExecutionError, pf.write) self.assertFalse(pf.validate_data.called) self.assertFalse(pf._write.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", + pf.setup.cfp.getboolean.assert_called_with("properties", "writes_enabled", default=True) # test successful write reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.write(), pf._write.return_value) pf.validate_data.assert_called_with() pf._write.assert_called_with() @@ -301,17 +301,13 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): pf = self.get_obj() - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() def reset(): mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() mock_get_passphrases.reset_mock() mock_ssl.reset_mock() @@ -325,29 +321,19 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): reset() el = lxml.etree.Element("Test", encrypted="foo") el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") mock_ssl.return_value = "decrypted with ssl" self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test failure to decrypt element with a passphrase in the config reset() mock_ssl.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") self.assertFalse(mock_bruteforce.called) # test element without valid passphrase @@ -355,77 +341,73 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): el.set("encrypted", "true") mock_bruteforce.return_value = "decrypted with bruteforce" self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) # test failure to decrypt element without valid passphrase reset() mock_bruteforce.side_effect = EVPError self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) self.assertFalse(mock_ssl.called) @patch("copy.copy") def test_get_additional_data(self, mock_copy): - Bcfg2.Server.Plugins.Properties.SETUP = Mock() pf = self.get_obj() + pf.setup = Mock() pf.XMLMatch = Mock() metadata = Mock() def reset(): mock_copy.reset_mock() pf.XMLMatch.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() pf.xdata = lxml.etree.Element("Properties", automatch="true") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) pf.xdata = lxml.etree.Element("Properties", automatch="false") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) pf.xdata = lxml.etree.Element("Properties") reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) @@ -449,7 +431,6 @@ class TestProperties(TestPlugin, TestConnector): core = Mock() p = self.get_obj(core=core) self.assertIsInstance(p.store, PropDirectoryBacked) - self.assertEqual(Bcfg2.Server.Plugins.Properties.SETUP, core.setup) @patch("copy.copy") def test_get_additional_data(self, mock_copy): -- cgit v1.2.3-1-g7c22 From 1a031fc131d950dd49dc425ac1726337d8e93910 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 17 Jan 2013 08:01:44 -0500 Subject: abstracted encryption support from Properties/CfgPrivateKeyCreator to StructFile --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 109 ++++++++++++++++++- .../TestCfg/TestCfgPrivateKeyCreator.py | 99 ------------------ .../TestServer/TestPlugins/TestProperties.py | 116 --------------------- 3 files changed, 108 insertions(+), 216 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 75cc41a34..3d4df3e0b 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1,5 +1,4 @@ import os -import re import sys import copy import lxml.etree @@ -21,6 +20,12 @@ from common import * from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable from TestServer.TestPlugin.Testinterfaces import TestGenerator +try: + from Bcfg2.Encryption import EVPError + HAS_CRYPTO = True +except: + HAS_CRYPTO = False + def tostring(el): return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8') @@ -674,6 +679,108 @@ class TestStructFile(TestXMLFileBacked): children[4] = standalone return (xdata, groups, subgroups, children, subchildren, standalone) + def test_Index(self): + has_crypto = Bcfg2.Server.Plugin.helpers.HAS_CRYPTO + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = False + TestXMLFileBacked.test_Index(self) + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = has_crypto + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_Index_crypto(self): + if not self.test_obj.encryption: + return skip("Encryption disabled on %s objects, skipping" % + self.test_obj.__name__) + + sf = self.get_obj() + sf.setup = Mock() + sf.setup.cfp.get.return_value = "strict" + sf._decrypt = Mock() + sf._decrypt.return_value = 'plaintext' + sf.data = ''' + + + crypted + + + plain + +''' + + # test successful decryption + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + for el in sf.xdata.xpath("//*[@encrypted]"): + self.assertEqual(el.text, sf._decrypt.return_value) + + # test failed decryption, strict + sf._decrypt.reset_mock() + sf._decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, sf.Index) + + # test failed decryption, lax + sf.setup.cfp.get.return_value = "lax" + sf._decrypt.reset_mock() + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): + sf = self.get_obj() + + def reset(): + mock_bruteforce.reset_mock() + mock_get_passphrases.reset_mock() + mock_ssl.reset_mock() + + # test element without text contents + self.assertIsNone(sf._decrypt(lxml.etree.Element("Test"))) + self.assertFalse(mock_bruteforce.called) + self.assertFalse(mock_get_passphrases.called) + self.assertFalse(mock_ssl.called) + + # test element with a passphrase in the config file + reset() + el = lxml.etree.Element("Test", encrypted="foo") + el.text = "crypted" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") + mock_ssl.return_value = "decrypted with ssl" + self.assertEqual(sf._decrypt(el), mock_ssl.return_value) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test failure to decrypt element with a passphrase in the config + reset() + mock_ssl.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test element without valid passphrase + reset() + el.set("encrypted", "true") + mock_bruteforce.return_value = "decrypted with bruteforce" + self.assertEqual(sf._decrypt(el), mock_bruteforce.return_value) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + + # test failure to decrypt element without valid passphrase + reset() + mock_bruteforce.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + def test_include_element(self): sf = self.get_obj() metadata = Mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index 4c8ab8b43..bf34d4c3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -306,102 +306,3 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): inner2() - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pkc = self.get_obj() - pkc.setup = Mock() - pkc.setup.cfp.get.return_value = "strict" - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' - - - crypted - - - plain - -''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - pkc.setup.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pkc = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index b0ca77a78..b63d08524 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -18,12 +18,6 @@ from common import * from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked -try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - try: import json JSON = "json" @@ -243,116 +237,6 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - # extra test: crypto is not available, but properties file is - # encrypted - has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False - try: - self.assertRaises(PluginExecutionError, pf.Index) - finally: - Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' - - - crypted - plain - - crypted - plain - - crypted - -''' - - print "HAS_CRYPTO: %s" % HAS_CRYPTO - print "Properties HAS_CRYPTO: %s" % Bcfg2.Server.Plugins.Properties.HAS_CRYPTO - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): - pf = self.get_obj() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_ssl.assert_called_with(el.text, "foopass") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with() - mock_bruteforce.assert_called_with(el.text) - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): pf = self.get_obj() -- cgit v1.2.3-1-g7c22 From 22029e107420ff21cf9f1811bf4bb6dc2aba1dde Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 22 Jan 2013 11:16:19 -0500 Subject: made genshi a requirement --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 10 + .../TestCfg/TestCfgEncryptedGenshiGenerator.py | 12 +- .../TestPlugins/TestCfg/TestCfgGenshiGenerator.py | 202 ++++++++++----------- 3 files changed, 107 insertions(+), 117 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index fc9b5610c..0b6f3fd87 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -3,6 +3,7 @@ import sys import copy import lxml.etree import Bcfg2.Server +import genshi.core from Bcfg2.Compat import reduce from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugin.helpers import * @@ -36,6 +37,15 @@ class FakeElementTree(lxml.etree._ElementTree): class TestFunctions(Bcfg2TestCase): + def test_removecomment(self): + data = [(None, "test", 1), + (None, "test2", 2)] + stream = [(genshi.core.COMMENT, "test", 0), + data[0], + (genshi.core.COMMENT, "test3", 0), + data[1]] + self.assertItemsEqual(list(removecomment(stream)), data) + def test_bind_info(self): entry = lxml.etree.Element("Path", name="/test") metadata = Mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py index b447a9bb8..330779c99 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py @@ -14,20 +14,14 @@ while path != "/": path = os.path.dirname(path) from common import * -try: - from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ - TestCfgGenshiGenerator - HAS_GENSHI = True -except ImportError: - TestCfgGenshiGenerator = object - HAS_GENSHI = False +from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ + TestCfgGenshiGenerator -if can_skip or (HAS_CRYPTO and HAS_GENSHI): +if can_skip or HAS_CRYPTO: class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): test_obj = CfgEncryptedGenshiGenerator @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") def setUp(self): pass diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 88f3cf3f7..154d6a8db 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -18,111 +18,97 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if can_skip or HAS_GENSHI: - class TestCfgGenshiGenerator(TestCfgGenerator): - test_obj = CfgGenshiGenerator - - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") - def setUp(self): - pass - - def test_removecomment(self): - data = [(None, "test", 1), - (None, "test2", 2)] - stream = [(genshi.core.COMMENT, "test", 0), - data[0], - (genshi.core.COMMENT, "test3", 0), - data[1]] - self.assertItemsEqual(list(removecomment(stream)), data) - - def test__init(self): - TestCfgGenerator.test__init(self) - cgg = self.get_obj() - self.assertIsInstance(cgg.loader, cgg.__loader_cls__) - - def test_get_data(self): - cgg = self.get_obj() - cgg._handle_genshi_exception = Mock() - cgg.setup = MagicMock() - cgg.template = Mock() - fltr = Mock() - cgg.template.generate.return_value = fltr - stream = Mock() - fltr.filter.return_value = stream - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() - - - def reset(): - cgg.template.reset_mock() - cgg._handle_genshi_exception.reset_mock() - cgg.setup.reset_mock() - - template_vars = dict( - name=entry.get("name"), - metadata=metadata, - path=cgg.name, - source_path=cgg.name, - repo=cgg.setup.__getitem__.return_value) - - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - def render(fmt, **kwargs): - stream.render.side_effect = None - raise TypeError - stream.render.side_effect = render - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - self.assertEqual(stream.render.call_args_list, - [call("text", encoding=cgg.encoding, - strip_whitespace=False), - call("text", encoding=cgg.encoding)]) - - reset() - stream.render.side_effect = UndefinedError("test") - self.assertRaises(UndefinedError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - stream.render.side_effect = ValueError - cgg._handle_genshi_exception.side_effect = ValueError - self.assertRaises(ValueError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - cgg.setup.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - self.assertTrue(cgg._handle_genshi_exception.called) - - def test_handle_event(self): - cgg = self.get_obj() - cgg.loader = Mock() - event = Mock() - cgg.handle_event(event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) - - cgg.loader.reset_mock() - cgg.loader.load.side_effect = OSError - self.assertRaises(PluginExecutionError, - cgg.handle_event, event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) +class TestCfgGenshiGenerator(TestCfgGenerator): + test_obj = CfgGenshiGenerator + + def test__init(self): + TestCfgGenerator.test__init(self) + cgg = self.get_obj() + self.assertIsInstance(cgg.loader, cgg.__loader_cls__) + + def test_get_data(self): + cgg = self.get_obj() + cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() + cgg.template = Mock() + fltr = Mock() + cgg.template.generate.return_value = fltr + stream = Mock() + fltr.filter.return_value = stream + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + + def reset(): + cgg.template.reset_mock() + cgg._handle_genshi_exception.reset_mock() + cgg.setup.reset_mock() + + template_vars = dict( + name=entry.get("name"), + metadata=metadata, + path=cgg.name, + source_path=cgg.name, + repo=cgg.setup.__getitem__.return_value) + + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + def render(fmt, **kwargs): + stream.render.side_effect = None + raise TypeError + stream.render.side_effect = render + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + self.assertEqual(stream.render.call_args_list, + [call("text", encoding=cgg.encoding, + strip_whitespace=False), + call("text", encoding=cgg.encoding)]) + + reset() + stream.render.side_effect = UndefinedError("test") + self.assertRaises(UndefinedError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + stream.render.side_effect = ValueError + cgg._handle_genshi_exception.side_effect = ValueError + self.assertRaises(ValueError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + self.assertTrue(cgg._handle_genshi_exception.called) + + def test_handle_event(self): + cgg = self.get_obj() + cgg.loader = Mock() + event = Mock() + cgg.handle_event(event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) + + cgg.loader.reset_mock() + cgg.loader.load.side_effect = OSError + self.assertRaises(PluginExecutionError, + cgg.handle_event, event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) -- cgit v1.2.3-1-g7c22 From cfa769abf8e464acee43e1cac359b0f7e5fc5e4b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 23 Jan 2013 14:13:51 -0500 Subject: added genshi support to StructFile --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 118 ++++++++++++++------- 1 file changed, 80 insertions(+), 38 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 0b6f3fd87..0d44e7284 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1,6 +1,7 @@ import os import sys import copy +import genshi import lxml.etree import Bcfg2.Server import genshi.core @@ -679,10 +680,47 @@ class TestStructFile(TestXMLFileBacked): children[4] = standalone return (xdata, groups, subgroups, children, subchildren, standalone) - def test_Index(self): + def _get_template_test_data(self): + (xdata, groups, subgroups, children, subchildren, standalone) = \ + self._get_test_data() + template_xdata = \ + lxml.etree.Element("Test", name="test", + nsmap=dict(py='http://genshi.edgewall.org/')) + template_xdata.extend(xdata.getchildren()) + return (template_xdata, groups, subgroups, children, subchildren, + standalone) + + @patch("genshi.template.TemplateLoader") + def test_Index(self, mock_TemplateLoader): has_crypto = Bcfg2.Server.Plugin.helpers.HAS_CRYPTO Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = False TestXMLFileBacked.test_Index(self) + + sf = self.get_obj() + sf.encoding = Mock() + (xdata, groups, subgroups, children, subchildren, standalone) = \ + self._get_test_data() + sf.data = lxml.etree.tostring(xdata) + + mock_TemplateLoader.reset_mock() + sf.Index() + self.assertFalse(mock_TemplateLoader.called) + + mock_TemplateLoader.reset_mock() + template_xdata = \ + lxml.etree.Element("Test", name="test", + nsmap=dict(py='http://genshi.edgewall.org/')) + template_xdata.extend(xdata.getchildren()) + sf.data = lxml.etree.tostring(template_xdata) + sf.Index() + mock_TemplateLoader.assert_called_with() + loader = mock_TemplateLoader.return_value + loader.load.assert_called_with(sf.name, + cls=genshi.template.MarkupTemplate, + encoding=sf.encoding) + self.assertEqual(sf.template, + loader.load.return_value) + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = has_crypto @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") @@ -813,16 +851,15 @@ class TestStructFile(TestXMLFileBacked): self.assertTrue(inc("Other")) - @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" % - test_obj.__name__) - def test__match(self, mock_include): + def test__match(self): sf = self.get_obj() + sf._include_element = Mock() metadata = Mock() (xdata, groups, subgroups, children, subchildren, standalone) = \ self._get_test_data() - mock_include.side_effect = \ + sf._include_element.side_effect = \ lambda x, _: (x.tag not in ['Client', 'Group'] or x.get("include") == "true") @@ -842,46 +879,51 @@ class TestStructFile(TestXMLFileBacked): for el in standalone: self.assertXMLEqual(el, sf._match(el, metadata)[0]) - @patch("Bcfg2.Server.Plugin.helpers.%s._match" % test_obj.__name__) - def test_Match(self, mock_match): + def test_Match(self): sf = self.get_obj() + sf._match = Mock() metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - sf.entries.extend(copy.deepcopy(xdata).getchildren()) - - def match_rv(el, _): - if el.tag not in ['Client', 'Group']: - return [el] - elif x.get("include") == "true": - return el.getchildren() - else: - return [] - mock_match.side_effect = match_rv - actual = sf.Match(metadata) - expected = reduce(lambda x, y: x + y, - list(children.values()) + list(subgroups.values())) - self.assertEqual(len(actual), len(expected)) - # easiest way to compare the values is actually to make - # them into an XML document and let assertXMLEqual compare - # them - xactual = lxml.etree.Element("Container") - xactual.extend(actual) - xexpected = lxml.etree.Element("Container") - xexpected.extend(expected) - self.assertXMLEqual(xactual, xexpected) + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + sf.data = lxml.etree.tostring(xdata) + sf.Index() + + def match_rv(el, _): + if el.tag not in ['Client', 'Group']: + return [el] + elif el.get("include") == "true": + return el.getchildren() + else: + return [] + sf._match.side_effect = match_rv + actual = sf.Match(metadata) + expected = reduce(lambda x, y: x + y, + list(children.values()) + list(subgroups.values())) + print "doc: %s" % lxml.etree.tostring(xdata, pretty_print=True) + print "actual: %s" % [lxml.etree.tostring(el) for el in actual] + print "expected: %s" % [lxml.etree.tostring(el) for el in expected] + self.assertEqual(len(actual), len(expected)) + # easiest way to compare the values is actually to make + # them into an XML document and let assertXMLEqual compare + # them + xactual = lxml.etree.Element("Container") + xactual.extend(actual) + xexpected = lxml.etree.Element("Container") + xexpected.extend(expected) + self.assertXMLEqual(xactual, xexpected) - @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" % - test_obj.__name__) - def test__xml_match(self, mock_include): + def test__xml_match(self): sf = self.get_obj() + sf._include_element = Mock() metadata = Mock() (xdata, groups, subgroups, children, subchildren, standalone) = \ self._get_test_data() - mock_include.side_effect = \ + sf._include_element.side_effect = \ lambda x, _: (x.tag not in ['Client', 'Group'] or x.get("include") == "true") @@ -895,9 +937,9 @@ class TestStructFile(TestXMLFileBacked): expected.extend(standalone) self.assertXMLEqual(actual, expected) - @patch("Bcfg2.Server.Plugin.helpers.%s._xml_match" % test_obj.__name__) - def test_Match(self, mock_xml_match): + def test_XMLMatch(self): sf = self.get_obj() + sf._xml_match = Mock() metadata = Mock() (sf.xdata, groups, subgroups, children, subchildren, standalone) = \ @@ -905,7 +947,7 @@ class TestStructFile(TestXMLFileBacked): sf.XMLMatch(metadata) actual = [] - for call in mock_xml_match.call_args_list: + for call in sf._xml_match.call_args_list: actual.append(call[0][0]) self.assertEqual(call[0][1], metadata) expected = list(groups.values()) + standalone -- cgit v1.2.3-1-g7c22 From c3c449f509e5f849212e853b410b7b74341d3e1e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 4 Feb 2013 16:21:03 -0500 Subject: testsuite: added unit tests for Bundler --- .../Testlib/TestServer/TestPlugins/TestBundler.py | 72 ++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py new file mode 100644 index 000000000..bc6a21b84 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -0,0 +1,72 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Bundler import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestStructure, \ + TestXMLDirectoryBacked + + +class TestBundleFile(TestStructFile): + test_obj = BundleFile + path = os.path.join(datastore, "test", "test1.xml") + + def test_bundle_name(self): + cases = [("foo.xml", "foo"), + ("foo.bar.xml", "foo.bar"), + ("foo-bar-baz.xml", "foo-bar-baz"), + ("foo....xml", "foo..."), + ("foo.genshi", "foo")] + bf = self.get_obj() + for fname, bname in cases: + bf.name = fname + self.assertEqual(bf.bundle_name, bname) + + +class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): + test_obj = Bundler + + def get_obj(self, core=None): + @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, + self.test_obj.__name__), + Mock()) + def inner(): + return TestPlugin.get_obj(self, core=core) + return inner() + + @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent") + def test_HandleEvent(self, mock_HandleEvent): + b = self.get_obj() + b.bundles = dict(foo=Mock(), bar=Mock()) + b.entries = {"foo.xml": BundleFile("foo.xml"), + "baz.xml": BundleFile("baz.xml")} + event = Mock() + b.HandleEvent(event) + mock_HandleEvent.assert_called_with(b, event) + self.assertItemsEqual(b.bundles, + dict(foo=b.entries['foo.xml'], + baz=b.entries['baz.xml'])) + + def test_BuildStructures(self): + b = self.get_obj() + b.bundles = dict(foo=Mock(), bar=Mock(), baz=Mock()) + metadata = Mock() + metadata.bundles = ["foo", "baz"] + + self.assertItemsEqual(b.BuildStructures(metadata), + [b.bundles[n].XMLMatch.return_value + for n in metadata.bundles]) + for bname in metadata.bundles: + b.bundles[bname].XMLMatch.assert_called_with(metadata) -- cgit v1.2.3-1-g7c22 From 5d237f71575a109c10d5aad8d70dc5dda00a2d96 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 09:00:04 -0500 Subject: testsuite: wrote unit tests for Decisions --- .../Testlib/TestServer/TestPlugins/TestBundler.py | 1 - .../TestServer/TestPlugins/TestDecisions.py | 60 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py index bc6a21b84..f64d66d11 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -3,7 +3,6 @@ import sys import lxml.etree from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugins.Bundler import * -from Bcfg2.Server.Plugin import PluginExecutionError # add all parent testsuite directories to sys.path to allow (most) # relative imports in python 2.4 diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py new file mode 100644 index 000000000..537ceb4ff --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py @@ -0,0 +1,60 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Decisions import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestDecision + + +class TestDecisionFile(TestStructFile): + test_obj = DecisionFile + + def test_get_decisions(self): + df = self.get_obj() + metadata = Mock() + + df.xdata = None + self.assertItemsEqual(df.get_decisions(metadata), []) + + df.xdata = lxml.etree.Element("Decisions") + df.XMLMatch = Mock() + df.XMLMatch.return_value = lxml.etree.Element("Decisions") + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Service", name='*') + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Path", + name='/etc/apt/apt.conf') + + self.assertItemsEqual(df.get_decisions(metadata), + [("Service", '*'), + ("Path", '/etc/apt/apt.conf')]) + df.XMLMatch.assert_called_with(metadata) + + +class TestDecisions(TestPlugin, TestDecision): + test_obj = Decisions + + def test_GetDecisions(self): + d = self.get_obj() + d.whitelist = Mock() + d.blacklist = Mock() + metadata = Mock() + + self.assertEqual(d.GetDecisions(metadata, "whitelist"), + d.whitelist.get_decision.return_value) + d.whitelist.get_decision.assert_called_with(metadata) + + self.assertEqual(d.GetDecisions(metadata, "blacklist"), + d.blacklist.get_decision.return_value) + d.blacklist.get_decision.assert_called_with(metadata) -- cgit v1.2.3-1-g7c22 From 25cb6db5ccb0c8e8302c220a90344a95baf3909b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 14:04:09 -0500 Subject: moved some libraries in Bcfg2/ into more specific (Server/ or Client/) places --- .../Testsrc/Testlib/TestServer/TestEncryption.py | 175 +++++++++++++++++++++ .../Testlib/TestServer/TestPlugin/Testhelpers.py | 8 +- .../TestCfg/TestCfgPrivateKeyCreator.py | 7 +- 3 files changed, 183 insertions(+), 7 deletions(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestEncryption.py (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py b/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py new file mode 100644 index 000000000..8f69f3bf0 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +import os +import sys +from Bcfg2.Compat import b64decode +from mock import Mock, MagicMock, patch + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * + +try: + from Bcfg2.Server.Encryption import * + HAS_CRYPTO = True +except ImportError: + HAS_CRYPTO = False + + +if can_skip or HAS_CRYPTO: + class TestEncryption(Bcfg2TestCase): + plaintext = """foo bar +baz +ö +\t\tquux +""" + "a" * 16384 # 16K is completely arbitrary + iv = "0123456789ABCDEF" + salt = "01234567" + algo = "des_cbc" + + @skipUnless(HAS_CRYPTO, "Encryption libraries not found") + def setUp(self): + pass + + def test_str_crypt(self): + """ test str_encrypt/str_decrypt """ + key = "a simple key" + + # simple symmetrical test with no options + crypted = str_encrypt(self.plaintext, key) + self.assertEqual(self.plaintext, str_decrypt(crypted, key)) + + # symmetrical test with lots of options + crypted = str_encrypt(self.plaintext, key, + iv=self.iv, salt=self.salt, + algorithm=self.algo) + self.assertEqual(self.plaintext, + str_decrypt(crypted, key, iv=self.iv, + algorithm=self.algo)) + + # test that different algorithms are actually used + self.assertNotEqual(str_encrypt(self.plaintext, key), + str_encrypt(self.plaintext, key, + algorithm=self.algo)) + + # test that different keys are actually used + self.assertNotEqual(str_encrypt(self.plaintext, key), + str_encrypt(self.plaintext, "different key")) + + # test that different IVs are actually used + self.assertNotEqual(str_encrypt(self.plaintext, key, iv=self.iv), + str_encrypt(self.plaintext, key)) + + # test that errors are raised on bad decrypts + crypted = str_encrypt(self.plaintext, key, algorithm=self.algo) + self.assertRaises(EVPError, str_decrypt, + crypted, "bogus key", algorithm=self.algo) + self.assertRaises(EVPError, str_decrypt, + crypted, key) # bogus algorithm + + def test_ssl_crypt(self): + """ test ssl_encrypt/ssl_decrypt """ + passwd = "a simple passphrase" + + # simple symmetrical test + crypted = ssl_encrypt(self.plaintext, passwd) + self.assertEqual(self.plaintext, ssl_decrypt(crypted, passwd)) + + # more complex symmetrical test + crypted = ssl_encrypt(self.plaintext, passwd, algorithm=self.algo, + salt=self.salt) + self.assertEqual(self.plaintext, + ssl_decrypt(crypted, passwd, algorithm=self.algo)) + + # test that different algorithms are actually used + self.assertNotEqual(ssl_encrypt(self.plaintext, passwd), + ssl_encrypt(self.plaintext, passwd, + algorithm=self.algo)) + + # test that different passwords are actually used + self.assertNotEqual(ssl_encrypt(self.plaintext, passwd), + ssl_encrypt(self.plaintext, "different pass")) + + # there's no reasonable test we can do to see if the + # output is base64-encoded, unfortunately, but if it's + # obviously not we fail + crypted = ssl_encrypt(self.plaintext, passwd) + self.assertRegexpMatches(crypted, r'^[A-Za-z0-9+/]+[=]{0,2}$') + + # test that errors are raised on bad decrypts + crypted = ssl_encrypt(self.plaintext, passwd, + algorithm=self.algo) + self.assertRaises(EVPError, ssl_decrypt, + crypted, "bogus passwd", algorithm=self.algo) + self.assertRaises(EVPError, ssl_decrypt, + crypted, passwd) # bogus algorithm + + @patch("Bcfg2.Options.get_option_parser") + def test_get_passphrases(self, mock_get_option_parser): + setup = Mock() + setup.cfp.has_section.return_value = False + mock_get_option_parser.return_value = setup + self.assertEqual(get_passphrases(), dict()) + + setup.cfp.has_section.return_value = True + setup.cfp.options.return_value = ["foo", "bar", CFG_ALGORITHM] + setup.cfp.get.return_value = "passphrase" + self.assertItemsEqual(get_passphrases(), + dict(foo="passphrase", + bar="passphrase")) + + @patch("Bcfg2.Server.Encryption.get_passphrases") + def test_bruteforce_decrypt(self, mock_passphrases): + passwd = "a simple passphrase" + crypted = ssl_encrypt(self.plaintext, passwd) + + # test with no passphrases given nor in config + mock_passphrases.return_value = dict() + self.assertRaises(EVPError, + bruteforce_decrypt, crypted) + mock_passphrases.assert_called_with() + + # test with good passphrase given in function call + mock_passphrases.reset_mock() + self.assertEqual(self.plaintext, + bruteforce_decrypt(crypted, + passphrases=["bogus pass", + passwd, + "also bogus"])) + self.assertFalse(mock_passphrases.called) + + # test with no good passphrase given nor in config + mock_passphrases.reset_mock() + self.assertRaises(EVPError, + bruteforce_decrypt, + crypted, passphrases=["bogus", "also bogus"]) + self.assertFalse(mock_passphrases.called) + + # test with good passphrase in config file + mock_passphrases.reset_mock() + mock_passphrases.return_value = dict(bogus="bogus", + real=passwd, + bogus2="also bogus") + self.assertEqual(self.plaintext, + bruteforce_decrypt(crypted)) + mock_passphrases.assert_called_with() + + # test that passphrases given in function call take + # precedence over config + mock_passphrases.reset_mock() + self.assertRaises(EVPError, + bruteforce_decrypt, crypted, + passphrases=["bogus", "also bogus"]) + self.assertFalse(mock_passphrases.called) + + # test that different algorithms are used + mock_passphrases.reset_mock() + crypted = ssl_encrypt(self.plaintext, passwd, algorithm=self.algo) + self.assertEqual(self.plaintext, + bruteforce_decrypt(crypted, algorithm=self.algo)) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 0d44e7284..9eb584e90 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -23,7 +23,7 @@ from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable from TestServer.TestPlugin.Testinterfaces import TestGenerator try: - from Bcfg2.Encryption import EVPError + from Bcfg2.Server.Encryption import EVPError HAS_CRYPTO = True except: HAS_CRYPTO = False @@ -766,9 +766,9 @@ class TestStructFile(TestXMLFileBacked): [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.ssl_decrypt") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.bruteforce_decrypt") def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): sf = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index bf34d4c3c..2982825a9 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import * from Bcfg2.Server.Plugin import PluginExecutionError import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator try: - from Bcfg2.Encryption import EVPError + from Bcfg2.Server.Encryption import EVPError HAS_CRYPTO = True except: HAS_CRYPTO = False @@ -66,7 +66,8 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, + "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() pkc.setup = Mock() @@ -281,7 +282,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") - @patch("Bcfg2.Encryption.ssl_encrypt") + @patch("Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.ssl_encrypt") def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" -- cgit v1.2.3-1-g7c22 From 2169edc1bba82076db776b75db89b79d6f2f4786 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 6 Feb 2013 15:45:20 -0500 Subject: converted InfoXML objects from XMLSrc to StructFile --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 312 ++++++++++----------- .../TestPlugins/TestCfg/TestCfgInfoXML.py | 37 +-- .../TestServer/TestPlugins/TestCfg/Test_init.py | 58 ++-- .../Testlib/TestServer/TestPlugins/TestRules.py | 82 ++---- 4 files changed, 183 insertions(+), 306 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 9eb584e90..93bf69d04 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -47,47 +47,6 @@ class TestFunctions(Bcfg2TestCase): data[1]] self.assertItemsEqual(list(removecomment(stream)), data) - def test_bind_info(self): - entry = lxml.etree.Element("Path", name="/test") - metadata = Mock() - default = dict(test1="test1", test2="test2") - # test without infoxml - bind_info(entry, metadata, default=default) - self.assertItemsEqual(entry.attrib, - dict(test1="test1", - test2="test2", - name="/test")) - - # test with bogus infoxml - entry = lxml.etree.Element("Path", name="/test") - infoxml = Mock() - self.assertRaises(PluginExecutionError, - bind_info, - entry, metadata, infoxml=infoxml) - infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry) - - # test with valid infoxml - entry = lxml.etree.Element("Path", name="/test") - infoxml.reset_mock() - infodata = {None: {"test3": "test3", "test4": "test4"}} - def infoxml_rv(metadata, rv, entry=None): - rv['Info'] = infodata - infoxml.pnode.Match.side_effect = infoxml_rv - bind_info(entry, metadata, infoxml=infoxml, default=default) - # mock objects don't properly track the called-with value of - # arguments whose value is changed by the function, so it - # thinks Match() was called with the final value of the mdata - # arg, not the initial value. makes this test a little less - # worthwhile, TBH. - infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata), - entry=entry) - self.assertItemsEqual(entry.attrib, - dict(test1="test1", - test2="test2", - test3="test3", - test4="test4", - name="/test")) - class TestDatabaseBacked(TestPlugin): test_obj = DatabaseBacked @@ -652,7 +611,8 @@ class TestStructFile(TestXMLFileBacked): lxml.etree.SubElement(groups[1], "Child", name="c3") lxml.etree.SubElement(groups[1], "Child", name="c4") - standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s1")) + standalone.append(lxml.etree.SubElement(xdata, + "Standalone", name="s1")) groups[2] = lxml.etree.SubElement(xdata, "Client", name="client2", include="false") @@ -674,10 +634,10 @@ class TestStructFile(TestXMLFileBacked): subchildren[3] = [] lxml.etree.SubElement(children[3][-1], "SubChild", name="subchild") - standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s3")) + standalone.append(lxml.etree.SubElement(xdata, + "Standalone", name="s3")) lxml.etree.SubElement(standalone[-1], "SubStandalone", name="sub1") - children[4] = standalone return (xdata, groups, subgroups, children, subchildren, standalone) def _get_template_test_data(self): @@ -860,7 +820,7 @@ class TestStructFile(TestXMLFileBacked): self._get_test_data() sf._include_element.side_effect = \ - lambda x, _: (x.tag not in ['Client', 'Group'] or + lambda x, _: (x.tag not in sf._include_tests.keys() or x.get("include") == "true") for i, group in groups.items(): @@ -879,7 +839,7 @@ class TestStructFile(TestXMLFileBacked): for el in standalone: self.assertXMLEqual(el, sf._match(el, metadata)[0]) - def test_Match(self): + def test_do_match(self): sf = self.get_obj() sf._match = Mock() metadata = Mock() @@ -892,19 +852,17 @@ class TestStructFile(TestXMLFileBacked): sf.Index() def match_rv(el, _): - if el.tag not in ['Client', 'Group']: + if el.tag not in sf._include_tests.keys(): return [el] elif el.get("include") == "true": return el.getchildren() else: return [] sf._match.side_effect = match_rv - actual = sf.Match(metadata) + actual = sf._do_match(metadata) expected = reduce(lambda x, y: x + y, - list(children.values()) + list(subgroups.values())) - print "doc: %s" % lxml.etree.tostring(xdata, pretty_print=True) - print "actual: %s" % [lxml.etree.tostring(el) for el in actual] - print "expected: %s" % [lxml.etree.tostring(el) for el in expected] + list(children.values()) + \ + list(subgroups.values())) + standalone self.assertEqual(len(actual), len(expected)) # easiest way to compare the values is actually to make # them into an XML document and let assertXMLEqual compare @@ -924,7 +882,7 @@ class TestStructFile(TestXMLFileBacked): self._get_test_data() sf._include_element.side_effect = \ - lambda x, _: (x.tag not in ['Client', 'Group'] or + lambda x, _: (x.tag not in sf._include_tests.keys() or x.get("include") == "true") actual = copy.deepcopy(xdata) @@ -937,7 +895,7 @@ class TestStructFile(TestXMLFileBacked): expected.extend(standalone) self.assertXMLEqual(actual, expected) - def test_XMLMatch(self): + def test_do_xmlmatch(self): sf = self.get_obj() sf._xml_match = Mock() metadata = Mock() @@ -945,7 +903,7 @@ class TestStructFile(TestXMLFileBacked): (sf.xdata, groups, subgroups, children, subchildren, standalone) = \ self._get_test_data() - sf.XMLMatch(metadata) + sf._do_xmlmatch(metadata) actual = [] for call in sf._xml_match.call_args_list: actual.append(call[0][0]) @@ -1193,49 +1151,6 @@ class TestINode(Bcfg2TestCase): inode.predicate.assert_called_with(metadata, child) -class TestInfoNode(TestINode): - __test__ = True - test_obj = InfoNode - - def test_raw_predicates(self): - TestINode.test_raw_predicates(self) - metadata = Mock() - entry = lxml.etree.Element("Path", name="/tmp/foo", - realname="/tmp/bar") - - parent_predicate = lambda m, d: True - pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - - pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - - parent_predicate = lambda m, d: False - pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - - class TestXMLSrc(TestXMLFileBacked): test_obj = XMLSrc @@ -1301,9 +1216,78 @@ class TestXMLSrc(TestXMLFileBacked): self.assertEqual(xsrc.cache[0], metadata) -class TestInfoXML(TestXMLSrc): +class TestInfoXML(TestStructFile): test_obj = InfoXML + def _get_test_data(self): + (xdata, groups, subgroups, children, subchildren, standalone) = \ + TestStructFile._get_test_data(self) + idx = max(groups.keys()) + 1 + groups[idx] = lxml.etree.SubElement( + xdata, "Path", name="path1", include="true") + children[idx] = [lxml.etree.SubElement(groups[idx], "Child", + name="pc1")] + subgroups[idx] = [lxml.etree.SubElement(groups[idx], "Group", + name="pg1", include="true"), + lxml.etree.SubElement(groups[idx], "Client", + name="pc1", include="false")] + subchildren[idx] = [lxml.etree.SubElement(subgroups[idx][0], + "SubChild", name="sc1")] + + idx += 1 + groups[idx] = lxml.etree.SubElement( + xdata, "Path", name="path2", include="false") + children[idx] = [] + subgroups[idx] = [] + subchildren[idx] = [] + + path2 = lxml.etree.SubElement(groups[0], "Path", name="path2", + include="true") + subgroups[0].append(path2) + subchildren[0].append(lxml.etree.SubElement(path2, "SubChild", + name="sc2")) + return xdata, groups, subgroups, children, subchildren, standalone + + def test_include_element(self): + TestStructFile.test_include_element(self) + + ix = self.get_obj() + metadata = Mock() + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + inc = lambda tag, **attrs: \ + ix._include_element(lxml.etree.Element(tag, **attrs), + metadata, entry) + + self.assertFalse(inc("Path", name="/etc/bar.conf")) + self.assertFalse(inc("Path", name="/etc/foo.conf", negate="true")) + self.assertFalse(inc("Path", name="/etc/foo.conf", negate="tRuE")) + self.assertTrue(inc("Path", name="/etc/foo.conf")) + self.assertTrue(inc("Path", name="/etc/foo.conf", negate="false")) + self.assertTrue(inc("Path", name="/etc/foo.conf", negate="faLSe")) + self.assertTrue(inc("Path", name="/etc/bar.conf", negate="true")) + self.assertTrue(inc("Path", name="/etc/bar.conf", negate="tRUe")) + + def test_BindEntry(self): + ix = self.get_obj() + entry = lxml.etree.Element("Path", name=self.path) + metadata = Mock() + + # test with bogus infoxml + ix.Match = Mock() + ix.Match.return_value = [] + self.assertRaises(PluginExecutionError, + ix.BindEntry, entry, metadata) + ix.Match.assert_called_with(metadata, entry) + + # test with valid infoxml + ix.Match.reset_mock() + ix.Match.return_value = [lxml.etree.Element("Info", + mode="0600", owner="root")] + ix.BindEntry(entry, metadata) + ix.Match.assert_called_with(metadata, entry) + self.assertItemsEqual(entry.attrib, + dict(name=self.path, mode="0600", owner="root")) + class TestXMLDirectoryBacked(TestDirectoryBacked): test_obj = XMLDirectoryBacked @@ -1348,32 +1332,17 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): def test__matches(self): pd = self.get_obj() - self.assertTrue(pd._matches(lxml.etree.Element("Test", - name="/etc/foo.conf"), - Mock(), - {"/etc/foo.conf": pd.BindEntry, - "/etc/bar.conf": pd.BindEntry})) - self.assertFalse(pd._matches(lxml.etree.Element("Test", - name="/etc/baz.conf"), - Mock(), - {"/etc/foo.conf": pd.BindEntry, - "/etc/bar.conf": pd.BindEntry})) + entry = lxml.etree.Element("Test", name="/etc/foo.conf") + self.assertTrue(pd._matches(entry, Mock(), + lxml.etree.Element("Test", + name="/etc/foo.conf"))) + self.assertFalse(pd._matches(entry, Mock(), + lxml.etree.Element("Test", + name="/etc/baz.conf"))) def test_BindEntry(self): pd = self.get_obj() - pd.get_attrs = Mock(return_value=dict(test1="test1", test2="test2")) - entry = lxml.etree.Element("Path", name="/etc/foo.conf", test1="bogus") - metadata = Mock() - pd.BindEntry(entry, metadata) - pd.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(name="/etc/foo.conf", - test1="test1", test2="test2")) - - def test_get_attrs(self): - pd = self.get_obj() - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - children = [lxml.etree.Element("Child")] + children = [lxml.etree.Element("Child", name="child")] metadata = Mock() pd.entries = dict() @@ -1381,58 +1350,59 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): metadata.reset_mock() for src in pd.entries.values(): src.reset_mock() - src.cache = None # test with no matches - self.assertRaises(PluginExecutionError, - pd.get_attrs, entry, metadata) + self.assertRaises(PluginExecutionError, pd.BindEntry, Mock(), metadata) - def add_entry(name, data, prio=10): + def add_entry(name, data): path = os.path.join(pd.data, name) pd.entries[path] = Mock() - pd.entries[path].priority = prio - def do_Cache(metadata): - pd.entries[path].cache = (metadata, data) - pd.entries[path].Cache.side_effect = do_Cache - - add_entry('test1.xml', - dict(Path={'/etc/foo.conf': dict(attr="attr1", - __children__=children), - '/etc/bar.conf': dict()})) - add_entry('test2.xml', - dict(Path={'/etc/bar.conf': dict(__text__="text", - attr="attr1")}, - Package={'quux': dict(), - 'xyzzy': dict()}), - prio=20) - add_entry('test3.xml', - dict(Path={'/etc/baz.conf': dict()}, - Package={'xyzzy': dict()}), - prio=20) - - # test with exactly one match, __children__ + pd.entries[path].priority = data.get("priority") + pd.entries[path].XMLMatch.return_value = data + + test1 = lxml.etree.Element("Rules", priority="10") + path1 = lxml.etree.SubElement(test1, "Path", name="/etc/foo.conf", + attr="attr1") + path1.extend(children) + lxml.etree.SubElement(test1, "Path", name="/etc/bar.conf") + add_entry('test1.xml', test1) + + test2 = lxml.etree.Element("Rules", priority="20") + path2 = lxml.etree.SubElement(test2, "Path", name="/etc/bar.conf", + attr="attr1") + path2.text = "text" + lxml.etree.SubElement(test2, "Package", name="quux") + lxml.etree.SubElement(test2, "Package", name="xyzzy") + add_entry('test2.xml', test2) + + test3 = lxml.etree.Element("Rules", priority="20") + lxml.etree.SubElement(test3, "Path", name="/etc/baz.conf") + lxml.etree.SubElement(test3, "Package", name="xyzzy") + add_entry('test3.xml', test3) + + # test with exactly one match, children reset() - self.assertItemsEqual(pd.get_attrs(entry, metadata), - dict(attr="attr1")) + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + pd.BindEntry(entry, metadata) + self.assertXMLEqual(entry, path1) + self.assertIsNot(entry, path1) for src in pd.entries.values(): - src.Cache.assert_called_with(metadata) - self.assertEqual(len(entry.getchildren()), 1) - self.assertXMLEqual(entry.getchildren()[0], children[0]) + src.XMLMatch.assert_called_with(metadata) - # test with multiple matches with different priorities, __text__ + # test with multiple matches with different priorities, text reset() entry = lxml.etree.Element("Path", name="/etc/bar.conf") - self.assertItemsEqual(pd.get_attrs(entry, metadata), - dict(attr="attr1")) + pd.BindEntry(entry, metadata) + self.assertXMLEqual(entry, path2) + self.assertIsNot(entry, path2) for src in pd.entries.values(): - src.Cache.assert_called_with(metadata) - self.assertEqual(entry.text, "text") + src.XMLMatch.assert_called_with(metadata) # test with multiple matches with identical priorities reset() entry = lxml.etree.Element("Package", name="xyzzy") self.assertRaises(PluginExecutionError, - pd.get_attrs, entry, metadata) + pd.BindEntry, entry, metadata) class TestSpecificity(Bcfg2TestCase): @@ -1866,22 +1836,22 @@ class TestEntrySet(TestDebuggable): eset.reset_metadata(event) self.assertIsNone(eset.infoxml) - @patch("Bcfg2.Server.Plugin.helpers.bind_info") - def test_bind_info_to_entry(self, mock_bind_info): - # There's a strange scoping issue in py3k that prevents this - # test from working as expected on sub-classes of EntrySet. - # No idea what's going on, but until I can figure it out we - # skip this test on subclasses - if inPy3k and self.test_obj != EntrySet: - return skip("Skipping this test for py3k scoping issues") - + def test_bind_info_to_entry(self): eset = self.get_obj() - entry = Mock() + eset.metadata = dict(owner="root", group="root") + entry = lxml.etree.Element("Path", name="/test") metadata = Mock() + eset.infoxml = None + eset.bind_info_to_entry(entry, metadata) + self.assertItemsEqual(entry.attrib, + dict(name="/test", owner="root", group="root")) + + entry = lxml.etree.Element("Path", name="/test") + eset.infoxml = Mock() eset.bind_info_to_entry(entry, metadata) - mock_bind_info.assert_called_with(entry, metadata, - infoxml=eset.infoxml, - default=eset.metadata) + self.assertItemsEqual(entry.attrib, + dict(name="/test", owner="root", group="root")) + eset.infoxml.BindEntry.assert_called_with(entry, metadata) def test_bind_entry(self): eset = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py index 839e9c3b8..7e7cb5e3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py @@ -27,47 +27,16 @@ class TestCfgInfoXML(TestCfgInfo): self.assertIsInstance(ci.infoxml, InfoXML) def test_bind_info_to_entry(self): - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() ci = self.get_obj() ci.infoxml = Mock() - ci._set_info = Mock() - - self.assertRaises(PluginExecutionError, - ci.bind_info_to_entry, entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, dict(), - entry=entry) - self.assertFalse(ci._set_info.called) - - ci.infoxml.reset_mock() - ci._set_info.reset_mock() - mdata_value = Mock() - def set_mdata(metadata, mdata, entry=None): - mdata['Info'] = {None: mdata_value} + entry = Mock() + metadata = Mock() - ci.infoxml.pnode.Match.side_effect = set_mdata ci.bind_info_to_entry(entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, - dict(Info={None: mdata_value}), - entry=entry) - ci._set_info.assert_called_with(entry, mdata_value) + ci.infoxml.BindEntry.assert_called_with(entry, metadata) def test_handle_event(self): ci = self.get_obj() ci.infoxml = Mock() ci.handle_event(Mock) ci.infoxml.HandleEvent.assert_called_with() - - def test__set_info(self): - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info") - def inner(mock_set_info): - ci = self.get_obj() - entry = Mock() - info = {"foo": "foo", - "__children__": ["one", "two"]} - ci._set_info(entry, info) - self.assertItemsEqual(entry.append.call_args_list, - [call(c) for c in info['__children__']]) - - inner() - TestCfgInfo.test__set_info(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index ee0b0be9d..ab383e4f3 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -159,21 +159,6 @@ class TestCfgInfo(TestCfgBaseFileMatcher): self.assertRaises(NotImplementedError, ci.bind_info_to_entry, Mock(), Mock()) - def test__set_info(self): - ci = self.get_obj() - entry = Mock() - entry.attrib = dict() - - info = {"foo": "foo", - "_bar": "bar", - "bar:baz=quux": "quux", - "baz__": "baz", - "__quux": "quux"} - ci._set_info(entry, info) - self.assertItemsEqual(entry.attrib, - dict([(k, v) for k, v in info.items() - if not k.startswith("__")])) - class TestCfgVerifier(TestCfgBaseFileMatcher): test_obj = CfgVerifier @@ -259,29 +244,26 @@ class TestCfgCreator(TestCfgBaseFileMatcher): class TestCfgDefaultInfo(TestCfgInfo): test_obj = CfgDefaultInfo - def get_obj(self, defaults=None): - if defaults is None: - defaults = dict() - return self.test_obj(defaults) + def get_obj(self, *_): + return self.test_obj() - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__") - def test__init(self, mock__init): - defaults = Mock() - cdi = self.get_obj(defaults=defaults) - mock__init.assert_called_with(cdi, '') - self.assertEqual(defaults, cdi.defaults) + def test__init(self): + pass def test_handle_event(self): # this CfgInfo handler doesn't handle any events -- it's not # file-driven, but based on the built-in defaults pass - def test_bind_info_to_entry(self): + @patch("Bcfg2.Server.Plugin.default_path_metadata") + def test_bind_info_to_entry(self, mock_default_path_metadata): cdi = self.get_obj() - cdi._set_info = Mock() - entry = Mock() + entry = lxml.etree.Element("Test", name="test") + mock_default_path_metadata.return_value = \ + dict(owner="root", mode="0600") cdi.bind_info_to_entry(entry, Mock()) - cdi._set_info.assert_called_with(entry, cdi.defaults) + self.assertItemsEqual(entry.attrib, + dict(owner="root", mode="0600", name="test")) class TestCfgEntrySet(TestEntrySet): @@ -599,24 +581,24 @@ class TestCfgEntrySet(TestEntrySet): if entry.specific is not None: self.assertFalse(entry.specific.matches.called) - def test_bind_info_to_entry(self): - default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO + @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo") + def test_bind_info_to_entry(self, mock_DefaultInfo): eset = self.get_obj() eset.get_handlers = Mock() eset.get_handlers.return_value = [] - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock() metadata = Mock() def reset(): eset.get_handlers.reset_mock() - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock() + mock_DefaultInfo.reset_mock() return lxml.etree.Element("Path", name="/test.txt") # test with no info handlers entry = reset() eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) self.assertEqual(entry.get("type"), "file") # test with one info handler @@ -625,7 +607,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = [handler] eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) handler.bind_info_to_entry.assert_called_with(entry, metadata) self.assertEqual(entry.get("type"), "file") @@ -635,7 +618,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = handlers eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) # we don't care which handler gets called as long as exactly # one of them does called = 0 @@ -646,8 +630,6 @@ class TestCfgEntrySet(TestEntrySet): self.assertEqual(called, 1) self.assertEqual(entry.get("type"), "file") - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info - def test_create_data(self): eset = self.get_obj() eset.best_matching = Mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py index f018b45dc..7083fff06 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py @@ -45,81 +45,37 @@ class TestRules(TestPrioDir): entry = lxml.etree.Element("Package", name="foo") self.assertFalse(r.HandlesEntry(entry, metadata)) - def test_BindEntry(self, method="BindEntry"): - r = self.get_obj() - r.get_attrs = Mock() - r.get_attrs.return_value = dict(overwrite="new", add="add", - text="text") - entry = lxml.etree.Element("Test", overwrite="old", keep="keep") - metadata = Mock() - - getattr(r, method)(entry, metadata) - r.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(overwrite="old", add="add", keep="keep", - text="text")) - - def test_HandleEntry(self): - self.test_BindEntry(method="HandleEntry") - @patch("Bcfg2.Server.Plugin.PrioDir._matches") def test__matches(self, mock_matches): - """ test _matches() behavior regardless of state of _regex_enabled """ r = self.get_obj() metadata = Mock() + # test parent _matches() returning True entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] + candidate = lxml.etree.Element("Path", name="/etc/bar.conf") mock_matches.return_value = True - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) - # test special Path cases -- adding and removing trailing slash + # test all conditions returning False mock_matches.reset_mock() mock_matches.return_value = False - rules = ["/etc/foo/", "/etc/bar"] - entry = lxml.etree.Element("Path", name="/etc/foo") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertFalse(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + # test special Path cases -- adding and removing trailing slash mock_matches.reset_mock() - entry = lxml.etree.Element("Path", name="/etc/bar/") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_disabled(self, mock_matches): - """ test failure to match with regex disabled """ - r = self.get_obj() - self.set_regex_enabled(r, False) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] - self.assertFalse(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_enabled(self, mock_matches): - """ test match with regex enabled """ - r = self.get_obj() - self.set_regex_enabled(r, True) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = ["/etc/.*\.conf", "/etc/bar"] - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) - - def set_regex_enabled(self, rules_obj, state): - """ set the state of regex_enabled for this implementation of - Rules """ - if not isinstance(rules_obj.core.setup, MagicMock): - rules_obj.core.setup = MagicMock() - rules_obj.core.setup.cfp.getboolean.return_value = state + withslash = lxml.etree.Element("Path", name="/etc/foo") + withoutslash = lxml.etree.Element("Path", name="/etc/foo/") + self.assertTrue(r._matches(withslash, metadata, withoutslash)) + self.assertTrue(r._matches(withoutslash, metadata, withslash)) + + if r._regex_enabled: + mock_matches.reset_mock() + candidate = lxml.etree.Element("Path", name="/etc/.*\.conf") + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) def test__regex_enabled(self): r = self.get_obj() -- cgit v1.2.3-1-g7c22 From 7db65d41386768a5081c34c16db17e82b96a5b7a Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 7 Feb 2013 08:26:39 -0500 Subject: made XInlcude and Encryption support more consistent --- testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 93bf69d04..ba837f0c9 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -685,10 +685,6 @@ class TestStructFile(TestXMLFileBacked): @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") def test_Index_crypto(self): - if not self.test_obj.encryption: - return skip("Encryption disabled on %s objects, skipping" % - self.test_obj.__name__) - sf = self.get_obj() sf.setup = Mock() sf.setup.cfp.get.return_value = "strict" -- cgit v1.2.3-1-g7c22 From 0dab7e284017e4559019ac1e7b861ab7ccdadf5c Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Fri, 8 Feb 2013 10:29:03 -0500 Subject: Bundler: improved XInclude support, added inter-bundle dependencies --- .../Testlib/TestServer/TestPlugins/TestBundler.py | 48 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 7 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py index f64d66d11..f5250ed85 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -60,12 +60,46 @@ class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): def test_BuildStructures(self): b = self.get_obj() - b.bundles = dict(foo=Mock(), bar=Mock(), baz=Mock()) + b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(), + has_dep=Mock(), is_dep=Mock()) + expected = dict() + + b.bundles['error'].XMLMatch.side_effect = TemplateError(None) + + xinclude = lxml.etree.Element("Bundle") + lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"), + "Path", name="/test") + b.bundles['xinclude'].XMLMatch.return_value = xinclude + expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude") + lxml.etree.SubElement(expected['xinclude'], "Path", name="/test") + + has_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(has_dep, "Bundle", name="is_dep") + lxml.etree.SubElement(has_dep, "Package", name="foo") + b.bundles['has_dep'].XMLMatch.return_value = has_dep + expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep") + lxml.etree.SubElement(expected['has_dep'], "Package", name="foo") + + is_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(is_dep, "Package", name="bar") + b.bundles['is_dep'].XMLMatch.return_value = is_dep + expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep") + lxml.etree.SubElement(expected['is_dep'], "Package", name="bar") + metadata = Mock() - metadata.bundles = ["foo", "baz"] + metadata.bundles = ["error", "xinclude", "has_dep"] + + rv = b.BuildStructures(metadata) + self.assertEqual(len(rv), 3) + for bundle in rv: + name = bundle.get("name") + self.assertIsNotNone(name, + "Bundle %s was not built" % name) + self.assertIn(name, expected, + "Unexpected bundle %s was built" % name) + self.assertXMLEqual(bundle, expected[name], + "Bundle %s was not built correctly" % name) + b.bundles[name].XMLMatch.assert_called_with(metadata) - self.assertItemsEqual(b.BuildStructures(metadata), - [b.bundles[n].XMLMatch.return_value - for n in metadata.bundles]) - for bname in metadata.bundles: - b.bundles[bname].XMLMatch.assert_called_with(metadata) + b.bundles['error'].XMLMatch.assert_called_with(metadata) + self.assertFalse(b.bundles['skip'].XMLMatch.called) -- cgit v1.2.3-1-g7c22 From 398be2c5cb613d9506e0c115510c1b55881ca64e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Fri, 8 Feb 2013 13:43:40 -0500 Subject: Bundler: added support for independent bundles --- .../Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py index f5250ed85..cfb379c40 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -61,7 +61,7 @@ class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): def test_BuildStructures(self): b = self.get_obj() b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(), - has_dep=Mock(), is_dep=Mock()) + has_dep=Mock(), is_dep=Mock(), indep=Mock()) expected = dict() b.bundles['error'].XMLMatch.side_effect = TemplateError(None) @@ -86,11 +86,17 @@ class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep") lxml.etree.SubElement(expected['is_dep'], "Package", name="bar") + indep = lxml.etree.Element("Bundle", independent="true") + lxml.etree.SubElement(indep, "Service", name="baz") + b.bundles['indep'].XMLMatch.return_value = indep + expected['indep'] = lxml.etree.Element("Independent", name="indep") + lxml.etree.SubElement(expected['indep'], "Service", name="baz") + metadata = Mock() - metadata.bundles = ["error", "xinclude", "has_dep"] + metadata.bundles = ["error", "xinclude", "has_dep", "indep"] rv = b.BuildStructures(metadata) - self.assertEqual(len(rv), 3) + self.assertEqual(len(rv), 4) for bundle in rv: name = bundle.get("name") self.assertIsNotNone(name, -- cgit v1.2.3-1-g7c22 From 5363e6d9a53146333da0d109aae170befc1b9481 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 12 Feb 2013 07:48:33 -0500 Subject: Added client ACLs: * IP and CIDR-based ACLs * Metadata (group/hostname)-based ACLs * Documentation * Unit tests --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 161 ++++++++++----- .../TestServer/TestPlugin/Testinterfaces.py | 18 ++ .../Testlib/TestServer/TestPlugins/TestACL.py | 222 +++++++++++++++++++++ 3 files changed, 348 insertions(+), 53 deletions(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index ba837f0c9..4cb148bb0 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -812,32 +812,44 @@ class TestStructFile(TestXMLFileBacked): sf._include_element = Mock() metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - sf._include_element.side_effect = \ lambda x, _: (x.tag not in sf._include_tests.keys() or x.get("include") == "true") - for i, group in groups.items(): - actual = sf._match(group, metadata) - expected = children[i] + subchildren[i] - self.assertEqual(len(actual), len(expected)) - # easiest way to compare the values is actually to make - # them into an XML document and let assertXMLEqual compare - # them - xactual = lxml.etree.Element("Container") - xactual.extend(actual) - xexpected = lxml.etree.Element("Container") - xexpected.extend(expected) - self.assertXMLEqual(xactual, xexpected) + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data - for el in standalone: - self.assertXMLEqual(el, sf._match(el, metadata)[0]) + for i, group in groups.items(): + actual = sf._match(group, metadata) + expected = children[i] + subchildren[i] + self.assertEqual(len(actual), len(expected)) + # easiest way to compare the values is actually to make + # them into an XML document and let assertXMLEqual compare + # them + xactual = lxml.etree.Element("Container") + xactual.extend(actual) + xexpected = lxml.etree.Element("Container") + xexpected.extend(expected) + self.assertXMLEqual(xactual, xexpected) + + for el in standalone: + self.assertXMLEqual(el, sf._match(el, metadata)[0]) def test_do_match(self): sf = self.get_obj() sf._match = Mock() + + def match_rv(el, _): + if el.tag not in sf._include_tests.keys(): + return [el] + elif el.get("include") == "true": + return el.getchildren() + else: + return [] + sf._match.side_effect = match_rv + metadata = Mock() for test_data in [self._get_test_data(), @@ -847,14 +859,6 @@ class TestStructFile(TestXMLFileBacked): sf.data = lxml.etree.tostring(xdata) sf.Index() - def match_rv(el, _): - if el.tag not in sf._include_tests.keys(): - return [el] - elif el.get("include") == "true": - return el.getchildren() - else: - return [] - sf._match.side_effect = match_rv actual = sf._do_match(metadata) expected = reduce(lambda x, y: x + y, list(children.values()) + \ @@ -874,45 +878,96 @@ class TestStructFile(TestXMLFileBacked): sf._include_element = Mock() metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - sf._include_element.side_effect = \ lambda x, _: (x.tag not in sf._include_tests.keys() or x.get("include") == "true") - actual = copy.deepcopy(xdata) - for el in actual.getchildren(): - sf._xml_match(el, metadata) - expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib)) - expected.text = xdata.text - expected.extend(reduce(lambda x, y: x + y, - list(children.values()) + list(subchildren.values()))) - expected.extend(standalone) - self.assertXMLEqual(actual, expected) + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + + actual = copy.deepcopy(xdata) + for el in actual.getchildren(): + sf._xml_match(el, metadata) + expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib)) + expected.text = xdata.text + expected.extend(reduce(lambda x, y: x + y, + list(children.values()) + \ + list(subchildren.values()))) + expected.extend(standalone) + self.assertXMLEqual(actual, expected) def test_do_xmlmatch(self): sf = self.get_obj() sf._xml_match = Mock() metadata = Mock() - (sf.xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() + for data_type, test_data in \ + [("", self._get_test_data()), + ("templated ", self._get_template_test_data())]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + sf.xdata = xdata + sf._xml_match.reset_mock() + + sf._do_xmlmatch(metadata) + actual = [] + for call in sf._xml_match.call_args_list: + actual.append(call[0][0]) + self.assertEqual(call[0][1], metadata) + expected = list(groups.values()) + standalone + # easiest way to compare the values is actually to make + # them into an XML document and let assertXMLEqual compare + # them + xactual = lxml.etree.Element("Container") + xactual.extend(actual) + xexpected = lxml.etree.Element("Container") + xexpected.extend(expected) + self.assertXMLEqual(xactual, xexpected, + "XMLMatch() calls were incorrect for " + "%stest data" % data_type) + + def test_match_ordering(self): + """ Match() returns elements in document order """ + sf = self.get_obj() + sf._match = Mock() + + def match_rv(el, _): + if el.tag not in sf._include_tests.keys(): + return [el] + elif el.get("include") == "true": + return el.getchildren() + else: + return [] + sf._match.side_effect = match_rv - sf._do_xmlmatch(metadata) - actual = [] - for call in sf._xml_match.call_args_list: - actual.append(call[0][0]) - self.assertEqual(call[0][1], metadata) - expected = list(groups.values()) + standalone - # easiest way to compare the values is actually to make - # them into an XML document and let assertXMLEqual compare - # them - xactual = lxml.etree.Element("Container") - xactual.extend(actual) - xexpected = lxml.etree.Element("Container") - xexpected.extend(expected) - self.assertXMLEqual(xactual, xexpected) + metadata = Mock() + + test_data = lxml.etree.Element("Test") + group = lxml.etree.SubElement(test_data, "Group", name="group", + include="true") + first = lxml.etree.SubElement(group, "Element", name="first") + second = lxml.etree.SubElement(test_data, "Element", name="second") + + # sanity check to ensure that first and second are in the + # correct document order + if test_data.xpath("//Element") != [first, second]: + skip("lxml.etree does not construct documents in a reliable order") + + sf.data = lxml.etree.tostring(test_data) + sf.Index() + rv = sf._do_match(metadata) + self.assertEqual(len(rv), 2, + "Match() seems to be broken, cannot test ordering") + msg = "Match() does not return elements in document order:\n" + \ + "Expected: [%s, %s]\n" % (first, second) + \ + "Actual: %s" % rv + self.assertXMLEqual(rv[0], first, msg) + self.assertXMLEqual(rv[1], second, msg) + + # TODO: add tests to ensure that XMLMatch() returns elements + # in document order class TestINode(Bcfg2TestCase): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py index 35f4e0700..6effe05de 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py @@ -368,3 +368,21 @@ class TestVersion(TestPlugin): class TestClientRunHooks(Bcfg2TestCase): """ placeholder for future tests """ pass + + +class TestClientACLs(Bcfg2TestCase): + test_obj = ClientACLs + + def get_obj(self): + return self.test_obj() + + def test_check_acl_ip(self): + ca = self.get_obj() + self.assertIn(ca.check_acl_ip(Mock(), Mock()), + [True, False, None]) + + def test_check_acl_metadata(self): + ca = self.get_obj() + self.assertIn(ca.check_acl_metadata(Mock(), Mock()), + [True, False]) + diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py new file mode 100644 index 000000000..e457ca7c1 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -0,0 +1,222 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.ACL import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \ + TestClientACLs + + +class TestFunctions(Bcfg2TestCase): + def test_rmi_names_equal(self): + good_cases = [('*', 'foo'), + ('foo', 'foo'), + ('foo.*', 'foo.bar'), + ('*.*', 'foo.bar'), + ('foo.bar', 'foo.bar'), + ('*.bar', 'foo.bar'), + ('foo.*.bar', 'foo.baz.bar')] + bad_cases = [('foo', 'bar'), + ('*', 'foo.bar'), + ('*.*', 'foo'), + ('*.*', 'foo.bar.baz'), + ('foo.*', 'bar.foo'), + ('*.bar', 'bar.foo'), + ('foo.*', 'foobar')] + for first, second in good_cases: + self.assertTrue(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly False" % + (first, second)) + self.assertTrue(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly False" % + (second, first)) + for first, second in bad_cases: + self.assertFalse(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly True" % + (first, second)) + self.assertFalse(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly True" % + (second, first)) + + def test_ip_matches(self): + good_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + bad_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + for ip, entry in good_cases: + self.assertTrue(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly False" % + (ip, lxml.etree.tostring(entry))) + for ip, entry in bad_cases: + self.assertFalse(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly True" % + (ip, lxml.etree.tostring(entry))) + + +class TestIPACLFile(TestXMLFileBacked): + test_obj = IPACLFile + + @patch("Bcfg2.Server.Plugins.ACL.ip_matches") + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches): + af = self.get_obj() + ip = "10.0.0.8" + rmi = "ACL.test" + + def reset(): + mock_rmi_names_equal.reset_mock() + mock_ip_matches.reset_mock() + + # test default defer with no entries + af.entries = [] + self.assertIsNone(af.check_acl(ip, rmi)) + + # test explicit allow, deny, and defer + entries = dict(Allow=lxml.etree.Element("Allow", method=rmi), + Deny=lxml.etree.Element("Deny", method=rmi), + Defer=lxml.etree.Element("Defer", method=rmi)) + af.entries = list(entries.values()) + + def get_ip_matches(tag): + def ip_matches(ip, entry): + return entry.tag == tag + + return ip_matches + + mock_rmi_names_equal.return_value = True + + reset() + mock_ip_matches.side_effect = get_ip_matches("Allow") + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Allow']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Deny") + self.assertFalse(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Deny']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Defer") + self.assertIsNone(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Defer']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + # test matching RMI names + reset() + mock_ip_matches.side_effect = lambda i, e: True + mock_rmi_names_equal.side_effect = lambda a, b: a == b + rmi = "ACL.test2" + matching = lxml.etree.Element("Allow", method=rmi) + af.entries.append(matching) + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, matching) + self.assertTrue( + call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or + call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list) + + # test implicit allow for localhost, defer for others + reset() + mock_ip_matches.side_effect = lambda i, e: False + self.assertIsNone(af.check_acl(ip, rmi)) + + reset() + self.assertTrue(af.check_acl("127.0.0.1", rmi)) + + +class TestMetadataACLFile(TestStructFile): + test_obj = MetadataACLFile + + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal): + af = self.get_obj() + af.Match = Mock() + metadata = Mock() + mock_rmi_names_equal.side_effect = lambda a, b: a == b + + def reset(): + af.Match.reset_mock() + mock_rmi_names_equal.reset_mock() + + # test default allow + af.entries = [] + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + + # test explicit allow and deny + reset() + af.entries = [lxml.etree.Element("Allow", method='ACL.test'), + lxml.etree.Element("Deny", method='ACL.test2')] + af.Match.return_value = af.entries + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test', 'ACL.test'), + mock_rmi_names_equal.call_args_list) + + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test2')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test2', 'ACL.test2'), + mock_rmi_names_equal.call_args_list) + + # test default deny for non-localhost + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + # test default allow for localhost + reset() + metadata.hostname = 'localhost' + self.assertTrue(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + +class TestACL(TestPlugin, TestClientACLs): + test_obj = ACL + + def test_check_acl_ip(self): + acl = self.get_obj() + acl.ip_acls = Mock() + self.assertEqual(acl.check_acl_ip("192.168.1.10", "ACL.test"), + acl.ip_acls.check_acl.return_value) + acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") + + def test_check_acl_metadata(self): + acl = self.get_obj() + acl.metadata_acls = Mock() + metadata = Mock() + self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"), + acl.metadata_acls.check_acl.return_value) + acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test") -- cgit v1.2.3-1-g7c22 From 088ca5fee4cc99f9143f18a880cdec6712326e1e Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 12 Feb 2013 09:47:04 -0500 Subject: fixed unit tests --- testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py index e457ca7c1..86a960701 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -209,7 +209,8 @@ class TestACL(TestPlugin, TestClientACLs): def test_check_acl_ip(self): acl = self.get_obj() acl.ip_acls = Mock() - self.assertEqual(acl.check_acl_ip("192.168.1.10", "ACL.test"), + self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"), + "ACL.test"), acl.ip_acls.check_acl.return_value) acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") -- cgit v1.2.3-1-g7c22 From e989e62b24479fac99e607d9e4fb3a292bd20418 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 11:36:49 -0500 Subject: added support for wildcard XInclude in XMLFileBacked --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 53 +++++++++++++++------- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 1 + 2 files changed, 38 insertions(+), 16 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 4cb148bb0..6187880b7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -382,21 +382,22 @@ class TestXMLFileBacked(TestFileBacked): xfb = self.get_obj(should_monitor=True) xfb.fam.AddMonitor.assert_called_with(self.path, xfb) - @patch("os.path.exists") + @patch("glob.glob") @patch("lxml.etree.parse") - def test_follow_xincludes(self, mock_parse, mock_exists): + def test_follow_xincludes(self, mock_parse, mock_glob): xfb = self.get_obj() xfb.add_monitor = Mock() + xfb.add_monitor.side_effect = lambda p: xfb.extras.append(p) def reset(): xfb.add_monitor.reset_mock() + mock_glob.reset_mock() mock_parse.reset_mock() - mock_exists.reset_mock() xfb.extras = [] - mock_exists.return_value = True xdata = dict() mock_parse.side_effect = lambda p: xdata[p] + mock_glob.side_effect = lambda g: [g] base = os.path.dirname(self.path) @@ -430,7 +431,7 @@ class TestXMLFileBacked(TestFileBacked): xfb.add_monitor.assert_called_with(test2) self.assertItemsEqual(mock_parse.call_args_list, [call(f) for f in xdata.keys()]) - mock_exists.assert_called_with(test2) + mock_glob.assert_called_with(test2) reset() xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path]) @@ -438,7 +439,7 @@ class TestXMLFileBacked(TestFileBacked): self.assertItemsEqual(mock_parse.call_args_list, [call(f) for f in xdata.keys() if f != self.path]) - mock_exists.assert_called_with(test2) + mock_glob.assert_called_with(test2) # test two-deep level of xinclude, with some files in another # directory @@ -466,21 +467,41 @@ class TestXMLFileBacked(TestFileBacked): reset() xfb._follow_xincludes(fname=self.path) - self.assertItemsEqual(xfb.add_monitor.call_args_list, - [call(f) for f in xdata.keys() if f != self.path]) + expected = [call(f) for f in xdata.keys() if f != self.path] + self.assertItemsEqual(xfb.add_monitor.call_args_list, expected) self.assertItemsEqual(mock_parse.call_args_list, [call(f) for f in xdata.keys()]) - self.assertItemsEqual(mock_exists.call_args_list, - [call(f) for f in xdata.keys() if f != self.path]) + self.assertItemsEqual(mock_glob.call_args_list, expected) reset() xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path]) - self.assertItemsEqual(xfb.add_monitor.call_args_list, - [call(f) for f in xdata.keys() if f != self.path]) - self.assertItemsEqual(mock_parse.call_args_list, - [call(f) for f in xdata.keys() if f != self.path]) - self.assertItemsEqual(mock_exists.call_args_list, - [call(f) for f in xdata.keys() if f != self.path]) + expected = [call(f) for f in xdata.keys() if f != self.path] + self.assertItemsEqual(xfb.add_monitor.call_args_list, expected) + self.assertItemsEqual(mock_parse.call_args_list, expected) + self.assertItemsEqual(mock_glob.call_args_list, expected) + + # test wildcard xinclude + reset() + xdata[self.path] = lxml.etree.Element("Test").getroottree() + lxml.etree.SubElement(xdata[self.path].getroot(), + Bcfg2.Server.XI_NAMESPACE + "include", + href="*.xml") + + def glob_rv(path): + if path == os.path.join(base, '*.xml'): + return [self.path, test2, test3] + else: + return [path] + mock_glob.side_effect = glob_rv + + xfb._follow_xincludes(xdata=xdata[self.path]) + expected = [call(f) for f in xdata.keys() if f != self.path] + self.assertItemsEqual(xfb.add_monitor.call_args_list, expected) + self.assertItemsEqual(mock_parse.call_args_list, expected) + self.assertItemsEqual(mock_glob.call_args_list, + [call(os.path.join(base, '*.xml')), call(test4), + call(test5), call(test6)]) + @patch("lxml.etree._ElementTree", FakeElementTree) @patch("Bcfg2.Server.Plugin.helpers.%s._follow_xincludes" % diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 0afa5220d..9f1e73541 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -198,6 +198,7 @@ if HAS_DJANGO or can_skip: class TestXMLMetadataConfig(TestXMLFileBacked): test_obj = XMLMetadataConfig + path = os.path.join(datastore, 'Metadata', 'clients.xml') def get_obj(self, basefile="clients.xml", core=None, watch_clients=False): self.metadata = get_metadata_object(core=core, -- cgit v1.2.3-1-g7c22 From 9bec4d6bbab599bee72256c7e09fe214cb849a1b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 5 Feb 2013 12:13:20 -0500 Subject: abstracted similar digit range classes in POSIXUsers/GroupPatterns into Bcfg2.Utils --- .../TestServer/TestPlugins/TestGroupPatterns.py | 21 --------------------- 1 file changed, 21 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py index a7a6b3d6e..a9346156c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestGroupPatterns.py @@ -18,27 +18,6 @@ from common import * from TestPlugin import TestXMLFileBacked, TestPlugin, TestConnector -class TestPackedDigitRange(Bcfg2TestCase): - def test_includes(self): - # tuples of (range description, numbers that are included, - # numebrs that are excluded) - tests = [("1-3", [1, "2", 3], [4]), - ("1", [1], [0, "2"]), - ("10-11", [10, 11], [0, 1]), - ("9-9", [9], [8, 10]), - ("0-100", [0, 10, 99, 100], []), - ("1,3,5", [1, 3, 5], [0, 2, 4, 6]), - ("1-5,7", [1, 3, 5, 7], [0, 6, 8]), - ("1-5,7,9-11", [1, 3, 5, 7, 9, 11], [0, 6, 8, 12]), - ("852-855,321-497,763", [852, 855, 321, 400, 497, 763], [])] - for rng, inc, exc in tests: - r = PackedDigitRange(rng) - for test in inc: - self.assertTrue(r.includes(test)) - for test in exc: - self.assertFalse(r.includes(test)) - - class TestPatternMap(Bcfg2TestCase): def test_ranges(self): """ test processing NameRange patterns """ -- cgit v1.2.3-1-g7c22 From fd67a2735ada342251cb6baaa4e678532566e975 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 7 Feb 2013 10:00:37 -0500 Subject: moved common file locking code into Bcfg2.Utils --- testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 9f1e73541..2697df292 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -328,7 +328,7 @@ class TestXMLMetadataConfig(TestXMLFileBacked): "clients.xml"), "") - @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False)) + @patch('Bcfg2.Utils.locked', Mock(return_value=False)) @patch('fcntl.lockf', Mock()) @patch('os.open') @patch('os.fdopen') -- cgit v1.2.3-1-g7c22 From dc22e36574d4d4cdbde282906ef3e1d3c7fe7c94 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 7 Feb 2013 10:14:31 -0500 Subject: Metadata: process default client bootstrap mode properly --- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 2697df292..221eb8a3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -20,7 +20,7 @@ while path != "/": path = os.path.dirname(path) from common import * from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \ - TestStatistics, TestDatabaseBacked + TestClientRunHooks, TestDatabaseBacked def get_clients_test_tree(): @@ -464,7 +464,7 @@ class TestClientMetadata(Bcfg2TestCase): self.assertFalse(cm.inGroup("group3")) -class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): +class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): test_obj = Metadata use_db = False @@ -497,7 +497,7 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): metadata = self.get_obj(core=core) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.ClientRunHooks) self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig) self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig) self.assertIsInstance(metadata.query, MetadataQuery) @@ -1227,17 +1227,16 @@ class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") - def test_process_statistics(self, mock_update_client): + def test_end_statistics(self, mock_update_client): metadata = self.load_clients_data(metadata=self.load_groups_data()) md = Mock() md.hostname = "client6" - metadata.process_statistics(md, None) - mock_update_client.assert_called_with(md.hostname, - dict(auth='cert')) + metadata.end_statistics(md) + mock_update_client.assert_called_with(md.hostname, dict(auth='cert')) mock_update_client.reset_mock() md.hostname = "client5" - metadata.process_statistics(md, None) + metadata.end_statistics(md) self.assertFalse(mock_update_client.called) def test_viz(self): @@ -1513,6 +1512,10 @@ class TestMetadata_NoClientsXML(TestMetadataBase): def test_handle_clients_xml_event(self): pass + def test_end_statistics(self): + # bootstrap mode, which is what is being tested here, doesn't + # work without clients.xml + pass class TestMetadata_ClientsXML(TestMetadataBase): """ test Metadata with a clients.xml. """ -- cgit v1.2.3-1-g7c22 From 3d06f311274d6b942ee89d8cdb13b2ecc99af1b0 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 14 Mar 2013 13:05:08 -0400 Subject: use Executor class for better subprocess calling on server --- .../TestCfg/TestCfgExternalCommandVerifier.py | 31 ++++++++++------------ .../TestCfg/TestCfgPrivateKeyCreator.py | 29 +++++++++----------- 2 files changed, 27 insertions(+), 33 deletions(-) (limited to 'testsuite/Testsrc/Testlib/TestServer') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py index 0f369113b..7ceedb7c2 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py @@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier class TestCfgExternalCommandVerifier(TestCfgVerifier): test_obj = CfgExternalCommandVerifier - @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen") - def test_verify_entry(self, mock_Popen): - proc = Mock() - mock_Popen.return_value = proc - proc.wait.return_value = 0 - proc.communicate.return_value = ("stdout", "stderr") + def test_verify_entry(self): entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() ecv = self.get_obj() ecv.cmd = ["/bin/bash", "-x", "foo"] + ecv.exc = Mock() + ecv.exc.run.return_value = Mock() + ecv.exc.run.return_value.success = True + ecv.verify_entry(entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") - mock_Popen.reset_mock() - proc.wait.return_value = 13 + ecv.exc.reset_mock() + ecv.exc.run.return_value.success = False self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") + + ecv.exc.reset_mock() - mock_Popen.reset_mock() - mock_Popen.side_effect = OSError + ecv.exc.reset_mock() + ecv.exc.run.side_effect = OSError self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") @patch("os.access") def test_handle_event(self, mock_access): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index 2982825a9..6cfd2f666 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -98,24 +98,23 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): @patch("shutil.rmtree") @patch("tempfile.mkdtemp") - @patch("subprocess.Popen") - def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree): + def test__gen_keypair(self, mock_mkdtemp, mock_rmtree): pkc = self.get_obj() + pkc.cmd = Mock() pkc.XMLMatch = Mock() mock_mkdtemp.return_value = datastore metadata = Mock() - proc = Mock() - proc.wait.return_value = 0 - proc.communicate.return_value = MagicMock() - mock_Popen.return_value = proc + exc = Mock() + exc.success = True + pkc.cmd.run.return_value = exc spec = lxml.etree.Element("PrivateKey") pkc.XMLMatch.return_value = spec def reset(): pkc.XMLMatch.reset_mock() - mock_Popen.reset_mock() + pkc.cmd.reset_mock() mock_mkdtemp.reset_mock() mock_rmtree.reset_mock() @@ -123,10 +122,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "rsa", "-N", ""]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "rsa", "-N", ""]) reset() lxml.etree.SubElement(spec, "Params", bits="768", type="dsa") @@ -137,13 +135,12 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "dsa", "-b", "768", "-N", "foo"]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "dsa", "-b", "768", "-N", "foo"]) reset() - proc.wait.return_value = 1 + pkc.cmd.run.return_value.success = False self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata) mock_rmtree.assert_called_with(datastore) -- cgit v1.2.3-1-g7c22