diff options
Diffstat (limited to 'testsuite')
25 files changed, 1247 insertions, 1099 deletions
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py index f01082e86..c2c6c5d4c 100644 --- a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py @@ -49,7 +49,6 @@ class TestPOSIX(TestTool): mock_canVerify.assert_called_with(posix, entry) # next, test fully_specified failure - posix.logger.error.reset_mock() mock_canVerify.reset_mock() mock_canVerify.return_value = True mock_fully_spec = Mock() @@ -59,17 +58,14 @@ class TestPOSIX(TestTool): self.assertFalse(posix.canVerify(entry)) mock_canVerify.assert_called_with(posix, entry) mock_fully_spec.assert_called_with(entry) - self.assertTrue(posix.logger.error.called) # finally, test success - posix.logger.error.reset_mock() mock_canVerify.reset_mock() mock_fully_spec.reset_mock() mock_fully_spec.return_value = True self.assertTrue(posix.canVerify(entry)) mock_canVerify.assert_called_with(posix, entry) mock_fully_spec.assert_called_with(entry) - self.assertFalse(posix.logger.error.called) @patch("Bcfg2.Client.Tools.Tool.canInstall") def test_canInstall(self, mock_canInstall): @@ -82,7 +78,6 @@ class TestPOSIX(TestTool): mock_canInstall.assert_called_with(posix, entry) # next, test fully_specified failure - posix.logger.error.reset_mock() mock_canInstall.reset_mock() mock_canInstall.return_value = True mock_fully_spec = Mock() @@ -92,17 +87,14 @@ class TestPOSIX(TestTool): self.assertFalse(posix.canInstall(entry)) mock_canInstall.assert_called_with(posix, entry) mock_fully_spec.assert_called_with(entry) - self.assertTrue(posix.logger.error.called) # finally, test success - posix.logger.error.reset_mock() mock_canInstall.reset_mock() mock_fully_spec.reset_mock() mock_fully_spec.return_value = True self.assertTrue(posix.canInstall(entry)) mock_canInstall.assert_called_with(posix, entry) mock_fully_spec.assert_called_with(entry) - self.assertFalse(posix.logger.error.called) def test_InstallPath(self): posix = self.get_obj() @@ -152,7 +144,6 @@ class TestPOSIX(TestTool): def inner(mock_listdir): mock_listdir.side_effect = OSError posix._prune_old_backups(entry) - self.assertTrue(posix.logger.error.called) self.assertFalse(mock_remove.called) mock_listdir.assert_called_with(setup['ppath']) @@ -170,7 +161,6 @@ class TestPOSIX(TestTool): mock_listdir.reset_mock() mock_remove.reset_mock() mock_remove.side_effect = OSError - posix.logger.error.reset_mock() # test to ensure that we call os.remove() for all files that # need to be removed even if we get an error posix._prune_old_backups(entry) @@ -178,7 +168,6 @@ class TestPOSIX(TestTool): self.assertItemsEqual(mock_remove.call_args_list, [call(os.path.join(setup['ppath'], p)) for p in remove]) - self.assertTrue(posix.logger.error.called) inner() diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIXUsers.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIXUsers.py index 9478f7071..6d4644ea5 100644 --- a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIXUsers.py +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIXUsers.py @@ -24,11 +24,11 @@ from TestTools.Test_init import TestTool class TestPOSIXUsers(TestTool): test_obj = POSIXUsers - def get_obj(self, logger=None, setup=None, config=None): + def get_obj(self, setup=None, config=None): if setup is None: setup = MagicMock() setup.__getitem__.return_value = [] - return TestTool.get_obj(self, logger, setup, config) + return TestTool.get_obj(self, setup, config) @patch("pwd.getpwall") @patch("grp.getgrall") @@ -134,10 +134,9 @@ class TestPOSIXUsers(TestTool): users.set_defaults['POSIXUser'] = Mock() users.set_defaults['POSIXUser'].side_effect = lambda e: e - states = dict() - self.assertEqual(users.Inventory(states), + self.assertEqual(users.Inventory(), mock_Inventory.return_value) - mock_Inventory.assert_called_with(users, states, config.getchildren()) + mock_Inventory.assert_called_with(users, config.getchildren()) lxml.etree.SubElement(orig_bundle, "POSIXGroup", name="test") self.assertXMLEqual(orig_bundle, bundle) @@ -299,9 +298,8 @@ class TestPOSIXUsers(TestTool): entries = [lxml.etree.Element("POSIXUser", name="test"), lxml.etree.Element("POSIXGroup", name="test"), lxml.etree.Element("POSIXUser", name="test2")] - states = dict() - users.Install(entries, states) + states = users.Install(entries) self.assertItemsEqual(entries, states.keys()) for state in states.values(): self.assertEqual(state, users._install.return_value) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/Test_init.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/Test_init.py index 390b92608..df7b7c217 100644 --- a/testsuite/Testsrc/Testlib/TestClient/TestTools/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/Test_init.py @@ -21,24 +21,23 @@ from common import * class TestTool(Bcfg2TestCase): test_obj = Tool - def get_obj(self, logger=None, setup=None, config=None): + def get_obj(self, setup=None, config=None): if config is None: config = lxml.etree.Element("Configuration") - if not logger: - def print_msg(msg): - print(msg) - logger = Mock() - logger.error = Mock(side_effect=print_msg) - logger.warning = Mock(side_effect=print_msg) - logger.info = Mock(side_effect=print_msg) - logger.debug = Mock(side_effect=print_msg) if not setup: setup = MagicMock() if 'command_timeout' not in setup: setup['command_timeout'] = None + execs = self.test_obj.__execs__ self.test_obj.__execs__ = [] - rv = self.test_obj(logger, setup, config) + + @patch("Bcfg2.Options.get_option_parser") + def inner(mock_option_parser): + mock_option_parser.return_value = setup + return self.test_obj(config) + + rv = inner() self.test_obj.__execs__ = execs return rv @@ -166,14 +165,12 @@ class TestTool(Bcfg2TestCase): self.assertItemsEqual(states, expected_states) self.assertEqual(t.extra, t.FindExtra.return_value) - actual_states = dict() - t.Inventory(actual_states, structures=[bundle1, bundle2]) + actual_states = t.Inventory(structures=[bundle1, bundle2]) perform_assertions(actual_states) reset() - actual_states = dict() t.config = config - t.Inventory(actual_states) + actual_states = t.Inventory() perform_assertions(actual_states) def test_Install(self): @@ -199,9 +196,8 @@ class TestTool(Bcfg2TestCase): expected_states.update(dict([(e, t.InstallService.return_value) for e in entries if e.tag == "Service"])) - actual_states = dict() t.modified = [] - t.Install(entries, actual_states) + actual_states = t.Install(entries) self.assertItemsEqual(t.InstallPath.call_args_list, [call(e) for e in entries if e.tag == "Path"]) self.assertItemsEqual(t.InstallPackage.call_args_list, @@ -385,8 +381,7 @@ class TestPkgTool(TestTool): # test single-pass install success reset() pt.cmd.run.return_value = True - states = dict([(p, False) for p in packages]) - pt.Install(packages, states) + states = pt.Install(packages) pt._get_package_command.assert_called_with(packages) pt.cmd.run.assert_called_with([p.get("name") for p in packages]) self.assertItemsEqual(states, @@ -406,8 +401,7 @@ class TestPkgTool(TestTool): pt.VerifyPackage.side_effect = lambda p, m: p.get("name") == "bar" pt.cmd.run.side_effect = run - states = dict([(p, False) for p in packages]) - pt.Install(packages, states) + states = pt.Install(packages) pt._get_package_command.assert_any_call(packages) for pkg in packages: pt.VerifyPackage.assert_any_call(pkg, []) @@ -556,10 +550,9 @@ class TestSvcTool(TestTool): st.handlesEntry = Mock() st.handlesEntry.side_effect = lambda e: e.tag == "Service" st.stop_service = Mock() - st.stop_service.return_value = 0 + st.stop_service.return_value = True st.restart_service = Mock() - st.restart_service.side_effect = lambda e: \ - int(e.get("name") != "failed") + st.restart_service.side_effect = lambda e: e.get("name") != "failed" def reset(): st.handlesEntry.reset_mock() @@ -593,8 +586,7 @@ class TestSvcTool(TestTool): # test in non-interactive mode reset() - states = dict() - st.BundleUpdated(bundle, states) + states = st.BundleUpdated(bundle) self.assertItemsEqual(st.handlesEntry.call_args_list, [call(e) for e in entries]) st.stop_service.assert_called_with(stop) @@ -607,8 +599,7 @@ class TestSvcTool(TestTool): reset() mock_prompt.side_effect = lambda p: "interactive2" not in p st.setup['interactive'] = True - states = dict() - st.BundleUpdated(bundle, states) + states = st.BundleUpdated(bundle) self.assertItemsEqual(st.handlesEntry.call_args_list, [call(e) for e in entries]) st.stop_service.assert_called_with(stop) @@ -622,8 +613,7 @@ class TestSvcTool(TestTool): reset() st.setup['interactive'] = False st.setup['servicemode'] = 'build' - states = dict() - st.BundleUpdated(bundle, states) + states = st.BundleUpdated(bundle) self.assertItemsEqual(st.handlesEntry.call_args_list, [call(e) for e in entries]) self.assertItemsEqual(st.stop_service.call_args_list, @@ -639,10 +629,9 @@ class TestSvcTool(TestTool): services = install + [lxml.etree.Element("Service", type="test", name="bar", install="false")] st = self.get_obj() - states = Mock() - self.assertEqual(st.Install(services, states), + self.assertEqual(st.Install(services), mock_Install.return_value) - mock_Install.assert_called_with(st, install, states) + mock_Install.assert_called_with(st, install) def test_InstallService(self): st = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestEncryption.py b/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py index c03aa66e1..8f69f3bf0 100644 --- a/testsuite/Testsrc/Testlib/TestEncryption.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py @@ -16,7 +16,7 @@ while path != "/": from common import * try: - from Bcfg2.Encryption import * + from Bcfg2.Server.Encryption import * HAS_CRYPTO = True except ImportError: HAS_CRYPTO = False @@ -110,52 +110,30 @@ baz self.assertRaises(EVPError, ssl_decrypt, crypted, passwd) # bogus algorithm - def test_get_algorithm(self): - setup = Mock() - # we don't care what the default is, as long as there is - # one - setup.cfp.get.return_value = ALGORITHM - self.assertRegexpMatches(get_algorithm(setup), - r'^[a-z0-9]+_[a-z0-9_]+$') - setup.cfp.get.assert_called_with(CFG_SECTION, CFG_ALGORITHM, - default=ALGORITHM) - - setup.cfp.get.return_value = self.algo - self.assertEqual(get_algorithm(setup), self.algo) - setup.cfp.get.assert_called_with(CFG_SECTION, CFG_ALGORITHM, - default=ALGORITHM) - - # test that get_algorithm converts algorithms given in - # OpenSSL style to M2Crypto style - setup.cfp.get.return_value = "DES-EDE3-CFB8" - self.assertEqual(get_algorithm(setup), "des_ede3_cfb8") - setup.cfp.get.assert_called_with(CFG_SECTION, CFG_ALGORITHM, - default=ALGORITHM) - - def test_get_passphrases(self): + @patch("Bcfg2.Options.get_option_parser") + def test_get_passphrases(self, mock_get_option_parser): setup = Mock() setup.cfp.has_section.return_value = False - self.assertEqual(get_passphrases(setup), dict()) + mock_get_option_parser.return_value = setup + self.assertEqual(get_passphrases(), dict()) setup.cfp.has_section.return_value = True setup.cfp.options.return_value = ["foo", "bar", CFG_ALGORITHM] setup.cfp.get.return_value = "passphrase" - self.assertItemsEqual(get_passphrases(setup), + self.assertItemsEqual(get_passphrases(), dict(foo="passphrase", bar="passphrase")) - @patch("Bcfg2.Encryption.get_passphrases") + @patch("Bcfg2.Server.Encryption.get_passphrases") def test_bruteforce_decrypt(self, mock_passphrases): passwd = "a simple passphrase" crypted = ssl_encrypt(self.plaintext, passwd) - setup = Mock() # test with no passphrases given nor in config mock_passphrases.return_value = dict() self.assertRaises(EVPError, - bruteforce_decrypt, - crypted, setup=setup) - mock_passphrases.assert_called_with(setup) + bruteforce_decrypt, crypted) + mock_passphrases.assert_called_with() # test with good passphrase given in function call mock_passphrases.reset_mock() @@ -179,15 +157,14 @@ baz real=passwd, bogus2="also bogus") self.assertEqual(self.plaintext, - bruteforce_decrypt(crypted, setup=setup)) - mock_passphrases.assert_called_with(setup) + bruteforce_decrypt(crypted)) + mock_passphrases.assert_called_with() # test that passphrases given in function call take # precedence over config mock_passphrases.reset_mock() self.assertRaises(EVPError, - bruteforce_decrypt, - crypted, setup=setup, + bruteforce_decrypt, crypted, passphrases=["bogus", "also bogus"]) self.assertFalse(mock_passphrases.called) @@ -195,5 +172,4 @@ baz mock_passphrases.reset_mock() crypted = ssl_encrypt(self.plaintext, passwd, algorithm=self.algo) self.assertEqual(self.plaintext, - bruteforce_decrypt(crypted, setup=setup, - algorithm=self.algo)) + bruteforce_decrypt(crypted, algorithm=self.algo)) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 94866cf39..929f665b1 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1,9 +1,10 @@ import os -import re import sys import copy +import genshi import lxml.etree import Bcfg2.Server +import genshi.core from Bcfg2.Compat import reduce from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugin.helpers import * @@ -21,6 +22,12 @@ from common import * from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable from TestServer.TestPlugin.Testinterfaces import TestGenerator +try: + from Bcfg2.Server.Encryption import EVPError + HAS_CRYPTO = True +except: + HAS_CRYPTO = False + def tostring(el): return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8') @@ -31,46 +38,14 @@ class FakeElementTree(lxml.etree._ElementTree): class TestFunctions(Bcfg2TestCase): - def test_bind_info(self): - entry = lxml.etree.Element("Path", name="/test") - metadata = Mock() - default = dict(test1="test1", test2="test2") - # test without infoxml - bind_info(entry, metadata, default=default) - self.assertItemsEqual(entry.attrib, - dict(test1="test1", - test2="test2", - name="/test")) - - # test with bogus infoxml - entry = lxml.etree.Element("Path", name="/test") - infoxml = Mock() - self.assertRaises(PluginExecutionError, - bind_info, - entry, metadata, infoxml=infoxml) - infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry) - - # test with valid infoxml - entry = lxml.etree.Element("Path", name="/test") - infoxml.reset_mock() - infodata = {None: {"test3": "test3", "test4": "test4"}} - def infoxml_rv(metadata, rv, entry=None): - rv['Info'] = infodata - infoxml.pnode.Match.side_effect = infoxml_rv - bind_info(entry, metadata, infoxml=infoxml, default=default) - # mock objects don't properly track the called-with value of - # arguments whose value is changed by the function, so it - # thinks Match() was called with the final value of the mdata - # arg, not the initial value. makes this test a little less - # worthwhile, TBH. - infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata), - entry=entry) - self.assertItemsEqual(entry.attrib, - dict(test1="test1", - test2="test2", - test3="test3", - test4="test4", - name="/test")) + def test_removecomment(self): + data = [(None, "test", 1), + (None, "test2", 2)] + stream = [(genshi.core.COMMENT, "test", 0), + data[0], + (genshi.core.COMMENT, "test3", 0), + data[1]] + self.assertItemsEqual(list(removecomment(stream)), data) class TestDatabaseBacked(TestPlugin): @@ -109,10 +84,10 @@ class TestFileBacked(Bcfg2TestCase): test_obj = FileBacked path = os.path.join(datastore, "test") - def get_obj(self, path=None, fam=None): + def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=fam) + return self.test_obj(path) @patch("%s.open" % builtins) def test_HandleEvent(self, mock_open): @@ -168,8 +143,7 @@ class TestDirectoryBacked(Bcfg2TestCase): Mock()) def inner(): return self.test_obj(os.path.join(datastore, - self.test_obj.__name__), - fam) + self.test_obj.__name__)) return inner() @patch("os.makedirs") @@ -178,8 +152,8 @@ class TestDirectoryBacked(Bcfg2TestCase): @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, self.test_obj.__name__)) def inner(mock_add_monitor): + db = self.test_obj(datastore) mock_exists.return_value = True - db = self.test_obj(datastore, Mock()) mock_add_monitor.assert_called_with('') mock_exists.assert_called_with(db.data) self.assertFalse(mock_makedirs.called) @@ -188,7 +162,7 @@ class TestDirectoryBacked(Bcfg2TestCase): mock_exists.reset_mock() mock_makedirs.reset_mock() mock_exists.return_value = False - db = self.test_obj(datastore, Mock()) + db = self.test_obj(datastore) mock_add_monitor.assert_called_with('') mock_exists.assert_called_with(db.data) mock_makedirs.assert_called_with(db.data) @@ -259,10 +233,9 @@ class TestDirectoryBacked(Bcfg2TestCase): db.fam = Mock() class MockChild(Mock): - def __init__(self, path, fam, **kwargs): + def __init__(self, path, **kwargs): Mock.__init__(self, **kwargs) self.path = path - self.fam = fam self.HandleEvent = Mock() db.__child__ = MockChild @@ -272,7 +245,6 @@ class TestDirectoryBacked(Bcfg2TestCase): self.assertIn(path, db.entries) self.assertEqual(db.entries[path].path, os.path.join(db.data, path)) - self.assertEqual(db.entries[path].fam, db.fam) db.entries[path].HandleEvent.assert_called_with(event) @patch("os.path.isdir") @@ -410,28 +382,27 @@ class TestXMLFileBacked(TestFileBacked): should_monitor = None path = os.path.join(datastore, "test", "test1.xml") - def get_obj(self, path=None, fam=None, should_monitor=False): + def get_obj(self, path=None, should_monitor=False): if path is None: path = self.path @patchIf(not isinstance(os.path.exists, Mock), "os.path.exists", Mock()) def inner(): - return self.test_obj(path, fam=fam, should_monitor=should_monitor) + return self.test_obj(path, should_monitor=should_monitor) return inner() - def test__init(self): - fam = Mock() + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): xfb = self.get_obj() + self.assertEqual(xfb.fam, mock_get_fam.return_value) + if self.should_monitor: - self.assertIsNotNone(xfb.fam) - fam.reset_mock() - xfb = self.get_obj(fam=fam, should_monitor=True) - fam.AddMonitor.assert_called_with(self.path, xfb) + xfb = self.get_obj(should_monitor=True) + xfb.fam.AddMonitor.assert_called_with(self.path, xfb) else: - self.assertIsNone(xfb.fam) - xfb = self.get_obj(fam=fam) - self.assertFalse(fam.AddMonitor.called) + xfb = self.get_obj() + self.assertFalse(xfb.fam.AddMonitor.called) @patch("glob.glob") @patch("lxml.etree.parse") @@ -600,6 +571,7 @@ class TestXMLFileBacked(TestFileBacked): test3 = lxml.etree.Element("Test", name="test3") replacements = {"/test/test2.xml": test2, "/test/test_dir/test3.xml": test3} + def xinclude(): for el in xfb.xdata.findall('//%sinclude' % Bcfg2.Server.XI_NAMESPACE): @@ -617,25 +589,24 @@ class TestXMLFileBacked(TestFileBacked): self.assertItemsEqual([tostring(e) for e in xfb.entries], [tostring(e) for e in children]) + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test_add_monitor(self): xfb = self.get_obj() xfb.add_monitor("/test/test2.xml") self.assertIn("/test/test2.xml", xfb.extra_monitors) - fam = Mock() if self.should_monitor is not True: - fam.reset_mock() - xfb = self.get_obj(fam=fam) - fam.reset_mock() + xfb = self.get_obj() + xfb.fam = Mock() xfb.add_monitor("/test/test3.xml") - self.assertFalse(fam.AddMonitor.called) + self.assertFalse(xfb.fam.AddMonitor.called) self.assertIn("/test/test3.xml", xfb.extra_monitors) if self.should_monitor is not False: - fam.reset_mock() - xfb = self.get_obj(fam=fam, should_monitor=True) + xfb = self.get_obj(should_monitor=True) + xfb.fam = Mock() xfb.add_monitor("/test/test4.xml") - fam.AddMonitor.assert_called_with("/test/test4.xml", xfb) + xfb.fam.AddMonitor.assert_called_with("/test/test4.xml", xfb) self.assertIn("/test/test4.xml", xfb.extra_monitors) @@ -683,7 +654,8 @@ class TestStructFile(TestXMLFileBacked): lxml.etree.SubElement(groups[1], "Child", name="c3") lxml.etree.SubElement(groups[1], "Child", name="c4") - standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s1")) + standalone.append(lxml.etree.SubElement(xdata, + "Standalone", name="s1")) groups[2] = lxml.etree.SubElement(xdata, "Client", name="client2", include="false") @@ -705,12 +677,147 @@ class TestStructFile(TestXMLFileBacked): subchildren[3] = [] lxml.etree.SubElement(children[3][-1], "SubChild", name="subchild") - standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s3")) + standalone.append(lxml.etree.SubElement(xdata, + "Standalone", name="s3")) lxml.etree.SubElement(standalone[-1], "SubStandalone", name="sub1") - children[4] = standalone return (xdata, groups, subgroups, children, subchildren, standalone) + def _get_template_test_data(self): + (xdata, groups, subgroups, children, subchildren, standalone) = \ + self._get_test_data() + template_xdata = \ + lxml.etree.Element("Test", name="test", + nsmap=dict(py='http://genshi.edgewall.org/')) + template_xdata.extend(xdata.getchildren()) + return (template_xdata, groups, subgroups, children, subchildren, + standalone) + + @patch("genshi.template.TemplateLoader") + def test_Index(self, mock_TemplateLoader): + has_crypto = Bcfg2.Server.Plugin.helpers.HAS_CRYPTO + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = False + TestXMLFileBacked.test_Index(self) + + sf = self.get_obj() + sf.encoding = Mock() + (xdata, groups, subgroups, children, subchildren, standalone) = \ + self._get_test_data() + sf.data = lxml.etree.tostring(xdata) + + mock_TemplateLoader.reset_mock() + sf.Index() + self.assertFalse(mock_TemplateLoader.called) + + mock_TemplateLoader.reset_mock() + template_xdata = \ + lxml.etree.Element("Test", name="test", + nsmap=dict(py='http://genshi.edgewall.org/')) + template_xdata.extend(xdata.getchildren()) + sf.data = lxml.etree.tostring(template_xdata) + sf.Index() + mock_TemplateLoader.assert_called_with() + loader = mock_TemplateLoader.return_value + loader.load.assert_called_with(sf.name, + cls=genshi.template.MarkupTemplate, + encoding=sf.encoding) + self.assertEqual(sf.template, + loader.load.return_value) + + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = has_crypto + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_Index_crypto(self): + sf = self.get_obj() + sf.setup = Mock() + sf.setup.cfp.get.return_value = "strict" + sf._decrypt = Mock() + sf._decrypt.return_value = 'plaintext' + sf.data = ''' +<EncryptedData> + <Group name="test"> + <Datum encrypted="foo">crypted</Datum> + </Group> + <Group name="test" negate="true"> + <Datum>plain</Datum> + </Group> +</EncryptedData>''' + + # test successful decryption + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + for el in sf.xdata.xpath("//*[@encrypted]"): + self.assertEqual(el.text, sf._decrypt.return_value) + + # test failed decryption, strict + sf._decrypt.reset_mock() + sf._decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, sf.Index) + + # test failed decryption, lax + sf.setup.cfp.get.return_value = "lax" + sf._decrypt.reset_mock() + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.ssl_decrypt") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.bruteforce_decrypt") + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): + sf = self.get_obj() + + def reset(): + mock_bruteforce.reset_mock() + mock_get_passphrases.reset_mock() + mock_ssl.reset_mock() + + # test element without text contents + self.assertIsNone(sf._decrypt(lxml.etree.Element("Test"))) + self.assertFalse(mock_bruteforce.called) + self.assertFalse(mock_get_passphrases.called) + self.assertFalse(mock_ssl.called) + + # test element with a passphrase in the config file + reset() + el = lxml.etree.Element("Test", encrypted="foo") + el.text = "crypted" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") + mock_ssl.return_value = "decrypted with ssl" + self.assertEqual(sf._decrypt(el), mock_ssl.return_value) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test failure to decrypt element with a passphrase in the config + reset() + mock_ssl.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test element without valid passphrase + reset() + el.set("encrypted", "true") + mock_bruteforce.return_value = "decrypted with bruteforce" + self.assertEqual(sf._decrypt(el), mock_bruteforce.return_value) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + + # test failure to decrypt element without valid passphrase + reset() + mock_bruteforce.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + def test_include_element(self): sf = self.get_obj() metadata = Mock() @@ -743,22 +850,62 @@ class TestStructFile(TestXMLFileBacked): self.assertTrue(inc("Other")) - @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" % - test_obj.__name__) - def test__match(self, mock_include): + def test__match(self): sf = self.get_obj() + sf._include_element = Mock() metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - - mock_include.side_effect = \ - lambda x, _: (x.tag not in ['Client', 'Group'] or + sf._include_element.side_effect = \ + lambda x, _: (x.tag not in sf._include_tests.keys() or x.get("include") == "true") - for i, group in groups.items(): - actual = sf._match(group, metadata) - expected = children[i] + subchildren[i] + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + + for i, group in groups.items(): + actual = sf._match(group, metadata) + expected = children[i] + subchildren[i] + self.assertEqual(len(actual), len(expected)) + # easiest way to compare the values is actually to make + # them into an XML document and let assertXMLEqual compare + # them + xactual = lxml.etree.Element("Container") + xactual.extend(actual) + xexpected = lxml.etree.Element("Container") + xexpected.extend(expected) + self.assertXMLEqual(xactual, xexpected) + + for el in standalone: + self.assertXMLEqual(el, sf._match(el, metadata)[0]) + + def test_do_match(self): + sf = self.get_obj() + sf._match = Mock() + + def match_rv(el, _): + if el.tag not in sf._include_tests.keys(): + return [el] + elif el.get("include") == "true": + return el.getchildren() + else: + return [] + sf._match.side_effect = match_rv + + metadata = Mock() + + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + sf.data = lxml.etree.tostring(xdata) + sf.Index() + + actual = sf._do_match(metadata) + expected = reduce(lambda x, y: x + y, + list(children.values()) + \ + list(subgroups.values())) + standalone self.assertEqual(len(actual), len(expected)) # easiest way to compare the values is actually to make # them into an XML document and let assertXMLEqual compare @@ -769,84 +916,101 @@ class TestStructFile(TestXMLFileBacked): xexpected.extend(expected) self.assertXMLEqual(xactual, xexpected) - for el in standalone: - self.assertXMLEqual(el, sf._match(el, metadata)[0]) + def test__xml_match(self): + sf = self.get_obj() + sf._include_element = Mock() + metadata = Mock() - @patch("Bcfg2.Server.Plugin.helpers.%s._match" % test_obj.__name__) - def test_Match(self, mock_match): + sf._include_element.side_effect = \ + lambda x, _: (x.tag not in sf._include_tests.keys() or + x.get("include") == "true") + + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + + actual = copy.deepcopy(xdata) + for el in actual.getchildren(): + sf._xml_match(el, metadata) + expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib)) + expected.text = xdata.text + expected.extend(reduce(lambda x, y: x + y, + list(children.values()) + \ + list(subchildren.values()))) + expected.extend(standalone) + self.assertXMLEqual(actual, expected) + + def test_do_xmlmatch(self): sf = self.get_obj() + sf._xml_match = Mock() metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - sf.entries.extend(copy.deepcopy(xdata).getchildren()) + for data_type, test_data in \ + [("", self._get_test_data()), + ("templated ", self._get_template_test_data())]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + sf.xdata = xdata + sf._xml_match.reset_mock() + + sf._do_xmlmatch(metadata) + actual = [] + for call in sf._xml_match.call_args_list: + actual.append(call[0][0]) + self.assertEqual(call[0][1], metadata) + expected = list(groups.values()) + standalone + # easiest way to compare the values is actually to make + # them into an XML document and let assertXMLEqual compare + # them + xactual = lxml.etree.Element("Container") + xactual.extend(actual) + xexpected = lxml.etree.Element("Container") + xexpected.extend(expected) + self.assertXMLEqual(xactual, xexpected, + "XMLMatch() calls were incorrect for " + "%stest data" % data_type) + + def test_match_ordering(self): + """ Match() returns elements in document order """ + sf = self.get_obj() + sf._match = Mock() def match_rv(el, _): - if el.tag not in ['Client', 'Group']: + if el.tag not in sf._include_tests.keys(): return [el] elif el.get("include") == "true": return el.getchildren() else: return [] - mock_match.side_effect = match_rv - actual = sf.Match(metadata) - expected = reduce(lambda x, y: x + y, - list(children.values()) + list(subgroups.values())) - self.assertEqual(len(actual), len(expected)) - # easiest way to compare the values is actually to make - # them into an XML document and let assertXMLEqual compare - # them - xactual = lxml.etree.Element("Container") - xactual.extend(actual) - xexpected = lxml.etree.Element("Container") - xexpected.extend(expected) - self.assertXMLEqual(xactual, xexpected) - - @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" % - test_obj.__name__) - def test__xml_match(self, mock_include): - sf = self.get_obj() - metadata = Mock() + sf._match.side_effect = match_rv - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() + metadata = Mock() - mock_include.side_effect = \ - lambda x, _: (x.tag not in ['Client', 'Group'] or - x.get("include") == "true") + test_data = lxml.etree.Element("Test") + group = lxml.etree.SubElement(test_data, "Group", name="group", + include="true") + first = lxml.etree.SubElement(group, "Element", name="first") + second = lxml.etree.SubElement(test_data, "Element", name="second") - actual = copy.deepcopy(xdata) - for el in actual.getchildren(): - sf._xml_match(el, metadata) - expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib)) - expected.text = xdata.text - expected.extend(reduce(lambda x, y: x + y, - list(children.values()) + list(subchildren.values()))) - expected.extend(standalone) - self.assertXMLEqual(actual, expected) - - @patch("Bcfg2.Server.Plugin.helpers.%s._xml_match" % test_obj.__name__) - def test_XMLMatch(self, mock_xml_match): - sf = self.get_obj() - metadata = Mock() + # sanity check to ensure that first and second are in the + # correct document order + if test_data.xpath("//Element") != [first, second]: + skip("lxml.etree does not construct documents in a reliable order") - (sf.xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() + sf.data = lxml.etree.tostring(test_data) + sf.Index() + rv = sf._do_match(metadata) + self.assertEqual(len(rv), 2, + "Match() seems to be broken, cannot test ordering") + msg = "Match() does not return elements in document order:\n" + \ + "Expected: [%s, %s]\n" % (first, second) + \ + "Actual: %s" % rv + self.assertXMLEqual(rv[0], first, msg) + self.assertXMLEqual(rv[1], second, msg) - sf.XMLMatch(metadata) - actual = [] - for call in mock_xml_match.call_args_list: - actual.append(call[0][0]) - self.assertEqual(call[0][1], metadata) - expected = list(groups.values()) + standalone - # easiest way to compare the values is actually to make - # them into an XML document and let assertXMLEqual compare - # them - xactual = lxml.etree.Element("Container") - xactual.extend(actual) - xexpected = lxml.etree.Element("Container") - xexpected.extend(expected) - self.assertXMLEqual(xactual, xexpected) + # TODO: add tests to ensure that XMLMatch() returns elements + # in document order class TestINode(Bcfg2TestCase): @@ -1081,49 +1245,6 @@ class TestINode(Bcfg2TestCase): inode.predicate.assert_called_with(metadata, child) -class TestInfoNode(TestINode): - __test__ = True - test_obj = InfoNode - - def test_raw_predicates(self): - TestINode.test_raw_predicates(self) - metadata = Mock() - entry = lxml.etree.Element("Path", name="/tmp/foo", - realname="/tmp/bar") - - parent_predicate = lambda m, d: True - pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - - pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - - parent_predicate = lambda m, d: False - pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - - class TestXMLSrc(TestXMLFileBacked): test_obj = XMLSrc @@ -1189,9 +1310,78 @@ class TestXMLSrc(TestXMLFileBacked): self.assertEqual(xsrc.cache[0], metadata) -class TestInfoXML(TestXMLSrc): +class TestInfoXML(TestStructFile): test_obj = InfoXML + def _get_test_data(self): + (xdata, groups, subgroups, children, subchildren, standalone) = \ + TestStructFile._get_test_data(self) + idx = max(groups.keys()) + 1 + groups[idx] = lxml.etree.SubElement( + xdata, "Path", name="path1", include="true") + children[idx] = [lxml.etree.SubElement(groups[idx], "Child", + name="pc1")] + subgroups[idx] = [lxml.etree.SubElement(groups[idx], "Group", + name="pg1", include="true"), + lxml.etree.SubElement(groups[idx], "Client", + name="pc1", include="false")] + subchildren[idx] = [lxml.etree.SubElement(subgroups[idx][0], + "SubChild", name="sc1")] + + idx += 1 + groups[idx] = lxml.etree.SubElement( + xdata, "Path", name="path2", include="false") + children[idx] = [] + subgroups[idx] = [] + subchildren[idx] = [] + + path2 = lxml.etree.SubElement(groups[0], "Path", name="path2", + include="true") + subgroups[0].append(path2) + subchildren[0].append(lxml.etree.SubElement(path2, "SubChild", + name="sc2")) + return xdata, groups, subgroups, children, subchildren, standalone + + def test_include_element(self): + TestStructFile.test_include_element(self) + + ix = self.get_obj() + metadata = Mock() + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + inc = lambda tag, **attrs: \ + ix._include_element(lxml.etree.Element(tag, **attrs), + metadata, entry) + + self.assertFalse(inc("Path", name="/etc/bar.conf")) + self.assertFalse(inc("Path", name="/etc/foo.conf", negate="true")) + self.assertFalse(inc("Path", name="/etc/foo.conf", negate="tRuE")) + self.assertTrue(inc("Path", name="/etc/foo.conf")) + self.assertTrue(inc("Path", name="/etc/foo.conf", negate="false")) + self.assertTrue(inc("Path", name="/etc/foo.conf", negate="faLSe")) + self.assertTrue(inc("Path", name="/etc/bar.conf", negate="true")) + self.assertTrue(inc("Path", name="/etc/bar.conf", negate="tRUe")) + + def test_BindEntry(self): + ix = self.get_obj() + entry = lxml.etree.Element("Path", name=self.path) + metadata = Mock() + + # test with bogus infoxml + ix.Match = Mock() + ix.Match.return_value = [] + self.assertRaises(PluginExecutionError, + ix.BindEntry, entry, metadata) + ix.Match.assert_called_with(metadata, entry) + + # test with valid infoxml + ix.Match.reset_mock() + ix.Match.return_value = [lxml.etree.Element("Info", + mode="0600", owner="root")] + ix.BindEntry(entry, metadata) + ix.Match.assert_called_with(metadata, entry) + self.assertItemsEqual(entry.attrib, + dict(name=self.path, mode="0600", owner="root")) + class TestXMLDirectoryBacked(TestDirectoryBacked): test_obj = XMLDirectoryBacked @@ -1241,32 +1431,17 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): def test__matches(self): pd = self.get_obj() - self.assertTrue(pd._matches(lxml.etree.Element("Test", - name="/etc/foo.conf"), - Mock(), - {"/etc/foo.conf": pd.BindEntry, - "/etc/bar.conf": pd.BindEntry})) - self.assertFalse(pd._matches(lxml.etree.Element("Test", - name="/etc/baz.conf"), - Mock(), - {"/etc/foo.conf": pd.BindEntry, - "/etc/bar.conf": pd.BindEntry})) + entry = lxml.etree.Element("Test", name="/etc/foo.conf") + self.assertTrue(pd._matches(entry, Mock(), + lxml.etree.Element("Test", + name="/etc/foo.conf"))) + self.assertFalse(pd._matches(entry, Mock(), + lxml.etree.Element("Test", + name="/etc/baz.conf"))) def test_BindEntry(self): pd = self.get_obj() - pd.get_attrs = Mock(return_value=dict(test1="test1", test2="test2")) - entry = lxml.etree.Element("Path", name="/etc/foo.conf", test1="bogus") - metadata = Mock() - pd.BindEntry(entry, metadata) - pd.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(name="/etc/foo.conf", - test1="test1", test2="test2")) - - def test_get_attrs(self): - pd = self.get_obj() - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - children = [lxml.etree.Element("Child")] + children = [lxml.etree.Element("Child", name="child")] metadata = Mock() pd.entries = dict() @@ -1274,58 +1449,59 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): metadata.reset_mock() for src in pd.entries.values(): src.reset_mock() - src.cache = None # test with no matches - self.assertRaises(PluginExecutionError, - pd.get_attrs, entry, metadata) + self.assertRaises(PluginExecutionError, pd.BindEntry, Mock(), metadata) - def add_entry(name, data, prio=10): + def add_entry(name, data): path = os.path.join(pd.data, name) pd.entries[path] = Mock() - pd.entries[path].priority = prio - def do_Cache(metadata): - pd.entries[path].cache = (metadata, data) - pd.entries[path].Cache.side_effect = do_Cache - - add_entry('test1.xml', - dict(Path={'/etc/foo.conf': dict(attr="attr1", - __children__=children), - '/etc/bar.conf': dict()})) - add_entry('test2.xml', - dict(Path={'/etc/bar.conf': dict(__text__="text", - attr="attr1")}, - Package={'quux': dict(), - 'xyzzy': dict()}), - prio=20) - add_entry('test3.xml', - dict(Path={'/etc/baz.conf': dict()}, - Package={'xyzzy': dict()}), - prio=20) - - # test with exactly one match, __children__ + pd.entries[path].priority = data.get("priority") + pd.entries[path].XMLMatch.return_value = data + + test1 = lxml.etree.Element("Rules", priority="10") + path1 = lxml.etree.SubElement(test1, "Path", name="/etc/foo.conf", + attr="attr1") + path1.extend(children) + lxml.etree.SubElement(test1, "Path", name="/etc/bar.conf") + add_entry('test1.xml', test1) + + test2 = lxml.etree.Element("Rules", priority="20") + path2 = lxml.etree.SubElement(test2, "Path", name="/etc/bar.conf", + attr="attr1") + path2.text = "text" + lxml.etree.SubElement(test2, "Package", name="quux") + lxml.etree.SubElement(test2, "Package", name="xyzzy") + add_entry('test2.xml', test2) + + test3 = lxml.etree.Element("Rules", priority="20") + lxml.etree.SubElement(test3, "Path", name="/etc/baz.conf") + lxml.etree.SubElement(test3, "Package", name="xyzzy") + add_entry('test3.xml', test3) + + # test with exactly one match, children reset() - self.assertItemsEqual(pd.get_attrs(entry, metadata), - dict(attr="attr1")) + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + pd.BindEntry(entry, metadata) + self.assertXMLEqual(entry, path1) + self.assertIsNot(entry, path1) for src in pd.entries.values(): - src.Cache.assert_called_with(metadata) - self.assertEqual(len(entry.getchildren()), 1) - self.assertXMLEqual(entry.getchildren()[0], children[0]) + src.XMLMatch.assert_called_with(metadata) - # test with multiple matches with different priorities, __text__ + # test with multiple matches with different priorities, text reset() entry = lxml.etree.Element("Path", name="/etc/bar.conf") - self.assertItemsEqual(pd.get_attrs(entry, metadata), - dict(attr="attr1")) + pd.BindEntry(entry, metadata) + self.assertXMLEqual(entry, path2) + self.assertIsNot(entry, path2) for src in pd.entries.values(): - src.Cache.assert_called_with(metadata) - self.assertEqual(entry.text, "text") + src.XMLMatch.assert_called_with(metadata) # test with multiple matches with identical priorities reset() entry = lxml.etree.Element("Package", name="xyzzy") self.assertRaises(PluginExecutionError, - pd.get_attrs, entry, metadata) + pd.BindEntry, entry, metadata) class TestSpecificity(Bcfg2TestCase): @@ -1572,25 +1748,25 @@ class TestEntrySet(TestDebuggable): eset.reset_metadata.reset_mock() eset.entry_init.reset_mock() - for fname in ["info", "info.xml", ":info"]: - for evt in ["exists", "created", "changed"]: - reset() - event = Mock() - event.code2str.return_value = evt - event.filename = fname - eset.handle_event(event) - eset.update_metadata.assert_called_with(event) - self.assertFalse(eset.entry_init.called) - self.assertFalse(eset.reset_metadata.called) - + fname = "info.xml" + for evt in ["exists", "created", "changed"]: reset() event = Mock() - event.code2str.return_value = "deleted" + event.code2str.return_value = evt event.filename = fname eset.handle_event(event) - eset.reset_metadata.assert_called_with(event) + eset.update_metadata.assert_called_with(event) self.assertFalse(eset.entry_init.called) - self.assertFalse(eset.update_metadata.called) + self.assertFalse(eset.reset_metadata.called) + + reset() + event = Mock() + event.code2str.return_value = "deleted" + event.filename = fname + eset.handle_event(event) + eset.reset_metadata.assert_called_with(event) + self.assertFalse(eset.entry_init.called) + self.assertFalse(eset.update_metadata.called) for evt in ["exists", "created", "changed"]: reset() @@ -1749,26 +1925,8 @@ class TestEntrySet(TestDebuggable): self.assertFalse(mock_InfoXML.called) eset.infoxml.HandleEvent.assert_called_with(event) - for fname in [':info', 'info']: - event = Mock() - event.filename = fname - - idata = ["owner:owner", - "group: GROUP", - "mode: 775", - "important: true", - "bogus: line"] - mock_open.return_value.readlines.return_value = idata - eset.update_metadata(event) - expected = DEFAULT_FILE_METADATA.copy() - expected['owner'] = 'owner' - expected['group'] = 'GROUP' - expected['mode'] = '0775' - expected['important'] = 'true' - self.assertItemsEqual(eset.metadata, - expected) - - def test_reset_metadata(self): + @patch("Bcfg2.Server.Plugin.helpers.default_path_metadata") + def test_reset_metadata(self, mock_default_path_metadata): eset = self.get_obj() # test info.xml @@ -1778,29 +1936,22 @@ class TestEntrySet(TestDebuggable): eset.reset_metadata(event) self.assertIsNone(eset.infoxml) - for fname in [':info', 'info']: - event = Mock() - event.filename = fname - eset.metadata = Mock() - eset.reset_metadata(event) - self.assertItemsEqual(eset.metadata, DEFAULT_FILE_METADATA) - - @patch("Bcfg2.Server.Plugin.helpers.bind_info") - def test_bind_info_to_entry(self, mock_bind_info): - # There's a strange scoping issue in py3k that prevents this - # test from working as expected on sub-classes of EntrySet. - # No idea what's going on, but until I can figure it out we - # skip this test on subclasses - if inPy3k and self.test_obj != EntrySet: - return skip("Skipping this test for py3k scoping issues") - + def test_bind_info_to_entry(self): eset = self.get_obj() - entry = Mock() + eset.metadata = dict(owner="root", group="root") + entry = lxml.etree.Element("Path", name="/test") metadata = Mock() + eset.infoxml = None + eset.bind_info_to_entry(entry, metadata) + self.assertItemsEqual(entry.attrib, + dict(name="/test", owner="root", group="root")) + + entry = lxml.etree.Element("Path", name="/test") + eset.infoxml = Mock() eset.bind_info_to_entry(entry, metadata) - mock_bind_info.assert_called_with(entry, metadata, - infoxml=eset.infoxml, - default=eset.metadata) + self.assertItemsEqual(entry.attrib, + dict(name="/test", owner="root", group="root")) + eset.infoxml.BindEntry.assert_called_with(entry, metadata) def test_bind_entry(self): eset = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py index 1f5c4790b..ac0454f84 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py @@ -357,3 +357,21 @@ class TestVersion(TestPlugin): class TestClientRunHooks(Bcfg2TestCase): """ placeholder for future tests """ pass + + +class TestClientACLs(Bcfg2TestCase): + test_obj = ClientACLs + + def get_obj(self): + return self.test_obj() + + def test_check_acl_ip(self): + ca = self.get_obj() + self.assertIn(ca.check_acl_ip(Mock(), Mock()), + [True, False, None]) + + def test_check_acl_metadata(self): + ca = self.get_obj() + self.assertIn(ca.check_acl_metadata(Mock(), Mock()), + [True, False]) + diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py new file mode 100644 index 000000000..86a960701 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -0,0 +1,223 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.ACL import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \ + TestClientACLs + + +class TestFunctions(Bcfg2TestCase): + def test_rmi_names_equal(self): + good_cases = [('*', 'foo'), + ('foo', 'foo'), + ('foo.*', 'foo.bar'), + ('*.*', 'foo.bar'), + ('foo.bar', 'foo.bar'), + ('*.bar', 'foo.bar'), + ('foo.*.bar', 'foo.baz.bar')] + bad_cases = [('foo', 'bar'), + ('*', 'foo.bar'), + ('*.*', 'foo'), + ('*.*', 'foo.bar.baz'), + ('foo.*', 'bar.foo'), + ('*.bar', 'bar.foo'), + ('foo.*', 'foobar')] + for first, second in good_cases: + self.assertTrue(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly False" % + (first, second)) + self.assertTrue(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly False" % + (second, first)) + for first, second in bad_cases: + self.assertFalse(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly True" % + (first, second)) + self.assertFalse(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly True" % + (second, first)) + + def test_ip_matches(self): + good_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + bad_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + for ip, entry in good_cases: + self.assertTrue(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly False" % + (ip, lxml.etree.tostring(entry))) + for ip, entry in bad_cases: + self.assertFalse(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly True" % + (ip, lxml.etree.tostring(entry))) + + +class TestIPACLFile(TestXMLFileBacked): + test_obj = IPACLFile + + @patch("Bcfg2.Server.Plugins.ACL.ip_matches") + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches): + af = self.get_obj() + ip = "10.0.0.8" + rmi = "ACL.test" + + def reset(): + mock_rmi_names_equal.reset_mock() + mock_ip_matches.reset_mock() + + # test default defer with no entries + af.entries = [] + self.assertIsNone(af.check_acl(ip, rmi)) + + # test explicit allow, deny, and defer + entries = dict(Allow=lxml.etree.Element("Allow", method=rmi), + Deny=lxml.etree.Element("Deny", method=rmi), + Defer=lxml.etree.Element("Defer", method=rmi)) + af.entries = list(entries.values()) + + def get_ip_matches(tag): + def ip_matches(ip, entry): + return entry.tag == tag + + return ip_matches + + mock_rmi_names_equal.return_value = True + + reset() + mock_ip_matches.side_effect = get_ip_matches("Allow") + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Allow']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Deny") + self.assertFalse(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Deny']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Defer") + self.assertIsNone(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Defer']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + # test matching RMI names + reset() + mock_ip_matches.side_effect = lambda i, e: True + mock_rmi_names_equal.side_effect = lambda a, b: a == b + rmi = "ACL.test2" + matching = lxml.etree.Element("Allow", method=rmi) + af.entries.append(matching) + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, matching) + self.assertTrue( + call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or + call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list) + + # test implicit allow for localhost, defer for others + reset() + mock_ip_matches.side_effect = lambda i, e: False + self.assertIsNone(af.check_acl(ip, rmi)) + + reset() + self.assertTrue(af.check_acl("127.0.0.1", rmi)) + + +class TestMetadataACLFile(TestStructFile): + test_obj = MetadataACLFile + + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal): + af = self.get_obj() + af.Match = Mock() + metadata = Mock() + mock_rmi_names_equal.side_effect = lambda a, b: a == b + + def reset(): + af.Match.reset_mock() + mock_rmi_names_equal.reset_mock() + + # test default allow + af.entries = [] + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + + # test explicit allow and deny + reset() + af.entries = [lxml.etree.Element("Allow", method='ACL.test'), + lxml.etree.Element("Deny", method='ACL.test2')] + af.Match.return_value = af.entries + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test', 'ACL.test'), + mock_rmi_names_equal.call_args_list) + + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test2')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test2', 'ACL.test2'), + mock_rmi_names_equal.call_args_list) + + # test default deny for non-localhost + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + # test default allow for localhost + reset() + metadata.hostname = 'localhost' + self.assertTrue(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + +class TestACL(TestPlugin, TestClientACLs): + test_obj = ACL + + def test_check_acl_ip(self): + acl = self.get_obj() + acl.ip_acls = Mock() + self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"), + "ACL.test"), + acl.ip_acls.check_acl.return_value) + acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") + + def test_check_acl_metadata(self): + acl = self.get_obj() + acl.metadata_acls = Mock() + metadata = Mock() + self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"), + acl.metadata_acls.check_acl.return_value) + acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py new file mode 100644 index 000000000..cfb379c40 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py @@ -0,0 +1,111 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Bundler import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestStructure, \ + TestXMLDirectoryBacked + + +class TestBundleFile(TestStructFile): + test_obj = BundleFile + path = os.path.join(datastore, "test", "test1.xml") + + def test_bundle_name(self): + cases = [("foo.xml", "foo"), + ("foo.bar.xml", "foo.bar"), + ("foo-bar-baz.xml", "foo-bar-baz"), + ("foo....xml", "foo..."), + ("foo.genshi", "foo")] + bf = self.get_obj() + for fname, bname in cases: + bf.name = fname + self.assertEqual(bf.bundle_name, bname) + + +class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked): + test_obj = Bundler + + def get_obj(self, core=None): + @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, + self.test_obj.__name__), + Mock()) + def inner(): + return TestPlugin.get_obj(self, core=core) + return inner() + + @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent") + def test_HandleEvent(self, mock_HandleEvent): + b = self.get_obj() + b.bundles = dict(foo=Mock(), bar=Mock()) + b.entries = {"foo.xml": BundleFile("foo.xml"), + "baz.xml": BundleFile("baz.xml")} + event = Mock() + b.HandleEvent(event) + mock_HandleEvent.assert_called_with(b, event) + self.assertItemsEqual(b.bundles, + dict(foo=b.entries['foo.xml'], + baz=b.entries['baz.xml'])) + + def test_BuildStructures(self): + b = self.get_obj() + b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(), + has_dep=Mock(), is_dep=Mock(), indep=Mock()) + expected = dict() + + b.bundles['error'].XMLMatch.side_effect = TemplateError(None) + + xinclude = lxml.etree.Element("Bundle") + lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"), + "Path", name="/test") + b.bundles['xinclude'].XMLMatch.return_value = xinclude + expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude") + lxml.etree.SubElement(expected['xinclude'], "Path", name="/test") + + has_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(has_dep, "Bundle", name="is_dep") + lxml.etree.SubElement(has_dep, "Package", name="foo") + b.bundles['has_dep'].XMLMatch.return_value = has_dep + expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep") + lxml.etree.SubElement(expected['has_dep'], "Package", name="foo") + + is_dep = lxml.etree.Element("Bundle") + lxml.etree.SubElement(is_dep, "Package", name="bar") + b.bundles['is_dep'].XMLMatch.return_value = is_dep + expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep") + lxml.etree.SubElement(expected['is_dep'], "Package", name="bar") + + indep = lxml.etree.Element("Bundle", independent="true") + lxml.etree.SubElement(indep, "Service", name="baz") + b.bundles['indep'].XMLMatch.return_value = indep + expected['indep'] = lxml.etree.Element("Independent", name="indep") + lxml.etree.SubElement(expected['indep'], "Service", name="baz") + + metadata = Mock() + metadata.bundles = ["error", "xinclude", "has_dep", "indep"] + + rv = b.BuildStructures(metadata) + self.assertEqual(len(rv), 4) + for bundle in rv: + name = bundle.get("name") + self.assertIsNotNone(name, + "Bundle %s was not built" % name) + self.assertIn(name, expected, + "Unexpected bundle %s was built" % name) + self.assertXMLEqual(bundle, expected[name], + "Bundle %s was not built correctly" % name) + b.bundles[name].XMLMatch.assert_called_with(metadata) + + b.bundles['error'].XMLMatch.assert_called_with(metadata) + self.assertFalse(b.bundles['skip'].XMLMatch.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py index d655a20cd..b77d52033 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): def test_category(self): akg = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp + akg.setup = Mock() + akg.setup.cfp.has_section.return_value = False + akg.setup.cfp.has_option.return_value = False self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + akg.setup.reset_mock() + akg.setup.cfp.has_section.return_value = True self.assertIsNone(akg.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(akg.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + + akg.setup.reset_mock() + akg.setup.cfp.has_option.return_value = True + self.assertEqual(akg.category, akg.setup.cfp.get.return_value) + akg.setup.cfp.has_section.assert_called_with("sshkeys") + akg.setup.cfp.has_option.assert_called_with("sshkeys", "category") + akg.setup.cfp.get.assert_called_with("sshkeys", "category") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py index fc5d5e53d..31227329c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip: ccg.data = "data" entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock() + ccg.setup = MagicMock() self.assertEqual(ccg.get_data(entry, metadata), mock_Template.return_value.respond.return_value) - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo") + ccg.setup.__getitem__.assert_called_with("repo") mock_Template.assert_called_with("data".decode(ccg.encoding), compilerSettings=ccg.settings) tmpl = mock_Template.return_value @@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip: self.assertEqual(tmpl.name, entry.get("name")) self.assertEqual(tmpl.path, entry.get("name")) self.assertEqual(tmpl.source_path, ccg.name) - self.assertEqual(tmpl.repo, - Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value) + self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py index 71a7410da..f8e9b1990 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -27,15 +27,12 @@ if can_skip or HAS_CRYPTO: pass @patchIf(HAS_CRYPTO, - "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") - def test_handle_event(self, mock_decrypt, mock_get_algorithm): + def test_handle_event(self, mock_decrypt): @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") def inner(mock_handle_event): def reset(): mock_decrypt.reset_mock() - mock_get_algorithm.reset_mock() mock_handle_event.reset_mock() def get_event_data(obj, event): @@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO: ceg = self.get_obj() ceg.handle_event(event) mock_handle_event.assert_called_with(ceg, event) - mock_decrypt.assert_called_with("encrypted", - setup=SETUP, - algorithm=mock_get_algorithm.return_value) + mock_decrypt.assert_called_with("encrypted") self.assertEqual(ceg.data, "plaintext") reset() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py index b447a9bb8..330779c99 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py @@ -14,20 +14,14 @@ while path != "/": path = os.path.dirname(path) from common import * -try: - from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ - TestCfgGenshiGenerator - HAS_GENSHI = True -except ImportError: - TestCfgGenshiGenerator = object - HAS_GENSHI = False +from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ + TestCfgGenshiGenerator -if can_skip or (HAS_CRYPTO and HAS_GENSHI): +if can_skip or HAS_CRYPTO: class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): test_obj = CfgEncryptedGenshiGenerator @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") def setUp(self): pass diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py index 0f369113b..7ceedb7c2 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py @@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier class TestCfgExternalCommandVerifier(TestCfgVerifier): test_obj = CfgExternalCommandVerifier - @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen") - def test_verify_entry(self, mock_Popen): - proc = Mock() - mock_Popen.return_value = proc - proc.wait.return_value = 0 - proc.communicate.return_value = ("stdout", "stderr") + def test_verify_entry(self): entry = lxml.etree.Element("Path", name="/test.txt") metadata = Mock() ecv = self.get_obj() ecv.cmd = ["/bin/bash", "-x", "foo"] + ecv.exc = Mock() + ecv.exc.run.return_value = Mock() + ecv.exc.run.return_value.success = True + ecv.verify_entry(entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") - mock_Popen.reset_mock() - proc.wait.return_value = 13 + ecv.exc.reset_mock() + ecv.exc.run.return_value.success = False self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) - proc.communicate.assert_called_with(input="data") - proc.wait.assert_called_with() + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") + + ecv.exc.reset_mock() - mock_Popen.reset_mock() - mock_Popen.side_effect = OSError + ecv.exc.reset_mock() + ecv.exc.run.side_effect = OSError self.assertRaises(CfgVerificationError, ecv.verify_entry, entry, metadata, "data") - self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data") @patch("os.access") def test_handle_event(self, mock_access): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py index 2e8b7bfa5..b73670fb7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -19,111 +19,97 @@ from common import * from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator -if can_skip or HAS_GENSHI: - class TestCfgGenshiGenerator(TestCfgGenerator): - test_obj = CfgGenshiGenerator - - @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") - def setUp(self): - pass - - def test_removecomment(self): - data = [(None, "test", 1), - (None, "test2", 2)] - stream = [(genshi.core.COMMENT, "test", 0), - data[0], - (genshi.core.COMMENT, "test3", 0), - data[1]] - self.assertItemsEqual(list(removecomment(stream)), data) - - def test__init(self): - TestCfgGenerator.test__init(self) - cgg = self.get_obj() - self.assertIsInstance(cgg.loader, cgg.__loader_cls__) - - def test_get_data(self): - cgg = self.get_obj() - cgg._handle_genshi_exception = Mock() - cgg.template = Mock() - fltr = Mock() - cgg.template.generate.return_value = fltr - stream = Mock() - fltr.filter.return_value = stream - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() - - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock() - - def reset(): - cgg.template.reset_mock() - cgg._handle_genshi_exception.reset_mock() - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock() - - template_vars = dict( - name=entry.get("name"), - metadata=metadata, - path=cgg.name, - source_path=cgg.name, - repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value) - - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - def render(fmt, **kwargs): - stream.render.side_effect = None - raise TypeError - stream.render.side_effect = render - self.assertEqual(cgg.get_data(entry, metadata), - stream.render.return_value) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - self.assertEqual(stream.render.call_args_list, - [call("text", encoding=cgg.encoding, - strip_whitespace=False), - call("text", encoding=cgg.encoding)]) - - reset() - stream.render.side_effect = UndefinedError("test") - self.assertRaises(UndefinedError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - - reset() - stream.render.side_effect = ValueError - cgg._handle_genshi_exception.side_effect = ValueError - self.assertRaises(ValueError, - cgg.get_data, entry, metadata) - cgg.template.generate.assert_called_with(**template_vars) - Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo") - fltr.filter.assert_called_with(removecomment) - stream.render.assert_called_with("text", encoding=cgg.encoding, - strip_whitespace=False) - self.assertTrue(cgg._handle_genshi_exception.called) - - def test_handle_event(self): - cgg = self.get_obj() - cgg.loader = Mock() - event = Mock() - cgg.handle_event(event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) - - cgg.loader.reset_mock() - cgg.loader.load.side_effect = OSError - self.assertRaises(PluginExecutionError, - cgg.handle_event, event) - cgg.loader.load.assert_called_with(cgg.name, - cls=NewTextTemplate, - encoding=cgg.encoding) +class TestCfgGenshiGenerator(TestCfgGenerator): + test_obj = CfgGenshiGenerator + + def test__init(self): + TestCfgGenerator.test__init(self) + cgg = self.get_obj() + self.assertIsInstance(cgg.loader, cgg.__loader_cls__) + + def test_get_data(self): + cgg = self.get_obj() + cgg._handle_genshi_exception = Mock() + cgg.setup = MagicMock() + cgg.template = Mock() + fltr = Mock() + cgg.template.generate.return_value = fltr + stream = Mock() + fltr.filter.return_value = stream + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + + def reset(): + cgg.template.reset_mock() + cgg._handle_genshi_exception.reset_mock() + cgg.setup.reset_mock() + + template_vars = dict( + name=entry.get("name"), + metadata=metadata, + path=cgg.name, + source_path=cgg.name, + repo=cgg.setup.__getitem__.return_value) + + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + def render(fmt, **kwargs): + stream.render.side_effect = None + raise TypeError + stream.render.side_effect = render + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + self.assertEqual(stream.render.call_args_list, + [call("text", encoding=cgg.encoding, + strip_whitespace=False), + call("text", encoding=cgg.encoding)]) + + reset() + stream.render.side_effect = UndefinedError("test") + self.assertRaises(UndefinedError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + stream.render.side_effect = ValueError + cgg._handle_genshi_exception.side_effect = ValueError + self.assertRaises(ValueError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(**template_vars) + cgg.setup.__getitem__.assert_called_with("repo") + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + self.assertTrue(cgg._handle_genshi_exception.called) + + def test_handle_event(self): + cgg = self.get_obj() + cgg.loader = Mock() + event = Mock() + cgg.handle_event(event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) + + cgg.loader.reset_mock() + cgg.loader.load.side_effect = OSError + self.assertRaises(PluginExecutionError, + cgg.handle_event, event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py index 839e9c3b8..7e7cb5e3c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py @@ -27,47 +27,16 @@ class TestCfgInfoXML(TestCfgInfo): self.assertIsInstance(ci.infoxml, InfoXML) def test_bind_info_to_entry(self): - entry = lxml.etree.Element("Path", name="/test.txt") - metadata = Mock() ci = self.get_obj() ci.infoxml = Mock() - ci._set_info = Mock() - - self.assertRaises(PluginExecutionError, - ci.bind_info_to_entry, entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, dict(), - entry=entry) - self.assertFalse(ci._set_info.called) - - ci.infoxml.reset_mock() - ci._set_info.reset_mock() - mdata_value = Mock() - def set_mdata(metadata, mdata, entry=None): - mdata['Info'] = {None: mdata_value} + entry = Mock() + metadata = Mock() - ci.infoxml.pnode.Match.side_effect = set_mdata ci.bind_info_to_entry(entry, metadata) - ci.infoxml.pnode.Match.assert_called_with(metadata, - dict(Info={None: mdata_value}), - entry=entry) - ci._set_info.assert_called_with(entry, mdata_value) + ci.infoxml.BindEntry.assert_called_with(entry, metadata) def test_handle_event(self): ci = self.get_obj() ci.infoxml = Mock() ci.handle_event(Mock) ci.infoxml.HandleEvent.assert_called_with() - - def test__set_info(self): - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info") - def inner(mock_set_info): - ci = self.get_obj() - entry = Mock() - info = {"foo": "foo", - "__children__": ["one", "two"]} - ci._set_info(entry, info) - self.assertItemsEqual(entry.append.call_args_list, - [call(c) for c in info['__children__']]) - - inner() - TestCfgInfo.test__set_info(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py index dc4b11241..48d5cdbfe 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import * from Bcfg2.Server.Plugin import PluginExecutionError import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator try: - from Bcfg2.Encryption import EVPError + from Bcfg2.Server.Encryption import EVPError HAS_CRYPTO = True except: HAS_CRYPTO = False @@ -44,77 +44,76 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): def test_category(self): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.category) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") - cfp.reset_mock() - cfp.has_option.return_value = True - self.assertEqual(pkc.category, cfp.get.return_value) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "category") - cfp.get.assert_called_with("sshkeys", "category") + pkc.setup.reset_mock() + pkc.setup.cfp.has_option.return_value = True + self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category") + pkc.setup.cfp.get.assert_called_with("sshkeys", "category") @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.get_passphrases") def test_passphrase(self, mock_get_passphrases): pkc = self.get_obj() - cfp = Mock() - cfp.has_section.return_value = False - cfp.has_option.return_value = False - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + pkc.setup = Mock() + pkc.setup.cfp = Mock() + pkc.setup.cfp.has_section.return_value = False + pkc.setup.cfp.has_option.return_value = False self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") - cfp.reset_mock() - cfp.has_section.return_value = True + pkc.setup.reset_mock() + pkc.setup.cfp.has_section.return_value = True self.assertIsNone(pkc.passphrase) - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") - cfp.reset_mock() - cfp.get.return_value = "test" + pkc.setup.reset_mock() + pkc.setup.cfp.get.return_value = "test" mock_get_passphrases.return_value = dict(test="foo", test2="bar") - cfp.has_option.return_value = True + pkc.setup.cfp.has_option.return_value = True self.assertEqual(pkc.passphrase, "foo") - cfp.has_section.assert_called_with("sshkeys") - cfp.has_option.assert_called_with("sshkeys", "passphrase") - cfp.get.assert_called_with("sshkeys", "passphrase") - mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + pkc.setup.cfp.has_section.assert_called_with("sshkeys") + pkc.setup.cfp.has_option.assert_called_with("sshkeys", + "passphrase") + pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase") + mock_get_passphrases.assert_called_with() @patch("shutil.rmtree") @patch("tempfile.mkdtemp") - @patch("subprocess.Popen") - def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree): + def test__gen_keypair(self, mock_mkdtemp, mock_rmtree): pkc = self.get_obj() + pkc.cmd = Mock() pkc.XMLMatch = Mock() mock_mkdtemp.return_value = datastore metadata = Mock() - proc = Mock() - proc.wait.return_value = 0 - proc.communicate.return_value = MagicMock() - mock_Popen.return_value = proc + exc = Mock() + exc.success = True + pkc.cmd.run.return_value = exc spec = lxml.etree.Element("PrivateKey") pkc.XMLMatch.return_value = spec def reset(): pkc.XMLMatch.reset_mock() - mock_Popen.reset_mock() + pkc.cmd.reset_mock() mock_mkdtemp.reset_mock() mock_rmtree.reset_mock() @@ -122,10 +121,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "rsa", "-N", ""]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "rsa", "-N", ""]) reset() lxml.etree.SubElement(spec, "Params", bits="768", type="dsa") @@ -136,13 +134,12 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): os.path.join(datastore, "privkey")) pkc.XMLMatch.assert_called_with(metadata) mock_mkdtemp.assert_called_with() - self.assertItemsEqual(mock_Popen.call_args[0][0], - ["ssh-keygen", "-f", - os.path.join(datastore, "privkey"), - "-t", "dsa", "-b", "768", "-N", "foo"]) + pkc.cmd.run.assert_called_with(["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "dsa", "-b", "768", "-N", "foo"]) reset() - proc.wait.return_value = 1 + pkc.cmd.run.return_value.success = False self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata) mock_rmtree.assert_called_with(datastore) @@ -281,9 +278,8 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): if HAS_CRYPTO: @patch(passphrase, "foo") - @patch("Bcfg2.Encryption.ssl_encrypt") - @patch("Bcfg2.Encryption.get_algorithm") - def inner2(mock_get_algorithm, mock_ssl_encrypt): + @patch("Bcfg2.Server.Encryption.ssl_encrypt") + def inner2(mock_ssl_encrypt): reset() mock_ssl_encrypt.return_value = "encryptedprivatekey" Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True @@ -302,136 +298,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): "ssh-rsa publickey pubkey.filename\n", group="foo") pkc.write_data.assert_called_with("encryptedprivatekey", group="foo", ext=".crypt") - mock_ssl_encrypt.assert_called_with( - "privatekey", "foo", - algorithm=mock_get_algorithm.return_value) + mock_ssl_encrypt.assert_called_with("privatekey", "foo") mock_rmtree.assert_called_with(datastore) inner2() - - def test_Index(self): - has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False - TestStructFile.test_Index(self) - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" - - pkc = self.get_obj() - pkc._decrypt = Mock() - pkc._decrypt.return_value = 'plaintext' - pkc.data = ''' -<PrivateKey> - <Group name="test"> - <Passphrase encrypted="foo">crypted</Passphrase> - </Group> - <Group name="test" negate="true"> - <Passphrase>plain</Passphrase> - </Group> -</PrivateKey>''' - - # test successful decryption - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - for el in pkc.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pkc._decrypt.return_value) - - # test failed decryption, strict - pkc._decrypt.reset_mock() - pkc._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pkc.Index) - - # test failed decryption, lax - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" - pkc._decrypt.reset_mock() - pkc.Index() - self.assertItemsEqual( - pkc._decrypt.call_args_list, - [call(el) - for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): - pkc = self.get_obj() - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pkc._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 2e758774e..ab383e4f3 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -159,21 +159,6 @@ class TestCfgInfo(TestCfgBaseFileMatcher): self.assertRaises(NotImplementedError, ci.bind_info_to_entry, Mock(), Mock()) - def test__set_info(self): - ci = self.get_obj() - entry = Mock() - entry.attrib = dict() - - info = {"foo": "foo", - "_bar": "bar", - "bar:baz=quux": "quux", - "baz__": "baz", - "__quux": "quux"} - ci._set_info(entry, info) - self.assertItemsEqual(entry.attrib, - dict([(k, v) for k, v in info.items() - if not k.startswith("__")])) - class TestCfgVerifier(TestCfgBaseFileMatcher): test_obj = CfgVerifier @@ -259,29 +244,26 @@ class TestCfgCreator(TestCfgBaseFileMatcher): class TestCfgDefaultInfo(TestCfgInfo): test_obj = CfgDefaultInfo - def get_obj(self, defaults=None): - if defaults is None: - defaults = dict() - return self.test_obj(defaults) + def get_obj(self, *_): + return self.test_obj() - @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__") - def test__init(self, mock__init): - defaults = Mock() - cdi = self.get_obj(defaults=defaults) - mock__init.assert_called_with(cdi, '') - self.assertEqual(defaults, cdi.defaults) + def test__init(self): + pass def test_handle_event(self): # this CfgInfo handler doesn't handle any events -- it's not # file-driven, but based on the built-in defaults pass - def test_bind_info_to_entry(self): + @patch("Bcfg2.Server.Plugin.default_path_metadata") + def test_bind_info_to_entry(self, mock_default_path_metadata): cdi = self.get_obj() - cdi._set_info = Mock() - entry = Mock() + entry = lxml.etree.Element("Test", name="test") + mock_default_path_metadata.return_value = \ + dict(owner="root", mode="0600") cdi.bind_info_to_entry(entry, Mock()) - cdi._set_info.assert_called_with(entry, cdi.defaults) + self.assertItemsEqual(entry.attrib, + dict(owner="root", mode="0600", name="test")) class TestCfgEntrySet(TestEntrySet): @@ -439,8 +421,6 @@ class TestCfgEntrySet(TestEntrySet): @patch("Bcfg2.Server.Plugins.Cfg.u_str") @patch("Bcfg2.Server.Plugins.Cfg.b64encode") def test_bind_entry(self, mock_b64encode, mock_u_str): - Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) - mock_u_str.side_effect = lambda x: x eset = self.get_obj() @@ -448,6 +428,7 @@ class TestCfgEntrySet(TestEntrySet): eset._generate_data = Mock() eset.get_handlers = Mock() eset._validate_data = Mock() + eset.setup = dict(validate=False) def reset(): mock_b64encode.reset_mock() @@ -523,7 +504,7 @@ class TestCfgEntrySet(TestEntrySet): # test successful validation entry = reset() - Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + eset.setup['validate'] = True bound = eset.bind_entry(entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -600,24 +581,24 @@ class TestCfgEntrySet(TestEntrySet): if entry.specific is not None: self.assertFalse(entry.specific.matches.called) - def test_bind_info_to_entry(self): - default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO + @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo") + def test_bind_info_to_entry(self, mock_DefaultInfo): eset = self.get_obj() eset.get_handlers = Mock() eset.get_handlers.return_value = [] - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock() metadata = Mock() def reset(): eset.get_handlers.reset_mock() - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock() + mock_DefaultInfo.reset_mock() return lxml.etree.Element("Path", name="/test.txt") # test with no info handlers entry = reset() eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) self.assertEqual(entry.get("type"), "file") # test with one info handler @@ -626,7 +607,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = [handler] eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) handler.bind_info_to_entry.assert_called_with(entry, metadata) self.assertEqual(entry.get("type"), "file") @@ -636,7 +618,8 @@ class TestCfgEntrySet(TestEntrySet): eset.get_handlers.return_value = handlers eset.bind_info_to_entry(entry, metadata) eset.get_handlers.assert_called_with(metadata, CfgInfo) - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with( + entry, metadata) # we don't care which handler gets called as long as exactly # one of them does called = 0 @@ -647,8 +630,6 @@ class TestCfgEntrySet(TestEntrySet): self.assertEqual(called, 1) self.assertEqual(entry.get("type"), "file") - Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info - def test_create_data(self): eset = self.get_obj() eset.best_matching = Mock() @@ -753,31 +734,39 @@ class TestCfg(TestGroupSpool, TestPullTarget): def get_obj(self, core=None): if core is None: core = Mock() - core.setup = MagicMock() return TestGroupSpool.get_obj(self, core=core) + @patch("Bcfg2.Options.get_option_parser") @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") @patch("Bcfg2.Server.Plugin.PullTarget.__init__") - def test__init(self, mock_pulltarget_init, mock_groupspool_init): + def test__init(self, mock_pulltarget_init, mock_groupspool_init, + mock_get_option_parser): + setup = MagicMock() + setup.__contains__.return_value = False + mock_get_option_parser.return_value = setup + + def reset(): + core.reset_mock() + setup.reset_mock() + mock_pulltarget_init.reset_mock() + mock_groupspool_init.reset_mock() + core = Mock() - core.setup = MagicMock() cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - core.setup.add_option.assert_called_with("validate", - Bcfg2.Options.CFG_VALIDATION) - core.setup.reparse.assert_called_with() - - core.reset_mock() - core.setup.reset_mock() - mock_pulltarget_init.reset_mock() - mock_groupspool_init.reset_mock() - core.setup.__contains__.return_value = True + setup.add_option.assert_called_with( + "validate", + Bcfg2.Options.CFG_VALIDATION) + mock_get_option_parser.return_value.reparse.assert_called_with() + + reset() + setup.__contains__.return_value = True cfg = self.test_obj(core, datastore) mock_pulltarget_init.assert_called_with(cfg) mock_groupspool_init.assert_called_with(cfg, core, datastore) - self.assertFalse(core.setup.add_option.called) - self.assertFalse(core.setup.reparse.called) + self.assertFalse(setup.add_option.called) + self.assertFalse(setup.reparse.called) def test_has_generator(self): cfg = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py new file mode 100644 index 000000000..537ceb4ff --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py @@ -0,0 +1,60 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Decisions import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestStructFile, TestPlugin, TestDecision + + +class TestDecisionFile(TestStructFile): + test_obj = DecisionFile + + def test_get_decisions(self): + df = self.get_obj() + metadata = Mock() + + df.xdata = None + self.assertItemsEqual(df.get_decisions(metadata), []) + + df.xdata = lxml.etree.Element("Decisions") + df.XMLMatch = Mock() + df.XMLMatch.return_value = lxml.etree.Element("Decisions") + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Service", name='*') + lxml.etree.SubElement(df.XMLMatch.return_value, + "Decision", type="Path", + name='/etc/apt/apt.conf') + + self.assertItemsEqual(df.get_decisions(metadata), + [("Service", '*'), + ("Path", '/etc/apt/apt.conf')]) + df.XMLMatch.assert_called_with(metadata) + + +class TestDecisions(TestPlugin, TestDecision): + test_obj = Decisions + + def test_GetDecisions(self): + d = self.get_obj() + d.whitelist = Mock() + d.blacklist = Mock() + metadata = Mock() + + self.assertEqual(d.GetDecisions(metadata, "whitelist"), + d.whitelist.get_decision.return_value) + d.whitelist.get_decision.assert_called_with(metadata) + + self.assertEqual(d.GetDecisions(metadata, "blacklist"), + d.blacklist.get_decision.return_value) + d.blacklist.get_decision.assert_called_with(metadata) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 742946c42..a9e9d9701 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -215,9 +215,9 @@ class TestXMLMetadataConfig(TestXMLFileBacked): return XMLMetadataConfig(self.metadata, watch_clients, basefile) return inner() + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test__init(self): xmc = self.get_obj() - self.assertEqual(self.metadata.core.fam, xmc.fam) self.assertFalse(xmc.fam.AddMonitor.called) def test_xdata(self): @@ -262,20 +262,21 @@ class TestXMLMetadataConfig(TestXMLFileBacked): self.assertEqual(config.base_xdata, "<test/>") def test_add_monitor(self): - core = MagicMock() - config = self.get_obj(core=core) + config = self.get_obj() + config.fam = Mock() fname = "test.xml" fpath = os.path.join(self.metadata.data, fname) config.extras = [] config.add_monitor(fpath) - self.assertFalse(core.fam.AddMonitor.called) + self.assertFalse(config.fam.AddMonitor.called) self.assertEqual(config.extras, [fpath]) - config = self.get_obj(core=core, watch_clients=True) + config = self.get_obj(watch_clients=True) + config.fam = Mock() config.add_monitor(fpath) - core.fam.AddMonitor.assert_called_with(fpath, config.metadata) + config.fam.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fpath]) def test_Index(self): @@ -499,7 +500,8 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): client_name = "%s%s" % (prefix, i) return client_name - def test__init(self): + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): # test with watch_clients=False core = MagicMock() metadata = self.get_obj(core=core) @@ -512,18 +514,19 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): self.assertEqual(metadata.states, dict()) # test with watch_clients=True - core.fam = MagicMock() metadata = self.get_obj(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - core.fam.reset_mock() - core.fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) + + mock_get_fam.reset_mock() + fam = Mock() + fam.AddMonitor = Mock(side_effect=IOError) + mock_get_fam.return_value = fam self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, self.get_obj, core=core, watch_clients=True) @@ -586,6 +589,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_group(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -618,6 +622,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_update_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -635,6 +640,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_remove_group(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -650,6 +656,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_bundle(self): metadata = self.get_obj() metadata.groups_xml.write = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -673,6 +680,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_remove_bundle(self): metadata = self.get_obj() metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.load_xml = Mock() metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) @@ -688,6 +696,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_add_client(self): metadata = self.get_obj() metadata.clients_xml.write = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree() metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -722,6 +731,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): def test_update_client(self): metadata = self.get_obj() metadata.clients_xml.write_xml = Mock() + metadata.clients_xml.load_xml = Mock() metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree()) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) @@ -1269,25 +1279,24 @@ class TestMetadataBase(TestMetadata): return client_name @patch('os.path.exists') - def test__init(self, mock_exists): - core = MagicMock() - core.fam = Mock() + @patch('Bcfg2.Server.FileMonitor.get_fam') + def test__init(self, mock_get_fam, mock_exists): mock_exists.return_value = False - metadata = self.get_obj(core=core, watch_clients=True) + metadata = self.get_obj(watch_clients=True) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, - "groups.xml"), - metadata) + mock_get_fam.return_value.AddMonitor.assert_called_with( + os.path.join(metadata.data, "groups.xml"), + metadata) mock_exists.return_value = True - core.fam.reset_mock() - metadata = self.get_obj(core=core, watch_clients=True) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) + mock_get_fam.reset_mock() + metadata = self.get_obj(watch_clients=True) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "groups.xml"), + metadata) + mock_get_fam.return_value.AddMonitor.assert_any_call( + os.path.join(metadata.data, "clients.xml"), + metadata) def test_add_group(self): pass @@ -1530,7 +1539,8 @@ class TestMetadata_ClientsXML(TestMetadataBase): def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_obj() - metadata.core.fam = Mock() + fam = Bcfg2.Server.FileMonitor._FAM + Bcfg2.Server.FileMonitor._FAM = MagicMock() @patchIf(not isinstance(lxml.etree.Element, Mock), "lxml.etree.Element", Mock()) def inner(): @@ -1538,5 +1548,7 @@ class TestMetadata_ClientsXML(TestMetadataBase): inner() metadata = TestMetadata.load_clients_data(self, metadata=metadata, xdata=xdata) - return TestMetadataBase.load_clients_data(self, metadata=metadata, - xdata=xdata) + rv = TestMetadataBase.load_clients_data(self, metadata=metadata, + xdata=xdata) + Bcfg2.Server.FileMonitor._FAM = fam + return rv diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 0794db62e..30b08ef2f 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -134,22 +134,20 @@ class TestProbeSet(TestEntrySet): ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] bogus_names = ["test.py"] - def get_obj(self, path=datastore, fam=None, encoding=None, + def get_obj(self, path=datastore, encoding=None, plugin_name="Probes", basename=None): # get_obj() accepts the basename argument, accepted by the # parent get_obj() method, and just throws it away, since # ProbeSet uses a regex for the "basename" - if fam is None: - fam = Mock() - rv = self.test_obj(path, fam, encoding, plugin_name) + rv = self.test_obj(path, encoding, plugin_name) rv.entry_type = MagicMock() return rv - def test__init(self): - fam = Mock() - ps = self.get_obj(fam=fam) + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): + ps = self.get_obj() self.assertEqual(ps.plugin_name, "Probes") - fam.AddMonitor.assert_called_with(datastore, ps) + mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps) TestEntrySet.test__init(self) def test_HandleEvent(self): @@ -300,9 +298,6 @@ text def test__init(self): mock_load_data = Mock() probes = self.get_probes_object(load_data=mock_load_data) - probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore, - probes.name), - probes.probes) mock_load_data.assert_any_call() self.assertEqual(probes.probedata, ClientProbeDataSet()) self.assertEqual(probes.cgroups, dict()) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py index 896f5861e..92dc85fb1 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -19,12 +19,6 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \ TestPlugin, TestDirectoryBacked try: - from Bcfg2.Encryption import EVPError - HAS_CRYPTO = True -except: - HAS_CRYPTO = False - -try: import json JSON = "json" except ImportError: @@ -41,10 +35,10 @@ class TestPropertyFile(Bcfg2TestCase): return self.test_obj(path) def test_write(self): - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() pf = self.get_obj() pf.validate_data = Mock() pf._write = Mock() + pf.setup = Mock() xstr = u("<Properties/>\n") pf.xdata = lxml.etree.XML(xstr) @@ -52,20 +46,20 @@ class TestPropertyFile(Bcfg2TestCase): def reset(): pf.validate_data.reset_mock() pf._write.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() # test writes disabled - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertRaises(PluginExecutionError, pf.write) self.assertFalse(pf.validate_data.called) self.assertFalse(pf._write.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", + pf.setup.cfp.getboolean.assert_called_with("properties", "writes_enabled", default=True) # test successful write reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.write(), pf._write.return_value) pf.validate_data.assert_called_with() pf._write.assert_called_with() @@ -243,178 +237,61 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile): mock_exists.assert_called_with(schemafile) mock_XMLSchema.assert_called_with(file=schemafile) - def test_Index(self): - TestStructFile.test_Index(self) - - pf = self.get_obj() - pf.xdata = lxml.etree.Element("Properties") - lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo") - pf.data = lxml.etree.tostring(pf.xdata) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - def test_Index_crypto(self): - pf = self.get_obj() - pf._decrypt = Mock() - pf._decrypt.return_value = 'plaintext' - pf.data = ''' -<Properties decrypt="strict"> - <Crypted encrypted="foo"> - crypted - <Plain foo="bar">plain</Plain> - </Crypted> - <Crypted encrypted="bar">crypted</Crypted> - <Plain bar="baz">plain</Plain> - <Plain> - <Crypted encrypted="foo">crypted</Crypted> - </Plain> -</Properties>''' - - # test successful decryption - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - for el in pf.xdata.xpath("//Crypted"): - self.assertEqual(el.text, pf._decrypt.return_value) - - # test failed decryption, strict - pf._decrypt.reset_mock() - pf._decrypt.side_effect = EVPError - self.assertRaises(PluginExecutionError, pf.Index) - - # test failed decryption, lax - pf.data = pf.data.replace("strict", "lax") - pf._decrypt.reset_mock() - pf.Index() - self.assertItemsEqual(pf._decrypt.call_args_list, - [call(el) for el in pf.xdata.xpath("//Crypted")]) - - @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases") - @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt") - def test_decrypt(self, mock_bruteforce, mock_get_passphrases, - mock_get_algorithm, mock_ssl): - pf = self.get_obj() - Bcfg2.Server.Plugins.Properties.SETUP = MagicMock() - - def reset(): - mock_bruteforce.reset_mock() - mock_get_algorithm.reset_mock() - mock_get_passphrases.reset_mock() - mock_ssl.reset_mock() - - # test element without text contents - self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) - self.assertFalse(mock_bruteforce.called) - self.assertFalse(mock_get_passphrases.called) - self.assertFalse(mock_ssl.called) - - # test element with a passphrase in the config file - reset() - el = lxml.etree.Element("Test", encrypted="foo") - el.text = "crypted" - mock_get_passphrases.return_value = dict(foo="foopass", - bar="barpass") - mock_get_algorithm.return_value = "bf_cbc" - mock_ssl.return_value = "decrypted with ssl" - self.assertEqual(pf._decrypt(el), mock_ssl.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test failure to decrypt element with a passphrase in the config - reset() - mock_ssl.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_ssl.assert_called_with(el.text, "foopass", - algorithm="bf_cbc") - self.assertFalse(mock_bruteforce.called) - - # test element without valid passphrase - reset() - el.set("encrypted", "true") - mock_bruteforce.return_value = "decrypted with bruteforce" - self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - - # test failure to decrypt element without valid passphrase - reset() - mock_bruteforce.side_effect = EVPError - self.assertRaises(EVPError, pf._decrypt, el) - mock_get_passphrases.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_get_algorithm.assert_called_with( - Bcfg2.Server.Plugins.Properties.SETUP) - mock_bruteforce.assert_called_with(el.text, - passphrases=["foopass", - "barpass"], - algorithm="bf_cbc") - self.assertFalse(mock_ssl.called) - @patch("copy.copy") def test_get_additional_data(self, mock_copy): - Bcfg2.Server.Plugins.Properties.SETUP = Mock() pf = self.get_obj() + pf.setup = Mock() pf.XMLMatch = Mock() metadata = Mock() def reset(): mock_copy.reset_mock() pf.XMLMatch.reset_mock() - Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + pf.setup.reset_mock() pf.xdata = lxml.etree.Element("Properties", automatch="true") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) pf.xdata = lxml.etree.Element("Properties", automatch="false") for automatch in [True, False]: reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch + pf.setup.cfp.getboolean.return_value = automatch self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) pf.xdata = lxml.etree.Element("Properties") reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + pf.setup.cfp.getboolean.return_value = False self.assertEqual(pf.get_additional_data(metadata), mock_copy.return_value) mock_copy.assert_called_with(pf) self.assertFalse(pf.XMLMatch.called) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) reset() - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + pf.setup.cfp.getboolean.return_value = True self.assertEqual(pf.get_additional_data(metadata), pf.XMLMatch.return_value) pf.XMLMatch.assert_called_with(metadata) - Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False) + pf.setup.cfp.getboolean.assert_called_with("properties", + "automatch", + default=False) self.assertFalse(mock_copy.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py index f018b45dc..7083fff06 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py @@ -45,81 +45,37 @@ class TestRules(TestPrioDir): entry = lxml.etree.Element("Package", name="foo") self.assertFalse(r.HandlesEntry(entry, metadata)) - def test_BindEntry(self, method="BindEntry"): - r = self.get_obj() - r.get_attrs = Mock() - r.get_attrs.return_value = dict(overwrite="new", add="add", - text="text") - entry = lxml.etree.Element("Test", overwrite="old", keep="keep") - metadata = Mock() - - getattr(r, method)(entry, metadata) - r.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(overwrite="old", add="add", keep="keep", - text="text")) - - def test_HandleEntry(self): - self.test_BindEntry(method="HandleEntry") - @patch("Bcfg2.Server.Plugin.PrioDir._matches") def test__matches(self, mock_matches): - """ test _matches() behavior regardless of state of _regex_enabled """ r = self.get_obj() metadata = Mock() + # test parent _matches() returning True entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] + candidate = lxml.etree.Element("Path", name="/etc/bar.conf") mock_matches.return_value = True - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) - # test special Path cases -- adding and removing trailing slash + # test all conditions returning False mock_matches.reset_mock() mock_matches.return_value = False - rules = ["/etc/foo/", "/etc/bar"] - entry = lxml.etree.Element("Path", name="/etc/foo") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertFalse(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + # test special Path cases -- adding and removing trailing slash mock_matches.reset_mock() - entry = lxml.etree.Element("Path", name="/etc/bar/") - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_disabled(self, mock_matches): - """ test failure to match with regex disabled """ - r = self.get_obj() - self.set_regex_enabled(r, False) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = [] - self.assertFalse(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - - @patch("Bcfg2.Server.Plugin.PrioDir._matches") - def test__matches_regex_enabled(self, mock_matches): - """ test match with regex enabled """ - r = self.get_obj() - self.set_regex_enabled(r, True) - metadata = Mock() - mock_matches.return_value = False - - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - rules = ["/etc/.*\.conf", "/etc/bar"] - self.assertTrue(r._matches(entry, metadata, rules)) - mock_matches.assert_called_with(r, entry, metadata, rules) - self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) - - def set_regex_enabled(self, rules_obj, state): - """ set the state of regex_enabled for this implementation of - Rules """ - if not isinstance(rules_obj.core.setup, MagicMock): - rules_obj.core.setup = MagicMock() - rules_obj.core.setup.cfp.getboolean.return_value = state + withslash = lxml.etree.Element("Path", name="/etc/foo") + withoutslash = lxml.etree.Element("Path", name="/etc/foo/") + self.assertTrue(r._matches(withslash, metadata, withoutslash)) + self.assertTrue(r._matches(withoutslash, metadata, withslash)) + + if r._regex_enabled: + mock_matches.reset_mock() + candidate = lxml.etree.Element("Path", name="/etc/.*\.conf") + self.assertTrue(r._matches(entry, metadata, candidate)) + mock_matches.assert_called_with(r, entry, metadata, candidate) + self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) def test__regex_enabled(self): r = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py index bf9a3ced3..128d6cae5 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py @@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase): def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=Mock()) + return self.test_obj(path) def test__init(self): hm = self.get_obj() diff --git a/testsuite/Testsrc/test_code_checks.py b/testsuite/Testsrc/test_code_checks.py index a38710fd4..415b316fd 100644 --- a/testsuite/Testsrc/test_code_checks.py +++ b/testsuite/Testsrc/test_code_checks.py @@ -35,7 +35,7 @@ contingent_checks = { "lib/Bcfg2/Server/Admin": ["Reports.py", "Syncdb.py"], "sbin": ["bcfg2-reports"]}, ("pyinotify",): {"lib/Bcfg2/Server/FileMonitor": ["Inotify.py"]}, - ("yum",): {"lib/Bcfg2/Client/Tools": ["YUM*"]}, + ("yum",): {"lib/Bcfg2/Client/Tools": ["YUM.py"]}, ("genshi",): {"lib/Bcfg2/Server/Plugins/Cfg": ["CfgGenshiGenerator.py"]}, ("Cheetah",): {"lib/Bcfg2/Server/Plugins/Cfg": ["CfgCheetahGenerator.py"]}, ("M2Crypto",): {"lib/Bcfg2": ["Encryption.py"], @@ -49,11 +49,10 @@ contingent_checks = { # perform only error checking on the listed files error_checks = { - "sbin": ["bcfg2-build-reports"], - "lib/Bcfg2": ["Proxy.py", "SSLServer.py", "Reporting"], - "lib/Bcfg2/Server": ["Reports", "SchemaUpdater"], - "lib/Bcfg2/Server/Admin": ["Compare.py", - "Snapshots.py"], + "lib/Bcfg2": ["Reporting"], + "lib/Bcfg2/Client": ["Proxy.py"], + "lib/Bcfg2/Server": ["Reports", "SchemaUpdater", "SSLServer.py"], + "lib/Bcfg2/Server/Admin": ["Compare.py"], "lib/Bcfg2/Client/Tools": ["OpenCSW.py", "Blast.py", "FreeBSDInit.py", @@ -67,17 +66,9 @@ error_checks = { # perform no checks at all on the listed files no_checks = { "lib/Bcfg2/Client/Tools": ["APT.py", "RPM.py", "rpmtools.py"], - "lib/Bcfg2/Server": ["Snapshots", "Hostbase"], "lib/Bcfg2": ["manage.py"], "lib/Bcfg2/Server/Reports": ["manage.py"], - "lib/Bcfg2/Server/Plugins": ["Account.py", - "Base.py", - "Editor.py", - "Hostbase.py", - "Snapshots.py", - "Statistics.py", - "TCheetah.py", - "TGenshi.py"], + "lib/Bcfg2/Server/Plugins": ["Base.py"], } diff --git a/testsuite/common.py b/testsuite/common.py index e26d0be61..4bf5c85f3 100644 --- a/testsuite/common.py +++ b/testsuite/common.py @@ -13,6 +13,7 @@ import re import sys import codecs import unittest +import lxml.etree from mock import patch, MagicMock, _patch, DEFAULT from Bcfg2.Compat import wraps @@ -265,11 +266,18 @@ class Bcfg2TestCase(unittest.TestCase): """ Test that the two XML trees given are equal. Both elements and all children are expected to have ``name`` attributes. """ - self.assertEqual(el1.tag, el2.tag, msg=msg) - self.assertEqual(el1.text, el2.text, msg=msg) - self.assertItemsEqual(el1.attrib.items(), el2.attrib.items(), msg=msg) + if msg is None: + msg = "XML trees were not equal" + fullmsg = msg + "\nFirst: %s" % lxml.etree.tostring(el1) + \ + "\nSecond: %s" % lxml.etree.tostring(el2) + + self.assertEqual(el1.tag, el2.tag, msg=fullmsg) + self.assertEqual(el1.text, el2.text, msg=fullmsg) + self.assertItemsEqual(el1.attrib.items(), el2.attrib.items(), + msg=fullmsg) self.assertEqual(len(el1.getchildren()), - len(el2.getchildren())) + len(el2.getchildren()), + msg=fullmsg) for child1 in el1.getchildren(): cname = child1.get("name") self.assertIsNotNone(cname, |