From d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 4 Sep 2012 09:52:57 -0400 Subject: reorganized testsuite to allow tests on stuff outside of src --- .../TestClient/TestTools/TestPOSIX/Testbase.py | 991 +++++++++++++++++++++ 1 file changed, 991 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py (limited to 'testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py') diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py new file mode 100644 index 000000000..b447ab642 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py @@ -0,0 +1,991 @@ +import os +import sys +import copy +import stat +import lxml.etree +from mock import Mock, MagicMock, patch +import Bcfg2.Client.Tools +from Bcfg2.Client.Tools.POSIX.base import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +try: + import selinux + has_selinux = True +except ImportError: + has_selinux = False + +try: + import posix1e + has_acls = True +except ImportError: + has_acls = False + +class TestPOSIXTool(Bcfg2TestCase): + test_obj = POSIXTool + + def get_obj(self, posix=None): + if posix is None: + posix = get_posix_object() + return self.test_obj(posix.logger, posix.setup, posix.config) + + def setUp(self): + self.ptool = self.get_obj() + + def tearDown(self): + # just to guarantee that we start fresh each time + self.ptool = None + + def test_fully_specified(self): + # fully_specified should do no checking on the abstract + # POSIXTool object + self.assertTrue(self.ptool.fully_specified(Mock())) + + @patch('os.stat') + @patch('os.walk') + @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_metadata" % + test_obj.__name__) + def test_verify(self, mock_verify, mock_walk, mock_stat): + entry = lxml.etree.Element("Path", name="/test", type="file") + + mock_stat.return_value = MagicMock() + mock_verify.return_value = False + self.assertFalse(self.ptool.verify(entry, [])) + mock_verify.assert_called_with(entry) + + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(self.ptool.verify(entry, [])) + mock_verify.assert_called_with(entry) + + mock_verify.reset_mock() + entry.set("recursive", "true") + walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), + ("/dir1", ["dir3"], []), + ("/dir2", [], ["file3", "file4"])] + mock_walk.return_value = walk_rv + self.assertTrue(self.ptool.verify(entry, [])) + mock_walk.assert_called_with(entry.get("name")) + all_verifies = [call(entry)] + for root, dirs, files in walk_rv: + all_verifies.extend([call(entry, path=os.path.join(root, p)) + for p in dirs + files]) + self.assertItemsEqual(mock_verify.call_args_list, all_verifies) + + @patch('os.walk') + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__) + def test_install(self, mock_set_perms, mock_walk): + entry = lxml.etree.Element("Path", name="/test", type="file") + + mock_set_perms.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_set_perms.assert_called_with(entry) + + mock_set_perms.reset_mock() + entry.set("recursive", "true") + walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), + ("/dir1", ["dir3"], []), + ("/dir2", [], ["file3", "file4"])] + mock_walk.return_value = walk_rv + + mock_set_perms.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_walk.assert_called_with(entry.get("name")) + all_set_perms = [call(entry)] + for root, dirs, files in walk_rv: + all_set_perms.extend([call(entry, path=os.path.join(root, p)) + for p in dirs + files]) + self.assertItemsEqual(mock_set_perms.call_args_list, + all_set_perms) + + mock_walk.reset_mock() + mock_set_perms.reset_mock() + + def set_perms_rv(entry, path=None): + if path == '/dir2/file3': + return False + else: + return True + mock_set_perms.side_effect = set_perms_rv + + self.assertFalse(self.ptool.install(entry)) + mock_walk.assert_called_with(entry.get("name")) + self.assertItemsEqual(mock_set_perms.call_args_list, all_set_perms) + + @patch('os.lstat') + @patch("os.unlink") + @patch("os.path.isdir") + @patch("shutil.rmtree") + def test_exists(self, mock_rmtree, mock_isdir, mock_unlink, mock_lstat): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + + mock_lstat.side_effect = OSError + self.assertFalse(self.ptool._exists(entry)) + mock_lstat.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_lstat.reset_mock() + mock_unlink.reset_mock() + rv = MagicMock() + mock_lstat.return_value = rv + mock_lstat.side_effect = None + self.assertEqual(self.ptool._exists(entry), rv) + mock_lstat.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_lstat.reset_mock() + mock_unlink.reset_mock() + mock_isdir.return_value = False + self.assertFalse(self.ptool._exists(entry, remove=True)) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_unlink.assert_called_with(entry.get('name')) + self.assertFalse(mock_rmtree.called) + + mock_lstat.reset_mock() + mock_isdir.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_isdir.return_value = True + self.assertFalse(self.ptool._exists(entry, remove=True)) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_rmtree.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_isdir.reset_mock() + mock_lstat.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_rmtree.side_effect = OSError + self.assertEqual(self.ptool._exists(entry, remove=True), rv) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_rmtree.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + @patch("os.chown") + @patch("os.chmod") + @patch("os.utime") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_acls" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_secontext" % + test_obj.__name__) + def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid, + mock_norm_uid, mock_utime, mock_chmod, mock_chown): + def reset(): + mock_set_secontext.reset_mock() + mock_set_acls.reset_mock() + mock_norm_gid.reset_mock() + mock_norm_uid.reset_mock() + mock_chmod.reset_mock() + mock_chown.reset_mock() + mock_utime.reset_mock() + + entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar", + type="symlink") + mock_set_acls.return_value = True + mock_set_secontext.return_value = True + self.assertTrue(self.ptool._set_perms(entry)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner", + group="group", perms="644", type="file") + mock_norm_uid.return_value = 10 + mock_norm_gid.return_value = 100 + + reset() + self.assertTrue(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + self.assertFalse(mock_utime.called) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + reset() + mtime = 1344459042 + entry.set("mtime", str(mtime)) + self.assertTrue(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + reset() + self.assertTrue(self.ptool._set_perms(entry, path='/etc/bar')) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with('/etc/bar', 10, 100) + mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path='/etc/bar') + mock_set_acls.assert_called_with(entry, path='/etc/bar') + + # test dev_type modification of perms, failure of chown + reset() + def chown_rv(path, owner, group): + if owner == 0 and group == 0: + return True + else: + raise KeyError + os.chown.side_effect = chown_rv + entry.set("type", "device") + entry.set("dev_type", list(device_map.keys())[0]) + self.assertFalse(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 0, 0) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8) | list(device_map.values())[0]) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + # test failure of chmod + reset() + os.chown.side_effect = None + os.chmod.side_effect = OSError + entry.set("type", "file") + del entry.attrib["dev_type"] + self.assertFalse(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + # test that even when everything fails, we try to do it all. + # e.g., when chmod fails, we still try to apply acls, set + # selinux context, etc. + reset() + os.chown.side_effect = OSError + os.utime.side_effect = OSError + mock_set_acls.return_value = False + mock_set_secontext.return_value = False + self.assertFalse(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + @skipUnless(has_acls, "ACLS not found, skipping") + @patchIf(has_acls, "posix1e.ACL") + @patchIf(has_acls, "posix1e.Entry") + @patch("os.path.isdir") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" % + test_obj.__name__) + def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid, + mock_isdir, mock_Entry, mock_ACL): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + + # disable acls for the initial test + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertTrue(self.ptool._set_acls(entry)) + Bcfg2.Client.Tools.POSIX.base.has_acls = True + + # build a set of file ACLs to return from posix1e.ACL(file=...) + file_acls = [] + acl = Mock() + acl.tag_type = posix1e.ACL_USER + acl.name = "remove" + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_GROUP + acl.name = "remove" + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_MASK + acl.name = "keep" + file_acls.append(acl) + remove_acls = [a for a in file_acls if a.name == "remove"] + + # build a set of ACLs listed on the entry as returned by + # _list_entry_acls() + entry_acls = {("default", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5} + mock_list_entry_acls.return_value = entry_acls + mock_norm_uid.return_value = 10 + mock_norm_gid.return_value = 100 + + # set up the unreasonably complex return value for + # posix1e.ACL(), which has three separate uses + fileacl_rv = MagicMock() + fileacl_rv.valid.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv = MagicMock() + filedef_rv.valid.return_value = True + filedef_rv.__iter__.return_value = iter(file_acls) + acl_rv = MagicMock() + def mock_acl_rv(file=None, filedef=None, acl=None): + if file: + return fileacl_rv + elif filedef: + return filedef_rv + elif acl: + return acl_rv + + # set up the equally unreasonably complex return value for + # posix1e.Entry, which returns a new entry and adds it to + # an ACL, so we have to track the Mock objects it returns. + # why can't they just have an acl.add_entry() method?!? + acl_entries = [] + def mock_entry_rv(acl): + rv = MagicMock() + rv.acl = acl + rv.permset = set() + acl_entries.append(rv) + return rv + mock_Entry.side_effect = mock_entry_rv + + def reset(): + mock_isdir.reset_mock() + mock_ACL.reset_mock() + mock_Entry.reset_mock() + fileacl_rv.reset_mock() + + # test fs mounted noacl + mock_ACL.side_effect = IOError(95, "Operation not permitted") + self.assertFalse(self.ptool._set_acls(entry)) + + # test other error + reset() + mock_ACL.side_effect = IOError + self.assertFalse(self.ptool._set_acls(entry)) + + reset() + mock_ACL.side_effect = mock_acl_rv + mock_isdir.return_value = True + self.assertTrue(self.ptool._set_acls(entry)) + self.assertItemsEqual(mock_ACL.call_args_list, + [call(file=entry.get("name")), + call(filedef=entry.get("name"))]) + self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + self.assertItemsEqual(filedef_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + mock_list_entry_acls.assert_called_with(entry) + mock_norm_uid.assert_called_with("user") + mock_norm_gid.assert_called_with("group") + fileacl_rv.calc_mask.assert_any_call() + fileacl_rv.applyto.assert_called_with(entry.get("name"), + posix1e.ACL_TYPE_ACCESS) + filedef_rv.calc_mask.assert_any_call() + filedef_rv.applyto.assert_called_with(entry.get("name"), + posix1e.ACL_TYPE_DEFAULT) + + # build tuples of the Entry objects that were added to acl + # and defaacl so they're easier to compare for equality + added_acls = [] + for acl in acl_entries: + added_acls.append((acl.acl, acl.tag_type, acl.qualifier, + sum(acl.permset))) + self.assertItemsEqual(added_acls, + [(filedef_rv, posix1e.ACL_USER, 10, 7), + (fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) + + reset() + # have to reassign these because they're iterators, and + # they've already been iterated over once + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv.__iter__.return_value = iter(file_acls) + mock_list_entry_acls.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + mock_isdir.return_value = False + acl_entries = [] + self.assertTrue(self.ptool._set_acls(entry, path="/bin/bar")) + mock_ACL.assert_called_with(file="/bin/bar") + self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + mock_list_entry_acls.assert_called_with(entry) + mock_norm_gid.assert_called_with("group") + fileacl_rv.calc_mask.assert_any_call() + fileacl_rv.applyto.assert_called_with("/bin/bar", + posix1e.ACL_TYPE_ACCESS) + + added_acls = [] + for acl in acl_entries: + added_acls.append((acl.acl, acl.tag_type, acl.qualifier, + sum(acl.permset))) + self.assertItemsEqual(added_acls, + [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) + + @skipUnless(has_selinux, "SELinux not found, skipping") + @patchIf(has_selinux, "selinux.restorecon") + @patchIf(has_selinux, "selinux.lsetfilecon") + def test_set_secontext(self, mock_lsetfilecon, mock_restorecon): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + + # disable selinux for the initial test + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + self.assertTrue(self.ptool._set_secontext(entry)) + Bcfg2.Client.Tools.POSIX.base.has_selinux = True + + # no context given + self.assertTrue(self.ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + self.assertFalse(mock_lsetfilecon.called) + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + entry.set("secontext", "__default__") + self.assertTrue(self.ptool._set_secontext(entry)) + mock_restorecon.assert_called_with(entry.get("name")) + self.assertFalse(mock_lsetfilecon.called) + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + mock_lsetfilecon.return_value = 0 + entry.set("secontext", "foo_t") + self.assertTrue(self.ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + mock_lsetfilecon.return_value = 1 + self.assertFalse(self.ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") + + @patch("grp.getgrnam") + def test_norm_gid(self, mock_getgrnam): + self.assertEqual(5, self.ptool._norm_gid("5")) + self.assertFalse(mock_getgrnam.called) + + mock_getgrnam.reset_mock() + mock_getgrnam.return_value = ("group", "x", 5, []) + self.assertEqual(5, self.ptool._norm_gid("group")) + mock_getgrnam.assert_called_with("group") + + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__) + def test_norm_entry_gid(self, mock_norm_gid): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user") + mock_norm_gid.return_value = 10 + self.assertEqual(10, self.ptool._norm_entry_gid(entry)) + mock_norm_gid.assert_called_with(entry.get("group")) + + mock_norm_gid.reset_mock() + mock_norm_gid.side_effect = KeyError + self.assertEqual(0, self.ptool._norm_entry_gid(entry)) + mock_norm_gid.assert_called_with(entry.get("group")) + + @patch("pwd.getpwnam") + def test_norm_uid(self, mock_getpwnam): + self.assertEqual(5, self.ptool._norm_uid("5")) + self.assertFalse(mock_getpwnam.called) + + mock_getpwnam.reset_mock() + mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user", + "/bin/zsh") + self.assertEqual(5, self.ptool._norm_uid("user")) + mock_getpwnam.assert_called_with("user") + + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__) + def test_norm_entry_uid(self, mock_norm_uid): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user") + mock_norm_uid.return_value = 10 + self.assertEqual(10, self.ptool._norm_entry_uid(entry)) + mock_norm_uid.assert_called_with(entry.get("owner")) + + mock_norm_uid.reset_mock() + mock_norm_uid.side_effect = KeyError + self.assertEqual(0, self.ptool._norm_entry_uid(entry)) + mock_norm_uid.assert_called_with(entry.get("owner")) + + def test_norm_acl_perms(self): + # there's basically no reasonably way to test the Permset + # object parsing feature without writing our own Mock object + # that re-implements Permset.test(). silly pylibacl won't let + # us create standalone Entry or Permset objects. + self.assertEqual(5, self.ptool._norm_acl_perms("5")) + self.assertEqual(0, self.ptool._norm_acl_perms("55")) + self.assertEqual(5, self.ptool._norm_acl_perms("rx")) + self.assertEqual(5, self.ptool._norm_acl_perms("r-x")) + self.assertEqual(6, self.ptool._norm_acl_perms("wr-")) + self.assertEqual(0, self.ptool._norm_acl_perms("rwrw")) + self.assertEqual(0, self.ptool._norm_acl_perms("-")) + self.assertEqual(0, self.ptool._norm_acl_perms("a")) + self.assertEqual(6, self.ptool._norm_acl_perms("rwa")) + self.assertEqual(4, self.ptool._norm_acl_perms("rr")) + + @patch('os.stat') + def test__gather_data(self, mock_stat): + path = '/test' + mock_stat.side_effect = OSError + self.assertFalse(self.ptool._gather_data(path)[0]) + mock_stat.assert_called_with(path) + + mock_stat.reset_mock() + mock_stat.side_effect = None + # create a return value + stat_rv = MagicMock() + def stat_getitem(key): + if int(key) == stat.ST_UID: + return 0 + elif int(key) == stat.ST_GID: + return 10 + elif int(key) == stat.ST_MODE: + # return extra bits in the mode to emulate a device + # and ensure that they're stripped + return int('060660', 8) + stat_rv.__getitem__ = Mock(side_effect=stat_getitem) + mock_stat.return_value = stat_rv + + # disable selinux and acls for this call -- we test them + # separately so that we can skip those tests as appropriate + states = (Bcfg2.Client.Tools.POSIX.base.has_selinux, + Bcfg2.Client.Tools.POSIX.base.has_acls) + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertEqual(self.ptool._gather_data(path), + (stat_rv, '0', '10', '0660', None, None)) + Bcfg2.Client.Tools.POSIX.base.has_selinux, \ + Bcfg2.Client.Tools.POSIX.base.has_acls = states + mock_stat.assert_called_with(path) + + @skipUnless(has_selinux, "SELinux not found, skipping") + def test__gather_data_selinux(self): + context = 'system_u:object_r:root_t:s0' + path = '/test' + + @patch('os.stat') + @patchIf(has_selinux, "selinux.getfilecon") + def inner(mock_getfilecon, mock_stat): + mock_getfilecon.return_value = [len(context) + 1, context] + mock_stat.return_value = MagicMock() + # disable acls for this call and test them separately + state = Bcfg2.Client.Tools.POSIX.base.has_acls + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertEqual(self.ptool._gather_data(path)[4], 'root_t') + Bcfg2.Client.Tools.POSIX.base.has_acls = state + mock_getfilecon.assert_called_with(path) + + inner() + + @skipUnless(has_acls, "ACLS not found, skipping") + @patch('os.stat') + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" % + test_obj.__name__) + def test__gather_data_acls(self, mock_list_file_acls, mock_stat): + acls = {("default", posix1e.ACL_USER, "testuser"): "rwx", + ("access", posix1e.ACL_GROUP, "testgroup"): "rx"} + mock_list_file_acls.return_value = acls + path = '/test' + mock_stat.return_value = MagicMock() + # disable selinux for this call and test it separately + state = Bcfg2.Client.Tools.POSIX.base.has_selinux + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + self.assertItemsEqual(self.ptool._gather_data(path)[5], acls) + Bcfg2.Client.Tools.POSIX.base.has_selinux = state + mock_list_file_acls.assert_called_with(path) + + @patchIf(has_selinux, "selinux.matchpathcon") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_acls" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._gather_data" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" % + test_obj.__name__) + def test_verify_metadata(self, mock_norm_gid, mock_norm_uid, + mock_gather_data, mock_verify_acls, + mock_matchpathcon): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user", perms="664", + secontext='etc_t') + # _verify_metadata() mutates the entry, so we keep a backup so we + # can start fresh every time + orig_entry = copy.deepcopy(entry) + + def reset(): + mock_gather_data.reset_mock() + mock_verify_acls.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + return copy.deepcopy(orig_entry) + + # test nonexistent file + mock_gather_data.return_value = (False, None, None, None, None, None) + self.assertFalse(self.ptool._verify_metadata(entry)) + self.assertEqual(entry.get("current_exists", "").lower(), "false") + mock_gather_data.assert_called_with(entry.get("name")) + + # expected data. tuple of attr, return value index, value + expected = [('current_owner', 1, '0'), + ('current_group', 2, '10'), + ('current_perms', 3, '0664'), + ('current_secontext', 4, 'etc_t')] + mock_norm_uid.return_value = 0 + mock_norm_gid.return_value = 10 + gather_data_rv = [MagicMock(), None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + + entry = reset() + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + + # test when secontext is None + entry = reset() + gather_data_rv[4] = None + sestate = Bcfg2.Client.Tools.POSIX.base.has_selinux + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + if attr != 'current_secontext': + self.assertEqual(entry.get(attr), val) + Bcfg2.Client.Tools.POSIX.base.has_selinux = sestate + + gather_data_rv = [MagicMock(), None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + + mtime = 1344430414 + entry = reset() + entry.set("mtime", str(mtime)) + stat_rv = MagicMock() + stat_rv.__getitem__.return_value = mtime + gather_data_rv[0] = stat_rv + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + # failure modes for each checked datum. tuple of changed attr, + # return value index, new (failing) value + failures = [('current_owner', 1, '10'), + ('current_group', 2, '100'), + ('current_perms', 3, '0660')] + if has_selinux: + failures.append(('current_secontext', 4, 'root_t')) + + for fail_attr, fail_idx, fail_val in failures: + entry = reset() + entry.set("mtime", str(mtime)) + gather_data_rv = [stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + gather_data_rv[fail_idx] = fail_val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertFalse(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + self.assertEqual(entry.get(fail_attr), fail_val) + for attr, idx, val in expected: + if attr != fail_attr: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + # failure mode for mtime + fail_mtime = 1344431162 + entry = reset() + entry.set("mtime", str(mtime)) + fail_stat_rv = MagicMock() + fail_stat_rv.__getitem__.return_value = fail_mtime + gather_data_rv = [fail_stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertFalse(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(fail_mtime)) + + if has_selinux: + # test success and failure for __default__ secontext + entry = reset() + entry.set("mtime", str(mtime)) + entry.set("secontext", "__default__") + + context1 = "system_u:object_r:etc_t:s0" + context2 = "system_u:object_r:root_t:s0" + mock_matchpathcon.return_value = [1 + len(context1), + context1] + gather_data_rv = [stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, + path=entry.get("name")) + mock_matchpathcon.assert_called_with(entry.get("name"), 0) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + entry = reset() + entry.set("mtime", str(mtime)) + entry.set("secontext", "__default__") + mock_matchpathcon.return_value = [1 + len(context2), + context2] + self.assertFalse(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, + path=entry.get("name")) + mock_matchpathcon.assert_called_with(entry.get("name"), 0) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + @skipUnless(has_acls, "ACLS not found, skipping") + def test_list_entry_acls(self): + entry = lxml.etree.Element("Path", name="/test", type="file") + lxml.etree.SubElement(entry, "ACL", scope="user", type="default", + user="user", perms="rwx") + lxml.etree.SubElement(entry, "ACL", scope="group", type="access", + group="group", perms="5") + self.assertItemsEqual(self.ptool._list_entry_acls(entry), + {("default", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5}) + + @skipUnless(has_acls, "ACLS not found, skipping") + @patch("pwd.getpwuid") + @patch("grp.getgrgid") + @patch("os.path.isdir") + def test_list_file_acls(self, mock_isdir, mock_getgrgid, mock_getpwuid, + mock_ACL): + path = '/test' + + # build a set of file ACLs to return from posix1e.ACL(file=...) + file_acls = [] + acl = Mock() + acl.tag_type = posix1e.ACL_USER + acl.qualifier = 10 + # yes, this is a bogus permset. thanks to _norm_acl_perms + # it works and is easier than many of the alternatives. + acl.permset = 'rwx' + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_GROUP + acl.qualifier = 100 + acl.permset = 'rx' + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_MASK + file_acls.append(acl) + acls = {("access", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5} + + # set up the unreasonably complex return value for + # posix1e.ACL(), which has two separate uses + fileacl_rv = MagicMock() + fileacl_rv.valid.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv = MagicMock() + filedef_rv.valid.return_value = True + filedef_rv.__iter__.return_value = iter(file_acls) + def mock_acl_rv(file=None, filedef=None): + if file: + return fileacl_rv + elif filedef: + return filedef_rv + # other return values + mock_isdir.return_value = False + mock_getgrgid.return_value = ("group", "x", 5, []) + mock_getpwuid.return_value = ("user", "x", 5, 5, "User", + "/home/user", "/bin/zsh") + + def reset(): + mock_isdir.reset_mock() + mock_getgrgid.reset_mock() + mock_getpwuid.reset_mock() + mock_ACL.reset_mock() + + mock_ACL.side_effect = IOError(95, "Operation not supported") + self.assertItemsEqual(self.ptool._list_file_acls(path), dict()) + + reset() + mock_ACL.side_effect = IOError + self.assertItemsEqual(self.ptool._list_file_acls(path), dict()) + + reset() + mock_ACL.side_effect = mock_acl_rv + self.assertItemsEqual(self.ptool._list_file_acls(path), acls) + mock_isdir.assert_called_with(path) + mock_getgrgid.assert_called_with(100) + mock_getpwuid.assert_called_with(10) + mock_ACL.assert_called_with(file=path) + + reset() + mock_isdir.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv.__iter__.return_value = iter(file_acls) + + defacls = acls + for akey, perms in acls.items(): + defacls[('default', akey[1], akey[2])] = perms + self.assertItemsEqual(self.ptool._list_file_acls(path), defacls) + mock_isdir.assert_called_with(path) + self.assertItemsEqual(mock_getgrgid.call_args_list, + [call(100), call(100)]) + self.assertItemsEqual(mock_getpwuid.call_args_list, + [call(10), call(10)]) + self.assertItemsEqual(mock_ACL.call_args_list, + [call(file=path), call(filedef=path)]) + + if has_acls: + # python 2.6 applies decorators at compile-time, not at + # run-time, so we can't do these as decorators because + # pylibacl might not be installed. (If it's not, this test + # will be skipped, so as long as this is done at run-time + # we're safe.) + test_list_file_acls = patch("posix1e.ACL")(test_list_file_acls) + + @skipUnless(has_acls, "ACLS not found, skipping") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" % + test_obj.__name__) + def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls): + entry = lxml.etree.Element("Path", name="/test", type="file") + # we can't test to make sure that errors get properly sorted + # into (missing, extra, wrong) without refactoring the + # _verify_acls code, and I don't feel like doing that, so eff + # it. let's just test to make sure that failures are + # identified at all for now. + + acls = {("access", posix1e.ACL_USER, "user"): 7, + ("default", posix1e.ACL_GROUP, "group"): 5} + extra_acls = copy.deepcopy(acls) + extra_acls[("access", posix1e.ACL_USER, "user2")] = 4 + + mock_list_entry_acls.return_value = acls + mock_list_file_acls.return_value = acls + self.assertTrue(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test missing + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_file_acls.return_value = extra_acls + self.assertFalse(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test extra + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_entry_acls.return_value = extra_acls + mock_list_file_acls.return_value = acls + self.assertFalse(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test wrong + wrong_acls = copy.deepcopy(extra_acls) + wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5 + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_entry_acls.return_value = extra_acls + mock_list_file_acls.return_value = wrong_acls + self.assertFalse(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + @patch("os.makedirs") + @patch("os.path.exists") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__) + def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs): + entry = lxml.etree.Element("Path", name="/test/foo/bar", + type="directory") + + def reset(): + mock_exists.reset_mock() + mock_set_perms.reset_mock() + mock_makedirs.reset_mock() + + mock_set_perms.return_value = True + def path_exists_rv(path): + if path == "/test": + return True + else: + return False + mock_exists.side_effect = path_exists_rv + self.assertTrue(self.ptool._makedirs(entry)) + self.assertItemsEqual(mock_exists.call_args_list, + [call("/test"), call("/test/foo"), + call("/test/foo/bar")]) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + mock_makedirs.assert_called_with(entry.get("name")) + + reset() + mock_makedirs.side_effect = OSError + self.assertFalse(self.ptool._makedirs(entry)) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + + reset() + mock_makedirs.side_effect = None + def set_perms_rv(entry, path=None): + if path == '/test/foo': + return False + else: + return True + mock_set_perms.side_effect = set_perms_rv + self.assertFalse(self.ptool._makedirs(entry)) + self.assertItemsEqual(mock_exists.call_args_list, + [call("/test"), call("/test/foo"), + call("/test/foo/bar")]) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + mock_makedirs.assert_called_with(entry.get("name")) -- cgit v1.2.3-1-g7c22