summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py223
1 files changed, 223 insertions, 0 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
new file mode 100644
index 000000000..86a960701
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
@@ -0,0 +1,223 @@
+import os
+import sys
+import lxml.etree
+import Bcfg2.Server.Plugin
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.ACL import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \
+ TestClientACLs
+
+
+class TestFunctions(Bcfg2TestCase):
+ def test_rmi_names_equal(self):
+ good_cases = [('*', 'foo'),
+ ('foo', 'foo'),
+ ('foo.*', 'foo.bar'),
+ ('*.*', 'foo.bar'),
+ ('foo.bar', 'foo.bar'),
+ ('*.bar', 'foo.bar'),
+ ('foo.*.bar', 'foo.baz.bar')]
+ bad_cases = [('foo', 'bar'),
+ ('*', 'foo.bar'),
+ ('*.*', 'foo'),
+ ('*.*', 'foo.bar.baz'),
+ ('foo.*', 'bar.foo'),
+ ('*.bar', 'bar.foo'),
+ ('foo.*', 'foobar')]
+ for first, second in good_cases:
+ self.assertTrue(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (first, second))
+ self.assertTrue(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (second, first))
+ for first, second in bad_cases:
+ self.assertFalse(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (first, second))
+ self.assertFalse(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (second, first))
+
+ def test_ip_matches(self):
+ good_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ bad_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ for ip, entry in good_cases:
+ self.assertTrue(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly False" %
+ (ip, lxml.etree.tostring(entry)))
+ for ip, entry in bad_cases:
+ self.assertFalse(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly True" %
+ (ip, lxml.etree.tostring(entry)))
+
+
+class TestIPACLFile(TestXMLFileBacked):
+ test_obj = IPACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.ip_matches")
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches):
+ af = self.get_obj()
+ ip = "10.0.0.8"
+ rmi = "ACL.test"
+
+ def reset():
+ mock_rmi_names_equal.reset_mock()
+ mock_ip_matches.reset_mock()
+
+ # test default defer with no entries
+ af.entries = []
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ # test explicit allow, deny, and defer
+ entries = dict(Allow=lxml.etree.Element("Allow", method=rmi),
+ Deny=lxml.etree.Element("Deny", method=rmi),
+ Defer=lxml.etree.Element("Defer", method=rmi))
+ af.entries = list(entries.values())
+
+ def get_ip_matches(tag):
+ def ip_matches(ip, entry):
+ return entry.tag == tag
+
+ return ip_matches
+
+ mock_rmi_names_equal.return_value = True
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Allow")
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Allow'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Deny")
+ self.assertFalse(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Deny'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Defer")
+ self.assertIsNone(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Defer'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ # test matching RMI names
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: True
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+ rmi = "ACL.test2"
+ matching = lxml.etree.Element("Allow", method=rmi)
+ af.entries.append(matching)
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, matching)
+ self.assertTrue(
+ call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or
+ call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list)
+
+ # test implicit allow for localhost, defer for others
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: False
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ reset()
+ self.assertTrue(af.check_acl("127.0.0.1", rmi))
+
+
+class TestMetadataACLFile(TestStructFile):
+ test_obj = MetadataACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal):
+ af = self.get_obj()
+ af.Match = Mock()
+ metadata = Mock()
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+
+ def reset():
+ af.Match.reset_mock()
+ mock_rmi_names_equal.reset_mock()
+
+ # test default allow
+ af.entries = []
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+
+ # test explicit allow and deny
+ reset()
+ af.entries = [lxml.etree.Element("Allow", method='ACL.test'),
+ lxml.etree.Element("Deny", method='ACL.test2')]
+ af.Match.return_value = af.entries
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test', 'ACL.test'),
+ mock_rmi_names_equal.call_args_list)
+
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test2'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test2', 'ACL.test2'),
+ mock_rmi_names_equal.call_args_list)
+
+ # test default deny for non-localhost
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+ # test default allow for localhost
+ reset()
+ metadata.hostname = 'localhost'
+ self.assertTrue(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+
+class TestACL(TestPlugin, TestClientACLs):
+ test_obj = ACL
+
+ def test_check_acl_ip(self):
+ acl = self.get_obj()
+ acl.ip_acls = Mock()
+ self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"),
+ "ACL.test"),
+ acl.ip_acls.check_acl.return_value)
+ acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test")
+
+ def test_check_acl_metadata(self):
+ acl = self.get_obj()
+ acl.metadata_acls = Mock()
+ metadata = Mock()
+ self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"),
+ acl.metadata_acls.check_acl.return_value)
+ acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test")