diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-04-23 14:50:09 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-04-23 14:50:09 -0400 |
commit | 46a47b4120b3d892b8149a5e181e4d976ad87f99 (patch) | |
tree | f2697f233fc7f5ad5022864222a5ca87715a651b /testsuite/Testsrc/Testlib/TestServer/TestPlugin | |
parent | e1f99d1d5045e0511db42debb30aa97da2018796 (diff) | |
parent | 3d06f311274d6b942ee89d8cdb13b2ecc99af1b0 (diff) | |
download | bcfg2-46a47b4120b3d892b8149a5e181e4d976ad87f99.tar.gz bcfg2-46a47b4120b3d892b8149a5e181e4d976ad87f99.tar.bz2 bcfg2-46a47b4120b3d892b8149a5e181e4d976ad87f99.zip |
Merge branch '1.4.x'
Conflicts:
debian/bcfg2-server.install
doc/server/plugins/grouping/metadata.txt
src/lib/Bcfg2/Client/Client.py
src/lib/Bcfg2/Client/Tools/Portage.py
src/lib/Bcfg2/Client/Tools/RcUpdate.py
src/lib/Bcfg2/Client/Tools/YUM24.py
src/lib/Bcfg2/Client/Tools/__init__.py
src/lib/Bcfg2/Client/Tools/launchd.py
src/lib/Bcfg2/Options.py
src/lib/Bcfg2/Server/Core.py
src/lib/Bcfg2/Server/Plugin/helpers.py
src/lib/Bcfg2/Server/Plugins/Metadata.py
src/lib/Bcfg2/Server/models.py
src/lib/Bcfg2/Utils.py
src/sbin/bcfg2-info
src/sbin/bcfg2-test
testsuite/Testsrc/Testlib/TestClient/TestTools/Test_init.py
testsuite/Testsrc/test_code_checks.py
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugin')
-rw-r--r-- | testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py | 762 | ||||
-rw-r--r-- | testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py | 18 |
2 files changed, 470 insertions, 310 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index fb51eb1fe..6187880b7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -1,9 +1,10 @@ import os -import re import sys import copy +import genshi import lxml.etree import Bcfg2.Server +import genshi.core from Bcfg2.Compat import reduce from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugin.helpers import * @@ -21,6 +22,12 @@ from common import * from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable from TestServer.TestPlugin.Testinterfaces import TestGenerator +try: + from Bcfg2.Server.Encryption import EVPError + HAS_CRYPTO = True +except: + HAS_CRYPTO = False + def tostring(el): return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8') @@ -31,46 +38,14 @@ class FakeElementTree(lxml.etree._ElementTree): class TestFunctions(Bcfg2TestCase): - def test_bind_info(self): - entry = lxml.etree.Element("Path", name="/test") - metadata = Mock() - default = dict(test1="test1", test2="test2") - # test without infoxml - bind_info(entry, metadata, default=default) - self.assertItemsEqual(entry.attrib, - dict(test1="test1", - test2="test2", - name="/test")) - - # test with bogus infoxml - entry = lxml.etree.Element("Path", name="/test") - infoxml = Mock() - self.assertRaises(PluginExecutionError, - bind_info, - entry, metadata, infoxml=infoxml) - infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry) - - # test with valid infoxml - entry = lxml.etree.Element("Path", name="/test") - infoxml.reset_mock() - infodata = {None: {"test3": "test3", "test4": "test4"}} - def infoxml_rv(metadata, rv, entry=None): - rv['Info'] = infodata - infoxml.pnode.Match.side_effect = infoxml_rv - bind_info(entry, metadata, infoxml=infoxml, default=default) - # mock objects don't properly track the called-with value of - # arguments whose value is changed by the function, so it - # thinks Match() was called with the final value of the mdata - # arg, not the initial value. makes this test a little less - # worthwhile, TBH. - infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata), - entry=entry) - self.assertItemsEqual(entry.attrib, - dict(test1="test1", - test2="test2", - test3="test3", - test4="test4", - name="/test")) + def test_removecomment(self): + data = [(None, "test", 1), + (None, "test2", 2)] + stream = [(genshi.core.COMMENT, "test", 0), + data[0], + (genshi.core.COMMENT, "test3", 0), + data[1]] + self.assertItemsEqual(list(removecomment(stream)), data) class TestDatabaseBacked(TestPlugin): @@ -109,10 +84,10 @@ class TestFileBacked(Bcfg2TestCase): test_obj = FileBacked path = os.path.join(datastore, "test") - def get_obj(self, path=None, fam=None): + def get_obj(self, path=None): if path is None: path = self.path - return self.test_obj(path, fam=fam) + return self.test_obj(path) @patch("%s.open" % builtins) def test_HandleEvent(self, mock_open): @@ -158,24 +133,20 @@ class TestDirectoryBacked(Bcfg2TestCase): """ ensure that the child object has the correct interface """ self.assertTrue(hasattr(self.test_obj.__child__, "HandleEvent")) - def get_obj(self, fam=None): - if fam is None: - fam = Mock() - + def get_obj(self): @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, self.test_obj.__name__), Mock()) def inner(): return self.test_obj(os.path.join(datastore, - self.test_obj.__name__), - fam) + self.test_obj.__name__)) return inner() def test__init(self): @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__, self.test_obj.__name__)) def inner(mock_add_monitor): - db = self.test_obj(datastore, Mock()) + db = self.test_obj(datastore) mock_add_monitor.assert_called_with('') inner() @@ -244,10 +215,9 @@ class TestDirectoryBacked(Bcfg2TestCase): db.fam = Mock() class MockChild(Mock): - def __init__(self, path, fam, **kwargs): + def __init__(self, path, **kwargs): Mock.__init__(self, **kwargs) self.path = path - self.fam = fam self.HandleEvent = Mock() db.__child__ = MockChild @@ -257,7 +227,6 @@ class TestDirectoryBacked(Bcfg2TestCase): self.assertIn(path, db.entries) self.assertEqual(db.entries[path].path, os.path.join(db.data, path)) - self.assertEqual(db.entries[path].fam, db.fam) db.entries[path].HandleEvent.assert_called_with(event) @patch("os.path.isdir") @@ -395,27 +364,23 @@ class TestXMLFileBacked(TestFileBacked): should_monitor = None path = os.path.join(datastore, "test", "test1.xml") - def get_obj(self, path=None, fam=None, should_monitor=False): + def get_obj(self, path=None, should_monitor=False): if path is None: path = self.path - return self.test_obj(path, fam=fam, should_monitor=should_monitor) + return self.test_obj(path, should_monitor=should_monitor) - def test__init(self): - fam = Mock() + @patch("Bcfg2.Server.FileMonitor.get_fam") + def test__init(self, mock_get_fam): xfb = self.get_obj() - if self.should_monitor is True: - self.assertIsNotNone(xfb.fam) - else: - self.assertIsNone(xfb.fam) + self.assertEqual(xfb.fam, mock_get_fam.return_value) if self.should_monitor is not True: - xfb = self.get_obj(fam=fam) - self.assertFalse(fam.AddMonitor.called) + xfb = self.get_obj() + self.assertFalse(xfb.fam.AddMonitor.called) if self.should_monitor is not False: - fam.reset_mock() - xfb = self.get_obj(fam=fam, should_monitor=True) - fam.AddMonitor.assert_called_with(self.path, xfb) + xfb = self.get_obj(should_monitor=True) + xfb.fam.AddMonitor.assert_called_with(self.path, xfb) @patch("glob.glob") @patch("lxml.etree.parse") @@ -584,6 +549,7 @@ class TestXMLFileBacked(TestFileBacked): test3 = lxml.etree.Element("Test", name="test3") replacements = {"/test/test2.xml": test2, "/test/test_dir/test3.xml": test3} + def xinclude(): for el in xfb.xdata.findall('//%sinclude' % Bcfg2.Server.XI_NAMESPACE): @@ -601,25 +567,24 @@ class TestXMLFileBacked(TestFileBacked): self.assertItemsEqual([tostring(e) for e in xfb.entries], [tostring(e) for e in children]) + @patch("Bcfg2.Server.FileMonitor.get_fam", Mock()) def test_add_monitor(self): xfb = self.get_obj() xfb.add_monitor("/test/test2.xml") self.assertIn("/test/test2.xml", xfb.extras) - fam = Mock() if self.should_monitor is not True: - fam.reset_mock() - xfb = self.get_obj(fam=fam) - fam.reset_mock() + xfb = self.get_obj() + xfb.fam = Mock() xfb.add_monitor("/test/test3.xml") - self.assertFalse(fam.AddMonitor.called) + self.assertFalse(xfb.fam.AddMonitor.called) self.assertIn("/test/test3.xml", xfb.extras) if self.should_monitor is not False: - fam.reset_mock() - xfb = self.get_obj(fam=fam, should_monitor=True) + xfb = self.get_obj(should_monitor=True) + xfb.fam = Mock() xfb.add_monitor("/test/test4.xml") - fam.AddMonitor.assert_called_with("/test/test4.xml", xfb) + xfb.fam.AddMonitor.assert_called_with("/test/test4.xml", xfb) self.assertIn("/test/test4.xml", xfb.extras) @@ -667,7 +632,8 @@ class TestStructFile(TestXMLFileBacked): lxml.etree.SubElement(groups[1], "Child", name="c3") lxml.etree.SubElement(groups[1], "Child", name="c4") - standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s1")) + standalone.append(lxml.etree.SubElement(xdata, + "Standalone", name="s1")) groups[2] = lxml.etree.SubElement(xdata, "Client", name="client2", include="false") @@ -689,12 +655,147 @@ class TestStructFile(TestXMLFileBacked): subchildren[3] = [] lxml.etree.SubElement(children[3][-1], "SubChild", name="subchild") - standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s3")) + standalone.append(lxml.etree.SubElement(xdata, + "Standalone", name="s3")) lxml.etree.SubElement(standalone[-1], "SubStandalone", name="sub1") - children[4] = standalone return (xdata, groups, subgroups, children, subchildren, standalone) + def _get_template_test_data(self): + (xdata, groups, subgroups, children, subchildren, standalone) = \ + self._get_test_data() + template_xdata = \ + lxml.etree.Element("Test", name="test", + nsmap=dict(py='http://genshi.edgewall.org/')) + template_xdata.extend(xdata.getchildren()) + return (template_xdata, groups, subgroups, children, subchildren, + standalone) + + @patch("genshi.template.TemplateLoader") + def test_Index(self, mock_TemplateLoader): + has_crypto = Bcfg2.Server.Plugin.helpers.HAS_CRYPTO + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = False + TestXMLFileBacked.test_Index(self) + + sf = self.get_obj() + sf.encoding = Mock() + (xdata, groups, subgroups, children, subchildren, standalone) = \ + self._get_test_data() + sf.data = lxml.etree.tostring(xdata) + + mock_TemplateLoader.reset_mock() + sf.Index() + self.assertFalse(mock_TemplateLoader.called) + + mock_TemplateLoader.reset_mock() + template_xdata = \ + lxml.etree.Element("Test", name="test", + nsmap=dict(py='http://genshi.edgewall.org/')) + template_xdata.extend(xdata.getchildren()) + sf.data = lxml.etree.tostring(template_xdata) + sf.Index() + mock_TemplateLoader.assert_called_with() + loader = mock_TemplateLoader.return_value + loader.load.assert_called_with(sf.name, + cls=genshi.template.MarkupTemplate, + encoding=sf.encoding) + self.assertEqual(sf.template, + loader.load.return_value) + + Bcfg2.Server.Plugin.helpers.HAS_CRYPTO = has_crypto + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_Index_crypto(self): + sf = self.get_obj() + sf.setup = Mock() + sf.setup.cfp.get.return_value = "strict" + sf._decrypt = Mock() + sf._decrypt.return_value = 'plaintext' + sf.data = ''' +<EncryptedData> + <Group name="test"> + <Datum encrypted="foo">crypted</Datum> + </Group> + <Group name="test" negate="true"> + <Datum>plain</Datum> + </Group> +</EncryptedData>''' + + # test successful decryption + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + for el in sf.xdata.xpath("//*[@encrypted]"): + self.assertEqual(el.text, sf._decrypt.return_value) + + # test failed decryption, strict + sf._decrypt.reset_mock() + sf._decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, sf.Index) + + # test failed decryption, lax + sf.setup.cfp.get.return_value = "lax" + sf._decrypt.reset_mock() + sf.Index() + self.assertItemsEqual( + sf._decrypt.call_args_list, + [call(el) for el in sf.xdata.xpath("//*[@encrypted]")]) + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.ssl_decrypt") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.get_passphrases") + @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.bruteforce_decrypt") + def test_decrypt(self, mock_bruteforce, mock_get_passphrases, mock_ssl): + sf = self.get_obj() + + def reset(): + mock_bruteforce.reset_mock() + mock_get_passphrases.reset_mock() + mock_ssl.reset_mock() + + # test element without text contents + self.assertIsNone(sf._decrypt(lxml.etree.Element("Test"))) + self.assertFalse(mock_bruteforce.called) + self.assertFalse(mock_get_passphrases.called) + self.assertFalse(mock_ssl.called) + + # test element with a passphrase in the config file + reset() + el = lxml.etree.Element("Test", encrypted="foo") + el.text = "crypted" + mock_get_passphrases.return_value = dict(foo="foopass", bar="barpass") + mock_ssl.return_value = "decrypted with ssl" + self.assertEqual(sf._decrypt(el), mock_ssl.return_value) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test failure to decrypt element with a passphrase in the config + reset() + mock_ssl.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_ssl.assert_called_with(el.text, "foopass") + self.assertFalse(mock_bruteforce.called) + + # test element without valid passphrase + reset() + el.set("encrypted", "true") + mock_bruteforce.return_value = "decrypted with bruteforce" + self.assertEqual(sf._decrypt(el), mock_bruteforce.return_value) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + + # test failure to decrypt element without valid passphrase + reset() + mock_bruteforce.side_effect = EVPError + self.assertRaises(EVPError, sf._decrypt, el) + mock_get_passphrases.assert_called_with() + mock_bruteforce.assert_called_with(el.text) + self.assertFalse(mock_ssl.called) + def test_include_element(self): sf = self.get_obj() metadata = Mock() @@ -727,22 +828,62 @@ class TestStructFile(TestXMLFileBacked): self.assertTrue(inc("Other")) - @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" % - test_obj.__name__) - def test__match(self, mock_include): + def test__match(self): sf = self.get_obj() + sf._include_element = Mock() metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - - mock_include.side_effect = \ - lambda x, _: (x.tag not in ['Client', 'Group'] or + sf._include_element.side_effect = \ + lambda x, _: (x.tag not in sf._include_tests.keys() or x.get("include") == "true") - for i, group in groups.items(): - actual = sf._match(group, metadata) - expected = children[i] + subchildren[i] + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + + for i, group in groups.items(): + actual = sf._match(group, metadata) + expected = children[i] + subchildren[i] + self.assertEqual(len(actual), len(expected)) + # easiest way to compare the values is actually to make + # them into an XML document and let assertXMLEqual compare + # them + xactual = lxml.etree.Element("Container") + xactual.extend(actual) + xexpected = lxml.etree.Element("Container") + xexpected.extend(expected) + self.assertXMLEqual(xactual, xexpected) + + for el in standalone: + self.assertXMLEqual(el, sf._match(el, metadata)[0]) + + def test_do_match(self): + sf = self.get_obj() + sf._match = Mock() + + def match_rv(el, _): + if el.tag not in sf._include_tests.keys(): + return [el] + elif el.get("include") == "true": + return el.getchildren() + else: + return [] + sf._match.side_effect = match_rv + + metadata = Mock() + + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + sf.data = lxml.etree.tostring(xdata) + sf.Index() + + actual = sf._do_match(metadata) + expected = reduce(lambda x, y: x + y, + list(children.values()) + \ + list(subgroups.values())) + standalone self.assertEqual(len(actual), len(expected)) # easiest way to compare the values is actually to make # them into an XML document and let assertXMLEqual compare @@ -753,84 +894,101 @@ class TestStructFile(TestXMLFileBacked): xexpected.extend(expected) self.assertXMLEqual(xactual, xexpected) - for el in standalone: - self.assertXMLEqual(el, sf._match(el, metadata)[0]) + def test__xml_match(self): + sf = self.get_obj() + sf._include_element = Mock() + metadata = Mock() - @patch("Bcfg2.Server.Plugin.helpers.%s._match" % test_obj.__name__) - def test_Match(self, mock_match): + sf._include_element.side_effect = \ + lambda x, _: (x.tag not in sf._include_tests.keys() or + x.get("include") == "true") + + for test_data in [self._get_test_data(), + self._get_template_test_data()]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + + actual = copy.deepcopy(xdata) + for el in actual.getchildren(): + sf._xml_match(el, metadata) + expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib)) + expected.text = xdata.text + expected.extend(reduce(lambda x, y: x + y, + list(children.values()) + \ + list(subchildren.values()))) + expected.extend(standalone) + self.assertXMLEqual(actual, expected) + + def test_do_xmlmatch(self): sf = self.get_obj() + sf._xml_match = Mock() metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - sf.entries.extend(copy.deepcopy(xdata).getchildren()) + for data_type, test_data in \ + [("", self._get_test_data()), + ("templated ", self._get_template_test_data())]: + (xdata, groups, subgroups, children, subchildren, standalone) = \ + test_data + sf.xdata = xdata + sf._xml_match.reset_mock() + + sf._do_xmlmatch(metadata) + actual = [] + for call in sf._xml_match.call_args_list: + actual.append(call[0][0]) + self.assertEqual(call[0][1], metadata) + expected = list(groups.values()) + standalone + # easiest way to compare the values is actually to make + # them into an XML document and let assertXMLEqual compare + # them + xactual = lxml.etree.Element("Container") + xactual.extend(actual) + xexpected = lxml.etree.Element("Container") + xexpected.extend(expected) + self.assertXMLEqual(xactual, xexpected, + "XMLMatch() calls were incorrect for " + "%stest data" % data_type) + + def test_match_ordering(self): + """ Match() returns elements in document order """ + sf = self.get_obj() + sf._match = Mock() def match_rv(el, _): - if el.tag not in ['Client', 'Group']: + if el.tag not in sf._include_tests.keys(): return [el] elif el.get("include") == "true": return el.getchildren() else: return [] - mock_match.side_effect = match_rv - actual = sf.Match(metadata) - expected = reduce(lambda x, y: x + y, - list(children.values()) + list(subgroups.values())) - self.assertEqual(len(actual), len(expected)) - # easiest way to compare the values is actually to make - # them into an XML document and let assertXMLEqual compare - # them - xactual = lxml.etree.Element("Container") - xactual.extend(actual) - xexpected = lxml.etree.Element("Container") - xexpected.extend(expected) - self.assertXMLEqual(xactual, xexpected) - - @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" % - test_obj.__name__) - def test__xml_match(self, mock_include): - sf = self.get_obj() + sf._match.side_effect = match_rv + metadata = Mock() - (xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() + test_data = lxml.etree.Element("Test") + group = lxml.etree.SubElement(test_data, "Group", name="group", + include="true") + first = lxml.etree.SubElement(group, "Element", name="first") + second = lxml.etree.SubElement(test_data, "Element", name="second") - mock_include.side_effect = \ - lambda x, _: (x.tag not in ['Client', 'Group'] or - x.get("include") == "true") + # sanity check to ensure that first and second are in the + # correct document order + if test_data.xpath("//Element") != [first, second]: + skip("lxml.etree does not construct documents in a reliable order") - actual = copy.deepcopy(xdata) - for el in actual.getchildren(): - sf._xml_match(el, metadata) - expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib)) - expected.text = xdata.text - expected.extend(reduce(lambda x, y: x + y, - list(children.values()) + list(subchildren.values()))) - expected.extend(standalone) - self.assertXMLEqual(actual, expected) - - @patch("Bcfg2.Server.Plugin.helpers.%s._xml_match" % test_obj.__name__) - def test_XMLMatch(self, mock_xml_match): - sf = self.get_obj() - metadata = Mock() + sf.data = lxml.etree.tostring(test_data) + sf.Index() + rv = sf._do_match(metadata) + self.assertEqual(len(rv), 2, + "Match() seems to be broken, cannot test ordering") + msg = "Match() does not return elements in document order:\n" + \ + "Expected: [%s, %s]\n" % (first, second) + \ + "Actual: %s" % rv + self.assertXMLEqual(rv[0], first, msg) + self.assertXMLEqual(rv[1], second, msg) - (sf.xdata, groups, subgroups, children, subchildren, standalone) = \ - self._get_test_data() - - sf.XMLMatch(metadata) - actual = [] - for call in mock_xml_match.call_args_list: - actual.append(call[0][0]) - self.assertEqual(call[0][1], metadata) - expected = list(groups.values()) + standalone - # easiest way to compare the values is actually to make - # them into an XML document and let assertXMLEqual compare - # them - xactual = lxml.etree.Element("Container") - xactual.extend(actual) - xexpected = lxml.etree.Element("Container") - xexpected.extend(expected) - self.assertXMLEqual(xactual, xexpected) + # TODO: add tests to ensure that XMLMatch() returns elements + # in document order class TestINode(Bcfg2TestCase): @@ -1065,49 +1223,6 @@ class TestINode(Bcfg2TestCase): inode.predicate.assert_called_with(metadata, child) -class TestInfoNode(TestINode): - __test__ = True - test_obj = InfoNode - - def test_raw_predicates(self): - TestINode.test_raw_predicates(self) - metadata = Mock() - entry = lxml.etree.Element("Path", name="/tmp/foo", - realname="/tmp/bar") - - parent_predicate = lambda m, d: True - pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - - pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertTrue(pred(metadata, entry)) - - parent_predicate = lambda m, d: False - pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"), - dict(predicate=parent_predicate)) - self.assertFalse(pred(metadata, entry)) - - class TestXMLSrc(TestXMLFileBacked): test_obj = XMLSrc @@ -1173,9 +1288,78 @@ class TestXMLSrc(TestXMLFileBacked): self.assertEqual(xsrc.cache[0], metadata) -class TestInfoXML(TestXMLSrc): +class TestInfoXML(TestStructFile): test_obj = InfoXML + def _get_test_data(self): + (xdata, groups, subgroups, children, subchildren, standalone) = \ + TestStructFile._get_test_data(self) + idx = max(groups.keys()) + 1 + groups[idx] = lxml.etree.SubElement( + xdata, "Path", name="path1", include="true") + children[idx] = [lxml.etree.SubElement(groups[idx], "Child", + name="pc1")] + subgroups[idx] = [lxml.etree.SubElement(groups[idx], "Group", + name="pg1", include="true"), + lxml.etree.SubElement(groups[idx], "Client", + name="pc1", include="false")] + subchildren[idx] = [lxml.etree.SubElement(subgroups[idx][0], + "SubChild", name="sc1")] + + idx += 1 + groups[idx] = lxml.etree.SubElement( + xdata, "Path", name="path2", include="false") + children[idx] = [] + subgroups[idx] = [] + subchildren[idx] = [] + + path2 = lxml.etree.SubElement(groups[0], "Path", name="path2", + include="true") + subgroups[0].append(path2) + subchildren[0].append(lxml.etree.SubElement(path2, "SubChild", + name="sc2")) + return xdata, groups, subgroups, children, subchildren, standalone + + def test_include_element(self): + TestStructFile.test_include_element(self) + + ix = self.get_obj() + metadata = Mock() + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + inc = lambda tag, **attrs: \ + ix._include_element(lxml.etree.Element(tag, **attrs), + metadata, entry) + + self.assertFalse(inc("Path", name="/etc/bar.conf")) + self.assertFalse(inc("Path", name="/etc/foo.conf", negate="true")) + self.assertFalse(inc("Path", name="/etc/foo.conf", negate="tRuE")) + self.assertTrue(inc("Path", name="/etc/foo.conf")) + self.assertTrue(inc("Path", name="/etc/foo.conf", negate="false")) + self.assertTrue(inc("Path", name="/etc/foo.conf", negate="faLSe")) + self.assertTrue(inc("Path", name="/etc/bar.conf", negate="true")) + self.assertTrue(inc("Path", name="/etc/bar.conf", negate="tRUe")) + + def test_BindEntry(self): + ix = self.get_obj() + entry = lxml.etree.Element("Path", name=self.path) + metadata = Mock() + + # test with bogus infoxml + ix.Match = Mock() + ix.Match.return_value = [] + self.assertRaises(PluginExecutionError, + ix.BindEntry, entry, metadata) + ix.Match.assert_called_with(metadata, entry) + + # test with valid infoxml + ix.Match.reset_mock() + ix.Match.return_value = [lxml.etree.Element("Info", + mode="0600", owner="root")] + ix.BindEntry(entry, metadata) + ix.Match.assert_called_with(metadata, entry) + self.assertItemsEqual(entry.attrib, + dict(name=self.path, mode="0600", owner="root")) + class TestXMLDirectoryBacked(TestDirectoryBacked): test_obj = XMLDirectoryBacked @@ -1220,32 +1404,17 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): def test__matches(self): pd = self.get_obj() - self.assertTrue(pd._matches(lxml.etree.Element("Test", - name="/etc/foo.conf"), - Mock(), - {"/etc/foo.conf": pd.BindEntry, - "/etc/bar.conf": pd.BindEntry})) - self.assertFalse(pd._matches(lxml.etree.Element("Test", - name="/etc/baz.conf"), - Mock(), - {"/etc/foo.conf": pd.BindEntry, - "/etc/bar.conf": pd.BindEntry})) + entry = lxml.etree.Element("Test", name="/etc/foo.conf") + self.assertTrue(pd._matches(entry, Mock(), + lxml.etree.Element("Test", + name="/etc/foo.conf"))) + self.assertFalse(pd._matches(entry, Mock(), + lxml.etree.Element("Test", + name="/etc/baz.conf"))) def test_BindEntry(self): pd = self.get_obj() - pd.get_attrs = Mock(return_value=dict(test1="test1", test2="test2")) - entry = lxml.etree.Element("Path", name="/etc/foo.conf", test1="bogus") - metadata = Mock() - pd.BindEntry(entry, metadata) - pd.get_attrs.assert_called_with(entry, metadata) - self.assertItemsEqual(entry.attrib, - dict(name="/etc/foo.conf", - test1="test1", test2="test2")) - - def test_get_attrs(self): - pd = self.get_obj() - entry = lxml.etree.Element("Path", name="/etc/foo.conf") - children = [lxml.etree.Element("Child")] + children = [lxml.etree.Element("Child", name="child")] metadata = Mock() pd.entries = dict() @@ -1253,58 +1422,59 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): metadata.reset_mock() for src in pd.entries.values(): src.reset_mock() - src.cache = None # test with no matches - self.assertRaises(PluginExecutionError, - pd.get_attrs, entry, metadata) + self.assertRaises(PluginExecutionError, pd.BindEntry, Mock(), metadata) - def add_entry(name, data, prio=10): + def add_entry(name, data): path = os.path.join(pd.data, name) pd.entries[path] = Mock() - pd.entries[path].priority = prio - def do_Cache(metadata): - pd.entries[path].cache = (metadata, data) - pd.entries[path].Cache.side_effect = do_Cache - - add_entry('test1.xml', - dict(Path={'/etc/foo.conf': dict(attr="attr1", - __children__=children), - '/etc/bar.conf': dict()})) - add_entry('test2.xml', - dict(Path={'/etc/bar.conf': dict(__text__="text", - attr="attr1")}, - Package={'quux': dict(), - 'xyzzy': dict()}), - prio=20) - add_entry('test3.xml', - dict(Path={'/etc/baz.conf': dict()}, - Package={'xyzzy': dict()}), - prio=20) - - # test with exactly one match, __children__ + pd.entries[path].priority = data.get("priority") + pd.entries[path].XMLMatch.return_value = data + + test1 = lxml.etree.Element("Rules", priority="10") + path1 = lxml.etree.SubElement(test1, "Path", name="/etc/foo.conf", + attr="attr1") + path1.extend(children) + lxml.etree.SubElement(test1, "Path", name="/etc/bar.conf") + add_entry('test1.xml', test1) + + test2 = lxml.etree.Element("Rules", priority="20") + path2 = lxml.etree.SubElement(test2, "Path", name="/etc/bar.conf", + attr="attr1") + path2.text = "text" + lxml.etree.SubElement(test2, "Package", name="quux") + lxml.etree.SubElement(test2, "Package", name="xyzzy") + add_entry('test2.xml', test2) + + test3 = lxml.etree.Element("Rules", priority="20") + lxml.etree.SubElement(test3, "Path", name="/etc/baz.conf") + lxml.etree.SubElement(test3, "Package", name="xyzzy") + add_entry('test3.xml', test3) + + # test with exactly one match, children reset() - self.assertItemsEqual(pd.get_attrs(entry, metadata), - dict(attr="attr1")) + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + pd.BindEntry(entry, metadata) + self.assertXMLEqual(entry, path1) + self.assertIsNot(entry, path1) for src in pd.entries.values(): - src.Cache.assert_called_with(metadata) - self.assertEqual(len(entry.getchildren()), 1) - self.assertXMLEqual(entry.getchildren()[0], children[0]) + src.XMLMatch.assert_called_with(metadata) - # test with multiple matches with different priorities, __text__ + # test with multiple matches with different priorities, text reset() entry = lxml.etree.Element("Path", name="/etc/bar.conf") - self.assertItemsEqual(pd.get_attrs(entry, metadata), - dict(attr="attr1")) + pd.BindEntry(entry, metadata) + self.assertXMLEqual(entry, path2) + self.assertIsNot(entry, path2) for src in pd.entries.values(): - src.Cache.assert_called_with(metadata) - self.assertEqual(entry.text, "text") + src.XMLMatch.assert_called_with(metadata) # test with multiple matches with identical priorities reset() entry = lxml.etree.Element("Package", name="xyzzy") self.assertRaises(PluginExecutionError, - pd.get_attrs, entry, metadata) + pd.BindEntry, entry, metadata) class TestSpecificity(Bcfg2TestCase): @@ -1550,25 +1720,25 @@ class TestEntrySet(TestDebuggable): eset.reset_metadata.reset_mock() eset.entry_init.reset_mock() - for fname in ["info", "info.xml", ":info"]: - for evt in ["exists", "created", "changed"]: - reset() - event = Mock() - event.code2str.return_value = evt - event.filename = fname - eset.handle_event(event) - eset.update_metadata.assert_called_with(event) - self.assertFalse(eset.entry_init.called) - self.assertFalse(eset.reset_metadata.called) - + fname = "info.xml" + for evt in ["exists", "created", "changed"]: reset() event = Mock() - event.code2str.return_value = "deleted" + event.code2str.return_value = evt event.filename = fname eset.handle_event(event) - eset.reset_metadata.assert_called_with(event) + eset.update_metadata.assert_called_with(event) self.assertFalse(eset.entry_init.called) - self.assertFalse(eset.update_metadata.called) + self.assertFalse(eset.reset_metadata.called) + + reset() + event = Mock() + event.code2str.return_value = "deleted" + event.filename = fname + eset.handle_event(event) + eset.reset_metadata.assert_called_with(event) + self.assertFalse(eset.entry_init.called) + self.assertFalse(eset.update_metadata.called) for evt in ["exists", "created", "changed"]: reset() @@ -1727,26 +1897,8 @@ class TestEntrySet(TestDebuggable): self.assertFalse(mock_InfoXML.called) eset.infoxml.HandleEvent.assert_called_with(event) - for fname in [':info', 'info']: - event = Mock() - event.filename = fname - - idata = ["owner:owner", - "group: GROUP", - "mode: 775", - "important: true", - "bogus: line"] - mock_open.return_value.readlines.return_value = idata - eset.update_metadata(event) - expected = DEFAULT_FILE_METADATA.copy() - expected['owner'] = 'owner' - expected['group'] = 'GROUP' - expected['mode'] = '0775' - expected['important'] = 'true' - self.assertItemsEqual(eset.metadata, - expected) - - def test_reset_metadata(self): + @patch("Bcfg2.Server.Plugin.helpers.default_path_metadata") + def test_reset_metadata(self, mock_default_path_metadata): eset = self.get_obj() # test info.xml @@ -1756,29 +1908,22 @@ class TestEntrySet(TestDebuggable): eset.reset_metadata(event) self.assertIsNone(eset.infoxml) - for fname in [':info', 'info']: - event = Mock() - event.filename = fname - eset.metadata = Mock() - eset.reset_metadata(event) - self.assertItemsEqual(eset.metadata, DEFAULT_FILE_METADATA) - - @patch("Bcfg2.Server.Plugin.helpers.bind_info") - def test_bind_info_to_entry(self, mock_bind_info): - # There's a strange scoping issue in py3k that prevents this - # test from working as expected on sub-classes of EntrySet. - # No idea what's going on, but until I can figure it out we - # skip this test on subclasses - if inPy3k and self.test_obj != EntrySet: - return skip("Skipping this test for py3k scoping issues") - + def test_bind_info_to_entry(self): eset = self.get_obj() - entry = Mock() + eset.metadata = dict(owner="root", group="root") + entry = lxml.etree.Element("Path", name="/test") metadata = Mock() + eset.infoxml = None + eset.bind_info_to_entry(entry, metadata) + self.assertItemsEqual(entry.attrib, + dict(name="/test", owner="root", group="root")) + + entry = lxml.etree.Element("Path", name="/test") + eset.infoxml = Mock() eset.bind_info_to_entry(entry, metadata) - mock_bind_info.assert_called_with(entry, metadata, - infoxml=eset.infoxml, - default=eset.metadata) + self.assertItemsEqual(entry.attrib, + dict(name="/test", owner="root", group="root")) + eset.infoxml.BindEntry.assert_called_with(entry, metadata) def test_bind_entry(self): eset = self.get_obj() @@ -2010,6 +2155,3 @@ class TestGroupSpool(TestPlugin, TestGenerator): gs.event_id.assert_called_with(event) self.assertNotIn("/baz/quux", gs.entries) self.assertNotIn("/baz/quux", gs.Entries[gs.entry_type]) - - - diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py index 35f4e0700..6effe05de 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py @@ -368,3 +368,21 @@ class TestVersion(TestPlugin): class TestClientRunHooks(Bcfg2TestCase): """ placeholder for future tests """ pass + + +class TestClientACLs(Bcfg2TestCase): + test_obj = ClientACLs + + def get_obj(self): + return self.test_obj() + + def test_check_acl_ip(self): + ca = self.get_obj() + self.assertIn(ca.check_acl_ip(Mock(), Mock()), + [True, False, None]) + + def test_check_acl_metadata(self): + ca = self.get_obj() + self.assertIn(ca.check_acl_metadata(Mock(), Mock()), + [True, False]) + |