From d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 4 Sep 2012 09:52:57 -0400 Subject: reorganized testsuite to allow tests on stuff outside of src --- .../TestClient/TestTools/TestPOSIX/Testbase.py | 991 --------------------- 1 file changed, 991 deletions(-) delete mode 100644 testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py (limited to 'testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py') diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py deleted file mode 100644 index b447ab642..000000000 --- a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py +++ /dev/null @@ -1,991 +0,0 @@ -import os -import sys -import copy -import stat -import lxml.etree -from mock import Mock, MagicMock, patch -import Bcfg2.Client.Tools -from Bcfg2.Client.Tools.POSIX.base import * - -# add all parent testsuite directories to sys.path to allow (most) -# relative imports in python 2.4 -path = os.path.dirname(__file__) -while path != "/": - if os.path.basename(path).lower().startswith("test"): - sys.path.append(path) - if os.path.basename(path) == "testsuite": - break - path = os.path.dirname(path) -from Test__init import get_posix_object -from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ - skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ - patchIf, datastore - -try: - import selinux - has_selinux = True -except ImportError: - has_selinux = False - -try: - import posix1e - has_acls = True -except ImportError: - has_acls = False - -class TestPOSIXTool(Bcfg2TestCase): - test_obj = POSIXTool - - def get_obj(self, posix=None): - if posix is None: - posix = get_posix_object() - return self.test_obj(posix.logger, posix.setup, posix.config) - - def setUp(self): - self.ptool = self.get_obj() - - def tearDown(self): - # just to guarantee that we start fresh each time - self.ptool = None - - def test_fully_specified(self): - # fully_specified should do no checking on the abstract - # POSIXTool object - self.assertTrue(self.ptool.fully_specified(Mock())) - - @patch('os.stat') - @patch('os.walk') - @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_metadata" % - test_obj.__name__) - def test_verify(self, mock_verify, mock_walk, mock_stat): - entry = lxml.etree.Element("Path", name="/test", type="file") - - mock_stat.return_value = MagicMock() - mock_verify.return_value = False - self.assertFalse(self.ptool.verify(entry, [])) - mock_verify.assert_called_with(entry) - - mock_verify.reset_mock() - mock_verify.return_value = True - self.assertTrue(self.ptool.verify(entry, [])) - mock_verify.assert_called_with(entry) - - mock_verify.reset_mock() - entry.set("recursive", "true") - walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), - ("/dir1", ["dir3"], []), - ("/dir2", [], ["file3", "file4"])] - mock_walk.return_value = walk_rv - self.assertTrue(self.ptool.verify(entry, [])) - mock_walk.assert_called_with(entry.get("name")) - all_verifies = [call(entry)] - for root, dirs, files in walk_rv: - all_verifies.extend([call(entry, path=os.path.join(root, p)) - for p in dirs + files]) - self.assertItemsEqual(mock_verify.call_args_list, all_verifies) - - @patch('os.walk') - @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__) - def test_install(self, mock_set_perms, mock_walk): - entry = lxml.etree.Element("Path", name="/test", type="file") - - mock_set_perms.return_value = True - self.assertTrue(self.ptool.install(entry)) - mock_set_perms.assert_called_with(entry) - - mock_set_perms.reset_mock() - entry.set("recursive", "true") - walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), - ("/dir1", ["dir3"], []), - ("/dir2", [], ["file3", "file4"])] - mock_walk.return_value = walk_rv - - mock_set_perms.return_value = True - self.assertTrue(self.ptool.install(entry)) - mock_walk.assert_called_with(entry.get("name")) - all_set_perms = [call(entry)] - for root, dirs, files in walk_rv: - all_set_perms.extend([call(entry, path=os.path.join(root, p)) - for p in dirs + files]) - self.assertItemsEqual(mock_set_perms.call_args_list, - all_set_perms) - - mock_walk.reset_mock() - mock_set_perms.reset_mock() - - def set_perms_rv(entry, path=None): - if path == '/dir2/file3': - return False - else: - return True - mock_set_perms.side_effect = set_perms_rv - - self.assertFalse(self.ptool.install(entry)) - mock_walk.assert_called_with(entry.get("name")) - self.assertItemsEqual(mock_set_perms.call_args_list, all_set_perms) - - @patch('os.lstat') - @patch("os.unlink") - @patch("os.path.isdir") - @patch("shutil.rmtree") - def test_exists(self, mock_rmtree, mock_isdir, mock_unlink, mock_lstat): - entry = lxml.etree.Element("Path", name="/etc/foo", type="file") - - mock_lstat.side_effect = OSError - self.assertFalse(self.ptool._exists(entry)) - mock_lstat.assert_called_with(entry.get('name')) - self.assertFalse(mock_unlink.called) - - mock_lstat.reset_mock() - mock_unlink.reset_mock() - rv = MagicMock() - mock_lstat.return_value = rv - mock_lstat.side_effect = None - self.assertEqual(self.ptool._exists(entry), rv) - mock_lstat.assert_called_with(entry.get('name')) - self.assertFalse(mock_unlink.called) - - mock_lstat.reset_mock() - mock_unlink.reset_mock() - mock_isdir.return_value = False - self.assertFalse(self.ptool._exists(entry, remove=True)) - mock_isdir.assert_called_with(entry.get('name')) - mock_lstat.assert_called_with(entry.get('name')) - mock_unlink.assert_called_with(entry.get('name')) - self.assertFalse(mock_rmtree.called) - - mock_lstat.reset_mock() - mock_isdir.reset_mock() - mock_unlink.reset_mock() - mock_rmtree.reset_mock() - mock_isdir.return_value = True - self.assertFalse(self.ptool._exists(entry, remove=True)) - mock_isdir.assert_called_with(entry.get('name')) - mock_lstat.assert_called_with(entry.get('name')) - mock_rmtree.assert_called_with(entry.get('name')) - self.assertFalse(mock_unlink.called) - - mock_isdir.reset_mock() - mock_lstat.reset_mock() - mock_unlink.reset_mock() - mock_rmtree.reset_mock() - mock_rmtree.side_effect = OSError - self.assertEqual(self.ptool._exists(entry, remove=True), rv) - mock_isdir.assert_called_with(entry.get('name')) - mock_lstat.assert_called_with(entry.get('name')) - mock_rmtree.assert_called_with(entry.get('name')) - self.assertFalse(mock_unlink.called) - - @patch("os.chown") - @patch("os.chmod") - @patch("os.utime") - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" % - test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" % - test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_acls" % test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_secontext" % - test_obj.__name__) - def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid, - mock_norm_uid, mock_utime, mock_chmod, mock_chown): - def reset(): - mock_set_secontext.reset_mock() - mock_set_acls.reset_mock() - mock_norm_gid.reset_mock() - mock_norm_uid.reset_mock() - mock_chmod.reset_mock() - mock_chown.reset_mock() - mock_utime.reset_mock() - - entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar", - type="symlink") - mock_set_acls.return_value = True - mock_set_secontext.return_value = True - self.assertTrue(self.ptool._set_perms(entry)) - mock_set_secontext.assert_called_with(entry, path=entry.get("name")) - mock_set_acls.assert_called_with(entry, path=entry.get("name")) - - entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner", - group="group", perms="644", type="file") - mock_norm_uid.return_value = 10 - mock_norm_gid.return_value = 100 - - reset() - self.assertTrue(self.ptool._set_perms(entry)) - mock_norm_uid.assert_called_with(entry) - mock_norm_gid.assert_called_with(entry) - mock_chown.assert_called_with(entry.get("name"), 10, 100) - mock_chmod.assert_called_with(entry.get("name"), - int(entry.get("perms"), 8)) - self.assertFalse(mock_utime.called) - mock_set_secontext.assert_called_with(entry, path=entry.get("name")) - mock_set_acls.assert_called_with(entry, path=entry.get("name")) - - reset() - mtime = 1344459042 - entry.set("mtime", str(mtime)) - self.assertTrue(self.ptool._set_perms(entry)) - mock_norm_uid.assert_called_with(entry) - mock_norm_gid.assert_called_with(entry) - mock_chown.assert_called_with(entry.get("name"), 10, 100) - mock_chmod.assert_called_with(entry.get("name"), - int(entry.get("perms"), 8)) - mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) - mock_set_secontext.assert_called_with(entry, path=entry.get("name")) - mock_set_acls.assert_called_with(entry, path=entry.get("name")) - - reset() - self.assertTrue(self.ptool._set_perms(entry, path='/etc/bar')) - mock_norm_uid.assert_called_with(entry) - mock_norm_gid.assert_called_with(entry) - mock_chown.assert_called_with('/etc/bar', 10, 100) - mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8)) - mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) - mock_set_secontext.assert_called_with(entry, path='/etc/bar') - mock_set_acls.assert_called_with(entry, path='/etc/bar') - - # test dev_type modification of perms, failure of chown - reset() - def chown_rv(path, owner, group): - if owner == 0 and group == 0: - return True - else: - raise KeyError - os.chown.side_effect = chown_rv - entry.set("type", "device") - entry.set("dev_type", list(device_map.keys())[0]) - self.assertFalse(self.ptool._set_perms(entry)) - mock_norm_uid.assert_called_with(entry) - mock_norm_gid.assert_called_with(entry) - mock_chown.assert_called_with(entry.get("name"), 0, 0) - mock_chmod.assert_called_with(entry.get("name"), - int(entry.get("perms"), 8) | list(device_map.values())[0]) - mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) - mock_set_secontext.assert_called_with(entry, path=entry.get("name")) - mock_set_acls.assert_called_with(entry, path=entry.get("name")) - - # test failure of chmod - reset() - os.chown.side_effect = None - os.chmod.side_effect = OSError - entry.set("type", "file") - del entry.attrib["dev_type"] - self.assertFalse(self.ptool._set_perms(entry)) - mock_norm_uid.assert_called_with(entry) - mock_norm_gid.assert_called_with(entry) - mock_chown.assert_called_with(entry.get("name"), 10, 100) - mock_chmod.assert_called_with(entry.get("name"), - int(entry.get("perms"), 8)) - mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) - mock_set_secontext.assert_called_with(entry, path=entry.get("name")) - mock_set_acls.assert_called_with(entry, path=entry.get("name")) - - # test that even when everything fails, we try to do it all. - # e.g., when chmod fails, we still try to apply acls, set - # selinux context, etc. - reset() - os.chown.side_effect = OSError - os.utime.side_effect = OSError - mock_set_acls.return_value = False - mock_set_secontext.return_value = False - self.assertFalse(self.ptool._set_perms(entry)) - mock_norm_uid.assert_called_with(entry) - mock_norm_gid.assert_called_with(entry) - mock_chown.assert_called_with(entry.get("name"), 10, 100) - mock_chmod.assert_called_with(entry.get("name"), - int(entry.get("perms"), 8)) - mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) - mock_set_secontext.assert_called_with(entry, path=entry.get("name")) - mock_set_acls.assert_called_with(entry, path=entry.get("name")) - - @skipUnless(has_acls, "ACLS not found, skipping") - @patchIf(has_acls, "posix1e.ACL") - @patchIf(has_acls, "posix1e.Entry") - @patch("os.path.isdir") - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" % - test_obj.__name__) - def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid, - mock_isdir, mock_Entry, mock_ACL): - entry = lxml.etree.Element("Path", name="/etc/foo", type="file") - - # disable acls for the initial test - Bcfg2.Client.Tools.POSIX.base.has_acls = False - self.assertTrue(self.ptool._set_acls(entry)) - Bcfg2.Client.Tools.POSIX.base.has_acls = True - - # build a set of file ACLs to return from posix1e.ACL(file=...) - file_acls = [] - acl = Mock() - acl.tag_type = posix1e.ACL_USER - acl.name = "remove" - file_acls.append(acl) - acl = Mock() - acl.tag_type = posix1e.ACL_GROUP - acl.name = "remove" - file_acls.append(acl) - acl = Mock() - acl.tag_type = posix1e.ACL_MASK - acl.name = "keep" - file_acls.append(acl) - remove_acls = [a for a in file_acls if a.name == "remove"] - - # build a set of ACLs listed on the entry as returned by - # _list_entry_acls() - entry_acls = {("default", posix1e.ACL_USER, "user"): 7, - ("access", posix1e.ACL_GROUP, "group"): 5} - mock_list_entry_acls.return_value = entry_acls - mock_norm_uid.return_value = 10 - mock_norm_gid.return_value = 100 - - # set up the unreasonably complex return value for - # posix1e.ACL(), which has three separate uses - fileacl_rv = MagicMock() - fileacl_rv.valid.return_value = True - fileacl_rv.__iter__.return_value = iter(file_acls) - filedef_rv = MagicMock() - filedef_rv.valid.return_value = True - filedef_rv.__iter__.return_value = iter(file_acls) - acl_rv = MagicMock() - def mock_acl_rv(file=None, filedef=None, acl=None): - if file: - return fileacl_rv - elif filedef: - return filedef_rv - elif acl: - return acl_rv - - # set up the equally unreasonably complex return value for - # posix1e.Entry, which returns a new entry and adds it to - # an ACL, so we have to track the Mock objects it returns. - # why can't they just have an acl.add_entry() method?!? - acl_entries = [] - def mock_entry_rv(acl): - rv = MagicMock() - rv.acl = acl - rv.permset = set() - acl_entries.append(rv) - return rv - mock_Entry.side_effect = mock_entry_rv - - def reset(): - mock_isdir.reset_mock() - mock_ACL.reset_mock() - mock_Entry.reset_mock() - fileacl_rv.reset_mock() - - # test fs mounted noacl - mock_ACL.side_effect = IOError(95, "Operation not permitted") - self.assertFalse(self.ptool._set_acls(entry)) - - # test other error - reset() - mock_ACL.side_effect = IOError - self.assertFalse(self.ptool._set_acls(entry)) - - reset() - mock_ACL.side_effect = mock_acl_rv - mock_isdir.return_value = True - self.assertTrue(self.ptool._set_acls(entry)) - self.assertItemsEqual(mock_ACL.call_args_list, - [call(file=entry.get("name")), - call(filedef=entry.get("name"))]) - self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, - [call(a) for a in remove_acls]) - self.assertItemsEqual(filedef_rv.delete_entry.call_args_list, - [call(a) for a in remove_acls]) - mock_list_entry_acls.assert_called_with(entry) - mock_norm_uid.assert_called_with("user") - mock_norm_gid.assert_called_with("group") - fileacl_rv.calc_mask.assert_any_call() - fileacl_rv.applyto.assert_called_with(entry.get("name"), - posix1e.ACL_TYPE_ACCESS) - filedef_rv.calc_mask.assert_any_call() - filedef_rv.applyto.assert_called_with(entry.get("name"), - posix1e.ACL_TYPE_DEFAULT) - - # build tuples of the Entry objects that were added to acl - # and defaacl so they're easier to compare for equality - added_acls = [] - for acl in acl_entries: - added_acls.append((acl.acl, acl.tag_type, acl.qualifier, - sum(acl.permset))) - self.assertItemsEqual(added_acls, - [(filedef_rv, posix1e.ACL_USER, 10, 7), - (fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) - - reset() - # have to reassign these because they're iterators, and - # they've already been iterated over once - fileacl_rv.__iter__.return_value = iter(file_acls) - filedef_rv.__iter__.return_value = iter(file_acls) - mock_list_entry_acls.reset_mock() - mock_norm_uid.reset_mock() - mock_norm_gid.reset_mock() - mock_isdir.return_value = False - acl_entries = [] - self.assertTrue(self.ptool._set_acls(entry, path="/bin/bar")) - mock_ACL.assert_called_with(file="/bin/bar") - self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, - [call(a) for a in remove_acls]) - mock_list_entry_acls.assert_called_with(entry) - mock_norm_gid.assert_called_with("group") - fileacl_rv.calc_mask.assert_any_call() - fileacl_rv.applyto.assert_called_with("/bin/bar", - posix1e.ACL_TYPE_ACCESS) - - added_acls = [] - for acl in acl_entries: - added_acls.append((acl.acl, acl.tag_type, acl.qualifier, - sum(acl.permset))) - self.assertItemsEqual(added_acls, - [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) - - @skipUnless(has_selinux, "SELinux not found, skipping") - @patchIf(has_selinux, "selinux.restorecon") - @patchIf(has_selinux, "selinux.lsetfilecon") - def test_set_secontext(self, mock_lsetfilecon, mock_restorecon): - entry = lxml.etree.Element("Path", name="/etc/foo", type="file") - - # disable selinux for the initial test - Bcfg2.Client.Tools.POSIX.base.has_selinux = False - self.assertTrue(self.ptool._set_secontext(entry)) - Bcfg2.Client.Tools.POSIX.base.has_selinux = True - - # no context given - self.assertTrue(self.ptool._set_secontext(entry)) - self.assertFalse(mock_restorecon.called) - self.assertFalse(mock_lsetfilecon.called) - - mock_restorecon.reset_mock() - mock_lsetfilecon.reset_mock() - entry.set("secontext", "__default__") - self.assertTrue(self.ptool._set_secontext(entry)) - mock_restorecon.assert_called_with(entry.get("name")) - self.assertFalse(mock_lsetfilecon.called) - - mock_restorecon.reset_mock() - mock_lsetfilecon.reset_mock() - mock_lsetfilecon.return_value = 0 - entry.set("secontext", "foo_t") - self.assertTrue(self.ptool._set_secontext(entry)) - self.assertFalse(mock_restorecon.called) - mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") - - mock_restorecon.reset_mock() - mock_lsetfilecon.reset_mock() - mock_lsetfilecon.return_value = 1 - self.assertFalse(self.ptool._set_secontext(entry)) - self.assertFalse(mock_restorecon.called) - mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") - - @patch("grp.getgrnam") - def test_norm_gid(self, mock_getgrnam): - self.assertEqual(5, self.ptool._norm_gid("5")) - self.assertFalse(mock_getgrnam.called) - - mock_getgrnam.reset_mock() - mock_getgrnam.return_value = ("group", "x", 5, []) - self.assertEqual(5, self.ptool._norm_gid("group")) - mock_getgrnam.assert_called_with("group") - - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__) - def test_norm_entry_gid(self, mock_norm_gid): - entry = lxml.etree.Element("Path", name="/test", type="file", - group="group", owner="user") - mock_norm_gid.return_value = 10 - self.assertEqual(10, self.ptool._norm_entry_gid(entry)) - mock_norm_gid.assert_called_with(entry.get("group")) - - mock_norm_gid.reset_mock() - mock_norm_gid.side_effect = KeyError - self.assertEqual(0, self.ptool._norm_entry_gid(entry)) - mock_norm_gid.assert_called_with(entry.get("group")) - - @patch("pwd.getpwnam") - def test_norm_uid(self, mock_getpwnam): - self.assertEqual(5, self.ptool._norm_uid("5")) - self.assertFalse(mock_getpwnam.called) - - mock_getpwnam.reset_mock() - mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user", - "/bin/zsh") - self.assertEqual(5, self.ptool._norm_uid("user")) - mock_getpwnam.assert_called_with("user") - - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__) - def test_norm_entry_uid(self, mock_norm_uid): - entry = lxml.etree.Element("Path", name="/test", type="file", - group="group", owner="user") - mock_norm_uid.return_value = 10 - self.assertEqual(10, self.ptool._norm_entry_uid(entry)) - mock_norm_uid.assert_called_with(entry.get("owner")) - - mock_norm_uid.reset_mock() - mock_norm_uid.side_effect = KeyError - self.assertEqual(0, self.ptool._norm_entry_uid(entry)) - mock_norm_uid.assert_called_with(entry.get("owner")) - - def test_norm_acl_perms(self): - # there's basically no reasonably way to test the Permset - # object parsing feature without writing our own Mock object - # that re-implements Permset.test(). silly pylibacl won't let - # us create standalone Entry or Permset objects. - self.assertEqual(5, self.ptool._norm_acl_perms("5")) - self.assertEqual(0, self.ptool._norm_acl_perms("55")) - self.assertEqual(5, self.ptool._norm_acl_perms("rx")) - self.assertEqual(5, self.ptool._norm_acl_perms("r-x")) - self.assertEqual(6, self.ptool._norm_acl_perms("wr-")) - self.assertEqual(0, self.ptool._norm_acl_perms("rwrw")) - self.assertEqual(0, self.ptool._norm_acl_perms("-")) - self.assertEqual(0, self.ptool._norm_acl_perms("a")) - self.assertEqual(6, self.ptool._norm_acl_perms("rwa")) - self.assertEqual(4, self.ptool._norm_acl_perms("rr")) - - @patch('os.stat') - def test__gather_data(self, mock_stat): - path = '/test' - mock_stat.side_effect = OSError - self.assertFalse(self.ptool._gather_data(path)[0]) - mock_stat.assert_called_with(path) - - mock_stat.reset_mock() - mock_stat.side_effect = None - # create a return value - stat_rv = MagicMock() - def stat_getitem(key): - if int(key) == stat.ST_UID: - return 0 - elif int(key) == stat.ST_GID: - return 10 - elif int(key) == stat.ST_MODE: - # return extra bits in the mode to emulate a device - # and ensure that they're stripped - return int('060660', 8) - stat_rv.__getitem__ = Mock(side_effect=stat_getitem) - mock_stat.return_value = stat_rv - - # disable selinux and acls for this call -- we test them - # separately so that we can skip those tests as appropriate - states = (Bcfg2.Client.Tools.POSIX.base.has_selinux, - Bcfg2.Client.Tools.POSIX.base.has_acls) - Bcfg2.Client.Tools.POSIX.base.has_selinux = False - Bcfg2.Client.Tools.POSIX.base.has_acls = False - self.assertEqual(self.ptool._gather_data(path), - (stat_rv, '0', '10', '0660', None, None)) - Bcfg2.Client.Tools.POSIX.base.has_selinux, \ - Bcfg2.Client.Tools.POSIX.base.has_acls = states - mock_stat.assert_called_with(path) - - @skipUnless(has_selinux, "SELinux not found, skipping") - def test__gather_data_selinux(self): - context = 'system_u:object_r:root_t:s0' - path = '/test' - - @patch('os.stat') - @patchIf(has_selinux, "selinux.getfilecon") - def inner(mock_getfilecon, mock_stat): - mock_getfilecon.return_value = [len(context) + 1, context] - mock_stat.return_value = MagicMock() - # disable acls for this call and test them separately - state = Bcfg2.Client.Tools.POSIX.base.has_acls - Bcfg2.Client.Tools.POSIX.base.has_acls = False - self.assertEqual(self.ptool._gather_data(path)[4], 'root_t') - Bcfg2.Client.Tools.POSIX.base.has_acls = state - mock_getfilecon.assert_called_with(path) - - inner() - - @skipUnless(has_acls, "ACLS not found, skipping") - @patch('os.stat') - @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" % - test_obj.__name__) - def test__gather_data_acls(self, mock_list_file_acls, mock_stat): - acls = {("default", posix1e.ACL_USER, "testuser"): "rwx", - ("access", posix1e.ACL_GROUP, "testgroup"): "rx"} - mock_list_file_acls.return_value = acls - path = '/test' - mock_stat.return_value = MagicMock() - # disable selinux for this call and test it separately - state = Bcfg2.Client.Tools.POSIX.base.has_selinux - Bcfg2.Client.Tools.POSIX.base.has_selinux = False - self.assertItemsEqual(self.ptool._gather_data(path)[5], acls) - Bcfg2.Client.Tools.POSIX.base.has_selinux = state - mock_list_file_acls.assert_called_with(path) - - @patchIf(has_selinux, "selinux.matchpathcon") - @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_acls" % test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._gather_data" % test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" % - test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" % - test_obj.__name__) - def test_verify_metadata(self, mock_norm_gid, mock_norm_uid, - mock_gather_data, mock_verify_acls, - mock_matchpathcon): - entry = lxml.etree.Element("Path", name="/test", type="file", - group="group", owner="user", perms="664", - secontext='etc_t') - # _verify_metadata() mutates the entry, so we keep a backup so we - # can start fresh every time - orig_entry = copy.deepcopy(entry) - - def reset(): - mock_gather_data.reset_mock() - mock_verify_acls.reset_mock() - mock_norm_uid.reset_mock() - mock_norm_gid.reset_mock() - return copy.deepcopy(orig_entry) - - # test nonexistent file - mock_gather_data.return_value = (False, None, None, None, None, None) - self.assertFalse(self.ptool._verify_metadata(entry)) - self.assertEqual(entry.get("current_exists", "").lower(), "false") - mock_gather_data.assert_called_with(entry.get("name")) - - # expected data. tuple of attr, return value index, value - expected = [('current_owner', 1, '0'), - ('current_group', 2, '10'), - ('current_perms', 3, '0664'), - ('current_secontext', 4, 'etc_t')] - mock_norm_uid.return_value = 0 - mock_norm_gid.return_value = 10 - gather_data_rv = [MagicMock(), None, None, None, None, []] - for attr, idx, val in expected: - gather_data_rv[idx] = val - - entry = reset() - mock_gather_data.return_value = tuple(gather_data_rv) - self.assertTrue(self.ptool._verify_metadata(entry)) - mock_gather_data.assert_called_with(entry.get("name")) - mock_verify_acls.assert_called_with(entry, path=entry.get("name")) - self.assertEqual(entry.get("current_exists", 'true'), 'true') - for attr, idx, val in expected: - self.assertEqual(entry.get(attr), val) - - # test when secontext is None - entry = reset() - gather_data_rv[4] = None - sestate = Bcfg2.Client.Tools.POSIX.base.has_selinux - Bcfg2.Client.Tools.POSIX.base.has_selinux = False - mock_gather_data.return_value = tuple(gather_data_rv) - self.assertTrue(self.ptool._verify_metadata(entry)) - mock_gather_data.assert_called_with(entry.get("name")) - mock_verify_acls.assert_called_with(entry, path=entry.get("name")) - self.assertEqual(entry.get("current_exists", 'true'), 'true') - for attr, idx, val in expected: - if attr != 'current_secontext': - self.assertEqual(entry.get(attr), val) - Bcfg2.Client.Tools.POSIX.base.has_selinux = sestate - - gather_data_rv = [MagicMock(), None, None, None, None, []] - for attr, idx, val in expected: - gather_data_rv[idx] = val - mock_gather_data.return_value = tuple(gather_data_rv) - - mtime = 1344430414 - entry = reset() - entry.set("mtime", str(mtime)) - stat_rv = MagicMock() - stat_rv.__getitem__.return_value = mtime - gather_data_rv[0] = stat_rv - mock_gather_data.return_value = tuple(gather_data_rv) - self.assertTrue(self.ptool._verify_metadata(entry)) - mock_gather_data.assert_called_with(entry.get("name")) - mock_verify_acls.assert_called_with(entry, path=entry.get("name")) - self.assertEqual(entry.get("current_exists", 'true'), 'true') - for attr, idx, val in expected: - self.assertEqual(entry.get(attr), val) - self.assertEqual(entry.get("current_mtime"), str(mtime)) - - # failure modes for each checked datum. tuple of changed attr, - # return value index, new (failing) value - failures = [('current_owner', 1, '10'), - ('current_group', 2, '100'), - ('current_perms', 3, '0660')] - if has_selinux: - failures.append(('current_secontext', 4, 'root_t')) - - for fail_attr, fail_idx, fail_val in failures: - entry = reset() - entry.set("mtime", str(mtime)) - gather_data_rv = [stat_rv, None, None, None, None, []] - for attr, idx, val in expected: - gather_data_rv[idx] = val - gather_data_rv[fail_idx] = fail_val - mock_gather_data.return_value = tuple(gather_data_rv) - self.assertFalse(self.ptool._verify_metadata(entry)) - mock_gather_data.assert_called_with(entry.get("name")) - mock_verify_acls.assert_called_with(entry, path=entry.get("name")) - self.assertEqual(entry.get("current_exists", 'true'), 'true') - self.assertEqual(entry.get(fail_attr), fail_val) - for attr, idx, val in expected: - if attr != fail_attr: - self.assertEqual(entry.get(attr), val) - self.assertEqual(entry.get("current_mtime"), str(mtime)) - - # failure mode for mtime - fail_mtime = 1344431162 - entry = reset() - entry.set("mtime", str(mtime)) - fail_stat_rv = MagicMock() - fail_stat_rv.__getitem__.return_value = fail_mtime - gather_data_rv = [fail_stat_rv, None, None, None, None, []] - for attr, idx, val in expected: - gather_data_rv[idx] = val - mock_gather_data.return_value = tuple(gather_data_rv) - self.assertFalse(self.ptool._verify_metadata(entry)) - mock_gather_data.assert_called_with(entry.get("name")) - mock_verify_acls.assert_called_with(entry, path=entry.get("name")) - self.assertEqual(entry.get("current_exists", 'true'), 'true') - for attr, idx, val in expected: - self.assertEqual(entry.get(attr), val) - self.assertEqual(entry.get("current_mtime"), str(fail_mtime)) - - if has_selinux: - # test success and failure for __default__ secontext - entry = reset() - entry.set("mtime", str(mtime)) - entry.set("secontext", "__default__") - - context1 = "system_u:object_r:etc_t:s0" - context2 = "system_u:object_r:root_t:s0" - mock_matchpathcon.return_value = [1 + len(context1), - context1] - gather_data_rv = [stat_rv, None, None, None, None, []] - for attr, idx, val in expected: - gather_data_rv[idx] = val - mock_gather_data.return_value = tuple(gather_data_rv) - self.assertTrue(self.ptool._verify_metadata(entry)) - mock_gather_data.assert_called_with(entry.get("name")) - mock_verify_acls.assert_called_with(entry, - path=entry.get("name")) - mock_matchpathcon.assert_called_with(entry.get("name"), 0) - self.assertEqual(entry.get("current_exists", 'true'), 'true') - for attr, idx, val in expected: - self.assertEqual(entry.get(attr), val) - self.assertEqual(entry.get("current_mtime"), str(mtime)) - - entry = reset() - entry.set("mtime", str(mtime)) - entry.set("secontext", "__default__") - mock_matchpathcon.return_value = [1 + len(context2), - context2] - self.assertFalse(self.ptool._verify_metadata(entry)) - mock_gather_data.assert_called_with(entry.get("name")) - mock_verify_acls.assert_called_with(entry, - path=entry.get("name")) - mock_matchpathcon.assert_called_with(entry.get("name"), 0) - self.assertEqual(entry.get("current_exists", 'true'), 'true') - for attr, idx, val in expected: - self.assertEqual(entry.get(attr), val) - self.assertEqual(entry.get("current_mtime"), str(mtime)) - - @skipUnless(has_acls, "ACLS not found, skipping") - def test_list_entry_acls(self): - entry = lxml.etree.Element("Path", name="/test", type="file") - lxml.etree.SubElement(entry, "ACL", scope="user", type="default", - user="user", perms="rwx") - lxml.etree.SubElement(entry, "ACL", scope="group", type="access", - group="group", perms="5") - self.assertItemsEqual(self.ptool._list_entry_acls(entry), - {("default", posix1e.ACL_USER, "user"): 7, - ("access", posix1e.ACL_GROUP, "group"): 5}) - - @skipUnless(has_acls, "ACLS not found, skipping") - @patch("pwd.getpwuid") - @patch("grp.getgrgid") - @patch("os.path.isdir") - def test_list_file_acls(self, mock_isdir, mock_getgrgid, mock_getpwuid, - mock_ACL): - path = '/test' - - # build a set of file ACLs to return from posix1e.ACL(file=...) - file_acls = [] - acl = Mock() - acl.tag_type = posix1e.ACL_USER - acl.qualifier = 10 - # yes, this is a bogus permset. thanks to _norm_acl_perms - # it works and is easier than many of the alternatives. - acl.permset = 'rwx' - file_acls.append(acl) - acl = Mock() - acl.tag_type = posix1e.ACL_GROUP - acl.qualifier = 100 - acl.permset = 'rx' - file_acls.append(acl) - acl = Mock() - acl.tag_type = posix1e.ACL_MASK - file_acls.append(acl) - acls = {("access", posix1e.ACL_USER, "user"): 7, - ("access", posix1e.ACL_GROUP, "group"): 5} - - # set up the unreasonably complex return value for - # posix1e.ACL(), which has two separate uses - fileacl_rv = MagicMock() - fileacl_rv.valid.return_value = True - fileacl_rv.__iter__.return_value = iter(file_acls) - filedef_rv = MagicMock() - filedef_rv.valid.return_value = True - filedef_rv.__iter__.return_value = iter(file_acls) - def mock_acl_rv(file=None, filedef=None): - if file: - return fileacl_rv - elif filedef: - return filedef_rv - # other return values - mock_isdir.return_value = False - mock_getgrgid.return_value = ("group", "x", 5, []) - mock_getpwuid.return_value = ("user", "x", 5, 5, "User", - "/home/user", "/bin/zsh") - - def reset(): - mock_isdir.reset_mock() - mock_getgrgid.reset_mock() - mock_getpwuid.reset_mock() - mock_ACL.reset_mock() - - mock_ACL.side_effect = IOError(95, "Operation not supported") - self.assertItemsEqual(self.ptool._list_file_acls(path), dict()) - - reset() - mock_ACL.side_effect = IOError - self.assertItemsEqual(self.ptool._list_file_acls(path), dict()) - - reset() - mock_ACL.side_effect = mock_acl_rv - self.assertItemsEqual(self.ptool._list_file_acls(path), acls) - mock_isdir.assert_called_with(path) - mock_getgrgid.assert_called_with(100) - mock_getpwuid.assert_called_with(10) - mock_ACL.assert_called_with(file=path) - - reset() - mock_isdir.return_value = True - fileacl_rv.__iter__.return_value = iter(file_acls) - filedef_rv.__iter__.return_value = iter(file_acls) - - defacls = acls - for akey, perms in acls.items(): - defacls[('default', akey[1], akey[2])] = perms - self.assertItemsEqual(self.ptool._list_file_acls(path), defacls) - mock_isdir.assert_called_with(path) - self.assertItemsEqual(mock_getgrgid.call_args_list, - [call(100), call(100)]) - self.assertItemsEqual(mock_getpwuid.call_args_list, - [call(10), call(10)]) - self.assertItemsEqual(mock_ACL.call_args_list, - [call(file=path), call(filedef=path)]) - - if has_acls: - # python 2.6 applies decorators at compile-time, not at - # run-time, so we can't do these as decorators because - # pylibacl might not be installed. (If it's not, this test - # will be skipped, so as long as this is done at run-time - # we're safe.) - test_list_file_acls = patch("posix1e.ACL")(test_list_file_acls) - - @skipUnless(has_acls, "ACLS not found, skipping") - @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" % - test_obj.__name__) - @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" % - test_obj.__name__) - def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls): - entry = lxml.etree.Element("Path", name="/test", type="file") - # we can't test to make sure that errors get properly sorted - # into (missing, extra, wrong) without refactoring the - # _verify_acls code, and I don't feel like doing that, so eff - # it. let's just test to make sure that failures are - # identified at all for now. - - acls = {("access", posix1e.ACL_USER, "user"): 7, - ("default", posix1e.ACL_GROUP, "group"): 5} - extra_acls = copy.deepcopy(acls) - extra_acls[("access", posix1e.ACL_USER, "user2")] = 4 - - mock_list_entry_acls.return_value = acls - mock_list_file_acls.return_value = acls - self.assertTrue(self.ptool._verify_acls(entry)) - mock_list_entry_acls.assert_called_with(entry) - mock_list_file_acls.assert_called_with(entry.get("name")) - - # test missing - mock_list_entry_acls.reset_mock() - mock_list_file_acls.reset_mock() - mock_list_file_acls.return_value = extra_acls - self.assertFalse(self.ptool._verify_acls(entry)) - mock_list_entry_acls.assert_called_with(entry) - mock_list_file_acls.assert_called_with(entry.get("name")) - - # test extra - mock_list_entry_acls.reset_mock() - mock_list_file_acls.reset_mock() - mock_list_entry_acls.return_value = extra_acls - mock_list_file_acls.return_value = acls - self.assertFalse(self.ptool._verify_acls(entry)) - mock_list_entry_acls.assert_called_with(entry) - mock_list_file_acls.assert_called_with(entry.get("name")) - - # test wrong - wrong_acls = copy.deepcopy(extra_acls) - wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5 - mock_list_entry_acls.reset_mock() - mock_list_file_acls.reset_mock() - mock_list_entry_acls.return_value = extra_acls - mock_list_file_acls.return_value = wrong_acls - self.assertFalse(self.ptool._verify_acls(entry)) - mock_list_entry_acls.assert_called_with(entry) - mock_list_file_acls.assert_called_with(entry.get("name")) - - @patch("os.makedirs") - @patch("os.path.exists") - @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__) - def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs): - entry = lxml.etree.Element("Path", name="/test/foo/bar", - type="directory") - - def reset(): - mock_exists.reset_mock() - mock_set_perms.reset_mock() - mock_makedirs.reset_mock() - - mock_set_perms.return_value = True - def path_exists_rv(path): - if path == "/test": - return True - else: - return False - mock_exists.side_effect = path_exists_rv - self.assertTrue(self.ptool._makedirs(entry)) - self.assertItemsEqual(mock_exists.call_args_list, - [call("/test"), call("/test/foo"), - call("/test/foo/bar")]) - self.assertItemsEqual(mock_set_perms.call_args_list, - [call(entry, path="/test/foo"), - call(entry, path="/test/foo/bar")]) - mock_makedirs.assert_called_with(entry.get("name")) - - reset() - mock_makedirs.side_effect = OSError - self.assertFalse(self.ptool._makedirs(entry)) - self.assertItemsEqual(mock_set_perms.call_args_list, - [call(entry, path="/test/foo"), - call(entry, path="/test/foo/bar")]) - - reset() - mock_makedirs.side_effect = None - def set_perms_rv(entry, path=None): - if path == '/test/foo': - return False - else: - return True - mock_set_perms.side_effect = set_perms_rv - self.assertFalse(self.ptool._makedirs(entry)) - self.assertItemsEqual(mock_exists.call_args_list, - [call("/test"), call("/test/foo"), - call("/test/foo/bar")]) - self.assertItemsEqual(mock_set_perms.call_args_list, - [call(entry, path="/test/foo"), - call(entry, path="/test/foo/bar")]) - mock_makedirs.assert_called_with(entry.get("name")) -- cgit v1.2.3-1-g7c22