From 5363e6d9a53146333da0d109aae170befc1b9481 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 12 Feb 2013 07:48:33 -0500 Subject: Added client ACLs: * IP and CIDR-based ACLs * Metadata (group/hostname)-based ACLs * Documentation * Unit tests --- .../Testlib/TestServer/TestPlugins/TestACL.py | 222 +++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py new file mode 100644 index 000000000..e457ca7c1 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py @@ -0,0 +1,222 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.ACL import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \ + TestClientACLs + + +class TestFunctions(Bcfg2TestCase): + def test_rmi_names_equal(self): + good_cases = [('*', 'foo'), + ('foo', 'foo'), + ('foo.*', 'foo.bar'), + ('*.*', 'foo.bar'), + ('foo.bar', 'foo.bar'), + ('*.bar', 'foo.bar'), + ('foo.*.bar', 'foo.baz.bar')] + bad_cases = [('foo', 'bar'), + ('*', 'foo.bar'), + ('*.*', 'foo'), + ('*.*', 'foo.bar.baz'), + ('foo.*', 'bar.foo'), + ('*.bar', 'bar.foo'), + ('foo.*', 'foobar')] + for first, second in good_cases: + self.assertTrue(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly False" % + (first, second)) + self.assertTrue(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly False" % + (second, first)) + for first, second in bad_cases: + self.assertFalse(rmi_names_equal(first, second), + "rmi_names_equal(%s, %s) unexpectedly True" % + (first, second)) + self.assertFalse(rmi_names_equal(second, first), + "rmi_names_equal(%s, %s) unexpectedly True" % + (second, first)) + + def test_ip_matches(self): + good_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + bad_cases = [ + ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="24")), + ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.0")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="255.255.255.224")), + ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0", + netmask="27")), + ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0", + netmask="16"))] + for ip, entry in good_cases: + self.assertTrue(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly False" % + (ip, lxml.etree.tostring(entry))) + for ip, entry in bad_cases: + self.assertFalse(ip_matches(ip, entry), + "ip_matches(%s, %s) unexpectedly True" % + (ip, lxml.etree.tostring(entry))) + + +class TestIPACLFile(TestXMLFileBacked): + test_obj = IPACLFile + + @patch("Bcfg2.Server.Plugins.ACL.ip_matches") + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches): + af = self.get_obj() + ip = "10.0.0.8" + rmi = "ACL.test" + + def reset(): + mock_rmi_names_equal.reset_mock() + mock_ip_matches.reset_mock() + + # test default defer with no entries + af.entries = [] + self.assertIsNone(af.check_acl(ip, rmi)) + + # test explicit allow, deny, and defer + entries = dict(Allow=lxml.etree.Element("Allow", method=rmi), + Deny=lxml.etree.Element("Deny", method=rmi), + Defer=lxml.etree.Element("Defer", method=rmi)) + af.entries = list(entries.values()) + + def get_ip_matches(tag): + def ip_matches(ip, entry): + return entry.tag == tag + + return ip_matches + + mock_rmi_names_equal.return_value = True + + reset() + mock_ip_matches.side_effect = get_ip_matches("Allow") + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Allow']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Deny") + self.assertFalse(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Deny']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + reset() + mock_ip_matches.side_effect = get_ip_matches("Defer") + self.assertIsNone(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, entries['Defer']) + mock_rmi_names_equal.assert_called_with(rmi, rmi) + + # test matching RMI names + reset() + mock_ip_matches.side_effect = lambda i, e: True + mock_rmi_names_equal.side_effect = lambda a, b: a == b + rmi = "ACL.test2" + matching = lxml.etree.Element("Allow", method=rmi) + af.entries.append(matching) + self.assertTrue(af.check_acl(ip, rmi)) + mock_ip_matches.assert_called_with(ip, matching) + self.assertTrue( + call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or + call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list) + + # test implicit allow for localhost, defer for others + reset() + mock_ip_matches.side_effect = lambda i, e: False + self.assertIsNone(af.check_acl(ip, rmi)) + + reset() + self.assertTrue(af.check_acl("127.0.0.1", rmi)) + + +class TestMetadataACLFile(TestStructFile): + test_obj = MetadataACLFile + + @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal") + def test_check_acl(self, mock_rmi_names_equal): + af = self.get_obj() + af.Match = Mock() + metadata = Mock() + mock_rmi_names_equal.side_effect = lambda a, b: a == b + + def reset(): + af.Match.reset_mock() + mock_rmi_names_equal.reset_mock() + + # test default allow + af.entries = [] + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + + # test explicit allow and deny + reset() + af.entries = [lxml.etree.Element("Allow", method='ACL.test'), + lxml.etree.Element("Deny", method='ACL.test2')] + af.Match.return_value = af.entries + self.assertTrue(af.check_acl(metadata, 'ACL.test')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test', 'ACL.test'), + mock_rmi_names_equal.call_args_list) + + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test2')) + af.Match.assert_called_with(metadata) + self.assertIn(call('ACL.test2', 'ACL.test2'), + mock_rmi_names_equal.call_args_list) + + # test default deny for non-localhost + reset() + self.assertFalse(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + # test default allow for localhost + reset() + metadata.hostname = 'localhost' + self.assertTrue(af.check_acl(metadata, 'ACL.test3')) + af.Match.assert_called_with(metadata) + + +class TestACL(TestPlugin, TestClientACLs): + test_obj = ACL + + def test_check_acl_ip(self): + acl = self.get_obj() + acl.ip_acls = Mock() + self.assertEqual(acl.check_acl_ip("192.168.1.10", "ACL.test"), + acl.ip_acls.check_acl.return_value) + acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test") + + def test_check_acl_metadata(self): + acl = self.get_obj() + acl.metadata_acls = Mock() + metadata = Mock() + self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"), + acl.metadata_acls.check_acl.return_value) + acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test") -- cgit v1.2.3-1-g7c22