summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py')
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py991
1 files changed, 991 insertions, 0 deletions
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
new file mode 100644
index 000000000..b447ab642
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
@@ -0,0 +1,991 @@
+import os
+import sys
+import copy
+import stat
+import lxml.etree
+from mock import Mock, MagicMock, patch
+import Bcfg2.Client.Tools
+from Bcfg2.Client.Tools.POSIX.base import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_posix_object
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+try:
+ import selinux
+ has_selinux = True
+except ImportError:
+ has_selinux = False
+
+try:
+ import posix1e
+ has_acls = True
+except ImportError:
+ has_acls = False
+
+class TestPOSIXTool(Bcfg2TestCase):
+ test_obj = POSIXTool
+
+ def get_obj(self, posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return self.test_obj(posix.logger, posix.setup, posix.config)
+
+ def setUp(self):
+ self.ptool = self.get_obj()
+
+ def tearDown(self):
+ # just to guarantee that we start fresh each time
+ self.ptool = None
+
+ def test_fully_specified(self):
+ # fully_specified should do no checking on the abstract
+ # POSIXTool object
+ self.assertTrue(self.ptool.fully_specified(Mock()))
+
+ @patch('os.stat')
+ @patch('os.walk')
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_metadata" %
+ test_obj.__name__)
+ def test_verify(self, mock_verify, mock_walk, mock_stat):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+
+ mock_stat.return_value = MagicMock()
+ mock_verify.return_value = False
+ self.assertFalse(self.ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(self.ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ entry.set("recursive", "true")
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+ self.assertTrue(self.ptool.verify(entry, []))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_verifies = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_verifies.extend([call(entry, path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_verify.call_args_list, all_verifies)
+
+ @patch('os.walk')
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__)
+ def test_install(self, mock_set_perms, mock_walk):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+
+ mock_set_perms.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_set_perms.assert_called_with(entry)
+
+ mock_set_perms.reset_mock()
+ entry.set("recursive", "true")
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+
+ mock_set_perms.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_set_perms = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_set_perms.extend([call(entry, path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ all_set_perms)
+
+ mock_walk.reset_mock()
+ mock_set_perms.reset_mock()
+
+ def set_perms_rv(entry, path=None):
+ if path == '/dir2/file3':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+
+ self.assertFalse(self.ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ self.assertItemsEqual(mock_set_perms.call_args_list, all_set_perms)
+
+ @patch('os.lstat')
+ @patch("os.unlink")
+ @patch("os.path.isdir")
+ @patch("shutil.rmtree")
+ def test_exists(self, mock_rmtree, mock_isdir, mock_unlink, mock_lstat):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+
+ mock_lstat.side_effect = OSError
+ self.assertFalse(self.ptool._exists(entry))
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ rv = MagicMock()
+ mock_lstat.return_value = rv
+ mock_lstat.side_effect = None
+ self.assertEqual(self.ptool._exists(entry), rv)
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_isdir.return_value = False
+ self.assertFalse(self.ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_unlink.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_rmtree.called)
+
+ mock_lstat.reset_mock()
+ mock_isdir.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_isdir.return_value = True
+ self.assertFalse(self.ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_isdir.reset_mock()
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_rmtree.side_effect = OSError
+ self.assertEqual(self.ptool._exists(entry, remove=True), rv)
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ @patch("os.chown")
+ @patch("os.chmod")
+ @patch("os.utime")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_acls" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_secontext" %
+ test_obj.__name__)
+ def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid,
+ mock_norm_uid, mock_utime, mock_chmod, mock_chown):
+ def reset():
+ mock_set_secontext.reset_mock()
+ mock_set_acls.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_chmod.reset_mock()
+ mock_chown.reset_mock()
+ mock_utime.reset_mock()
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar",
+ type="symlink")
+ mock_set_acls.return_value = True
+ mock_set_secontext.return_value = True
+ self.assertTrue(self.ptool._set_perms(entry))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner",
+ group="group", perms="644", type="file")
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ reset()
+ self.assertTrue(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ self.assertFalse(mock_utime.called)
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ mtime = 1344459042
+ entry.set("mtime", str(mtime))
+ self.assertTrue(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ self.assertTrue(self.ptool._set_perms(entry, path='/etc/bar'))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with('/etc/bar', 10, 100)
+ mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path='/etc/bar')
+ mock_set_acls.assert_called_with(entry, path='/etc/bar')
+
+ # test dev_type modification of perms, failure of chown
+ reset()
+ def chown_rv(path, owner, group):
+ if owner == 0 and group == 0:
+ return True
+ else:
+ raise KeyError
+ os.chown.side_effect = chown_rv
+ entry.set("type", "device")
+ entry.set("dev_type", list(device_map.keys())[0])
+ self.assertFalse(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 0, 0)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8) | list(device_map.values())[0])
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test failure of chmod
+ reset()
+ os.chown.side_effect = None
+ os.chmod.side_effect = OSError
+ entry.set("type", "file")
+ del entry.attrib["dev_type"]
+ self.assertFalse(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test that even when everything fails, we try to do it all.
+ # e.g., when chmod fails, we still try to apply acls, set
+ # selinux context, etc.
+ reset()
+ os.chown.side_effect = OSError
+ os.utime.side_effect = OSError
+ mock_set_acls.return_value = False
+ mock_set_secontext.return_value = False
+ self.assertFalse(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patchIf(has_acls, "posix1e.ACL")
+ @patchIf(has_acls, "posix1e.Entry")
+ @patch("os.path.isdir")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" %
+ test_obj.__name__)
+ def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid,
+ mock_isdir, mock_Entry, mock_ACL):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+
+ # disable acls for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertTrue(self.ptool._set_acls(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_acls = True
+
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ acl.name = "keep"
+ file_acls.append(acl)
+ remove_acls = [a for a in file_acls if a.name == "remove"]
+
+ # build a set of ACLs listed on the entry as returned by
+ # _list_entry_acls()
+ entry_acls = {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+ mock_list_entry_acls.return_value = entry_acls
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has three separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ acl_rv = MagicMock()
+ def mock_acl_rv(file=None, filedef=None, acl=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ elif acl:
+ return acl_rv
+
+ # set up the equally unreasonably complex return value for
+ # posix1e.Entry, which returns a new entry and adds it to
+ # an ACL, so we have to track the Mock objects it returns.
+ # why can't they just have an acl.add_entry() method?!?
+ acl_entries = []
+ def mock_entry_rv(acl):
+ rv = MagicMock()
+ rv.acl = acl
+ rv.permset = set()
+ acl_entries.append(rv)
+ return rv
+ mock_Entry.side_effect = mock_entry_rv
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_ACL.reset_mock()
+ mock_Entry.reset_mock()
+ fileacl_rv.reset_mock()
+
+ # test fs mounted noacl
+ mock_ACL.side_effect = IOError(95, "Operation not permitted")
+ self.assertFalse(self.ptool._set_acls(entry))
+
+ # test other error
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertFalse(self.ptool._set_acls(entry))
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ mock_isdir.return_value = True
+ self.assertTrue(self.ptool._set_acls(entry))
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=entry.get("name")),
+ call(filedef=entry.get("name"))])
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ self.assertItemsEqual(filedef_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_uid.assert_called_with("user")
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_ACCESS)
+ filedef_rv.calc_mask.assert_any_call()
+ filedef_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_DEFAULT)
+
+ # build tuples of the Entry objects that were added to acl
+ # and defaacl so they're easier to compare for equality
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(filedef_rv, posix1e.ACL_USER, 10, 7),
+ (fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ reset()
+ # have to reassign these because they're iterators, and
+ # they've already been iterated over once
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ mock_list_entry_acls.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_isdir.return_value = False
+ acl_entries = []
+ self.assertTrue(self.ptool._set_acls(entry, path="/bin/bar"))
+ mock_ACL.assert_called_with(file="/bin/bar")
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with("/bin/bar",
+ posix1e.ACL_TYPE_ACCESS)
+
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ @skipUnless(has_selinux, "SELinux not found, skipping")
+ @patchIf(has_selinux, "selinux.restorecon")
+ @patchIf(has_selinux, "selinux.lsetfilecon")
+ def test_set_secontext(self, mock_lsetfilecon, mock_restorecon):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+
+ # disable selinux for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertTrue(self.ptool._set_secontext(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = True
+
+ # no context given
+ self.assertTrue(self.ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ entry.set("secontext", "__default__")
+ self.assertTrue(self.ptool._set_secontext(entry))
+ mock_restorecon.assert_called_with(entry.get("name"))
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 0
+ entry.set("secontext", "foo_t")
+ self.assertTrue(self.ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 1
+ self.assertFalse(self.ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ @patch("grp.getgrnam")
+ def test_norm_gid(self, mock_getgrnam):
+ self.assertEqual(5, self.ptool._norm_gid("5"))
+ self.assertFalse(mock_getgrnam.called)
+
+ mock_getgrnam.reset_mock()
+ mock_getgrnam.return_value = ("group", "x", 5, [])
+ self.assertEqual(5, self.ptool._norm_gid("group"))
+ mock_getgrnam.assert_called_with("group")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__)
+ def test_norm_entry_gid(self, mock_norm_gid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ mock_norm_gid.return_value = 10
+ self.assertEqual(10, self.ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ mock_norm_gid.reset_mock()
+ mock_norm_gid.side_effect = KeyError
+ self.assertEqual(0, self.ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ @patch("pwd.getpwnam")
+ def test_norm_uid(self, mock_getpwnam):
+ self.assertEqual(5, self.ptool._norm_uid("5"))
+ self.assertFalse(mock_getpwnam.called)
+
+ mock_getpwnam.reset_mock()
+ mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user",
+ "/bin/zsh")
+ self.assertEqual(5, self.ptool._norm_uid("user"))
+ mock_getpwnam.assert_called_with("user")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__)
+ def test_norm_entry_uid(self, mock_norm_uid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ mock_norm_uid.return_value = 10
+ self.assertEqual(10, self.ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ mock_norm_uid.reset_mock()
+ mock_norm_uid.side_effect = KeyError
+ self.assertEqual(0, self.ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ def test_norm_acl_perms(self):
+ # there's basically no reasonably way to test the Permset
+ # object parsing feature without writing our own Mock object
+ # that re-implements Permset.test(). silly pylibacl won't let
+ # us create standalone Entry or Permset objects.
+ self.assertEqual(5, self.ptool._norm_acl_perms("5"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("55"))
+ self.assertEqual(5, self.ptool._norm_acl_perms("rx"))
+ self.assertEqual(5, self.ptool._norm_acl_perms("r-x"))
+ self.assertEqual(6, self.ptool._norm_acl_perms("wr-"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("rwrw"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("-"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("a"))
+ self.assertEqual(6, self.ptool._norm_acl_perms("rwa"))
+ self.assertEqual(4, self.ptool._norm_acl_perms("rr"))
+
+ @patch('os.stat')
+ def test__gather_data(self, mock_stat):
+ path = '/test'
+ mock_stat.side_effect = OSError
+ self.assertFalse(self.ptool._gather_data(path)[0])
+ mock_stat.assert_called_with(path)
+
+ mock_stat.reset_mock()
+ mock_stat.side_effect = None
+ # create a return value
+ stat_rv = MagicMock()
+ def stat_getitem(key):
+ if int(key) == stat.ST_UID:
+ return 0
+ elif int(key) == stat.ST_GID:
+ return 10
+ elif int(key) == stat.ST_MODE:
+ # return extra bits in the mode to emulate a device
+ # and ensure that they're stripped
+ return int('060660', 8)
+ stat_rv.__getitem__ = Mock(side_effect=stat_getitem)
+ mock_stat.return_value = stat_rv
+
+ # disable selinux and acls for this call -- we test them
+ # separately so that we can skip those tests as appropriate
+ states = (Bcfg2.Client.Tools.POSIX.base.has_selinux,
+ Bcfg2.Client.Tools.POSIX.base.has_acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(self.ptool._gather_data(path),
+ (stat_rv, '0', '10', '0660', None, None))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux, \
+ Bcfg2.Client.Tools.POSIX.base.has_acls = states
+ mock_stat.assert_called_with(path)
+
+ @skipUnless(has_selinux, "SELinux not found, skipping")
+ def test__gather_data_selinux(self):
+ context = 'system_u:object_r:root_t:s0'
+ path = '/test'
+
+ @patch('os.stat')
+ @patchIf(has_selinux, "selinux.getfilecon")
+ def inner(mock_getfilecon, mock_stat):
+ mock_getfilecon.return_value = [len(context) + 1, context]
+ mock_stat.return_value = MagicMock()
+ # disable acls for this call and test them separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_acls
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(self.ptool._gather_data(path)[4], 'root_t')
+ Bcfg2.Client.Tools.POSIX.base.has_acls = state
+ mock_getfilecon.assert_called_with(path)
+
+ inner()
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patch('os.stat')
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" %
+ test_obj.__name__)
+ def test__gather_data_acls(self, mock_list_file_acls, mock_stat):
+ acls = {("default", posix1e.ACL_USER, "testuser"): "rwx",
+ ("access", posix1e.ACL_GROUP, "testgroup"): "rx"}
+ mock_list_file_acls.return_value = acls
+ path = '/test'
+ mock_stat.return_value = MagicMock()
+ # disable selinux for this call and test it separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_selinux
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertItemsEqual(self.ptool._gather_data(path)[5], acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = state
+ mock_list_file_acls.assert_called_with(path)
+
+ @patchIf(has_selinux, "selinux.matchpathcon")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_acls" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._gather_data" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" %
+ test_obj.__name__)
+ def test_verify_metadata(self, mock_norm_gid, mock_norm_uid,
+ mock_gather_data, mock_verify_acls,
+ mock_matchpathcon):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user", perms="664",
+ secontext='etc_t')
+ # _verify_metadata() mutates the entry, so we keep a backup so we
+ # can start fresh every time
+ orig_entry = copy.deepcopy(entry)
+
+ def reset():
+ mock_gather_data.reset_mock()
+ mock_verify_acls.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ return copy.deepcopy(orig_entry)
+
+ # test nonexistent file
+ mock_gather_data.return_value = (False, None, None, None, None, None)
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ self.assertEqual(entry.get("current_exists", "").lower(), "false")
+ mock_gather_data.assert_called_with(entry.get("name"))
+
+ # expected data. tuple of attr, return value index, value
+ expected = [('current_owner', 1, '0'),
+ ('current_group', 2, '10'),
+ ('current_perms', 3, '0664'),
+ ('current_secontext', 4, 'etc_t')]
+ mock_norm_uid.return_value = 0
+ mock_norm_gid.return_value = 10
+ gather_data_rv = [MagicMock(), None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+
+ entry = reset()
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+
+ # test when secontext is None
+ entry = reset()
+ gather_data_rv[4] = None
+ sestate = Bcfg2.Client.Tools.POSIX.base.has_selinux
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ if attr != 'current_secontext':
+ self.assertEqual(entry.get(attr), val)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = sestate
+
+ gather_data_rv = [MagicMock(), None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+
+ mtime = 1344430414
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ stat_rv = MagicMock()
+ stat_rv.__getitem__.return_value = mtime
+ gather_data_rv[0] = stat_rv
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure modes for each checked datum. tuple of changed attr,
+ # return value index, new (failing) value
+ failures = [('current_owner', 1, '10'),
+ ('current_group', 2, '100'),
+ ('current_perms', 3, '0660')]
+ if has_selinux:
+ failures.append(('current_secontext', 4, 'root_t'))
+
+ for fail_attr, fail_idx, fail_val in failures:
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ gather_data_rv[fail_idx] = fail_val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ self.assertEqual(entry.get(fail_attr), fail_val)
+ for attr, idx, val in expected:
+ if attr != fail_attr:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure mode for mtime
+ fail_mtime = 1344431162
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ fail_stat_rv = MagicMock()
+ fail_stat_rv.__getitem__.return_value = fail_mtime
+ gather_data_rv = [fail_stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(fail_mtime))
+
+ if has_selinux:
+ # test success and failure for __default__ secontext
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+
+ context1 = "system_u:object_r:etc_t:s0"
+ context2 = "system_u:object_r:root_t:s0"
+ mock_matchpathcon.return_value = [1 + len(context1),
+ context1]
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+ mock_matchpathcon.return_value = [1 + len(context2),
+ context2]
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ def test_list_entry_acls(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ lxml.etree.SubElement(entry, "ACL", scope="user", type="default",
+ user="user", perms="rwx")
+ lxml.etree.SubElement(entry, "ACL", scope="group", type="access",
+ group="group", perms="5")
+ self.assertItemsEqual(self.ptool._list_entry_acls(entry),
+ {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5})
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("pwd.getpwuid")
+ @patch("grp.getgrgid")
+ @patch("os.path.isdir")
+ def test_list_file_acls(self, mock_isdir, mock_getgrgid, mock_getpwuid,
+ mock_ACL):
+ path = '/test'
+
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.qualifier = 10
+ # yes, this is a bogus permset. thanks to _norm_acl_perms
+ # it works and is easier than many of the alternatives.
+ acl.permset = 'rwx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.qualifier = 100
+ acl.permset = 'rx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ file_acls.append(acl)
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has two separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ def mock_acl_rv(file=None, filedef=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ # other return values
+ mock_isdir.return_value = False
+ mock_getgrgid.return_value = ("group", "x", 5, [])
+ mock_getpwuid.return_value = ("user", "x", 5, 5, "User",
+ "/home/user", "/bin/zsh")
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_getgrgid.reset_mock()
+ mock_getpwuid.reset_mock()
+ mock_ACL.reset_mock()
+
+ mock_ACL.side_effect = IOError(95, "Operation not supported")
+ self.assertItemsEqual(self.ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertItemsEqual(self.ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ self.assertItemsEqual(self.ptool._list_file_acls(path), acls)
+ mock_isdir.assert_called_with(path)
+ mock_getgrgid.assert_called_with(100)
+ mock_getpwuid.assert_called_with(10)
+ mock_ACL.assert_called_with(file=path)
+
+ reset()
+ mock_isdir.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+
+ defacls = acls
+ for akey, perms in acls.items():
+ defacls[('default', akey[1], akey[2])] = perms
+ self.assertItemsEqual(self.ptool._list_file_acls(path), defacls)
+ mock_isdir.assert_called_with(path)
+ self.assertItemsEqual(mock_getgrgid.call_args_list,
+ [call(100), call(100)])
+ self.assertItemsEqual(mock_getpwuid.call_args_list,
+ [call(10), call(10)])
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=path), call(filedef=path)])
+
+ if has_acls:
+ # python 2.6 applies decorators at compile-time, not at
+ # run-time, so we can't do these as decorators because
+ # pylibacl might not be installed. (If it's not, this test
+ # will be skipped, so as long as this is done at run-time
+ # we're safe.)
+ test_list_file_acls = patch("posix1e.ACL")(test_list_file_acls)
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" %
+ test_obj.__name__)
+ def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ # we can't test to make sure that errors get properly sorted
+ # into (missing, extra, wrong) without refactoring the
+ # _verify_acls code, and I don't feel like doing that, so eff
+ # it. let's just test to make sure that failures are
+ # identified at all for now.
+
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("default", posix1e.ACL_GROUP, "group"): 5}
+ extra_acls = copy.deepcopy(acls)
+ extra_acls[("access", posix1e.ACL_USER, "user2")] = 4
+
+ mock_list_entry_acls.return_value = acls
+ mock_list_file_acls.return_value = acls
+ self.assertTrue(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test missing
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_file_acls.return_value = extra_acls
+ self.assertFalse(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test extra
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = acls
+ self.assertFalse(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test wrong
+ wrong_acls = copy.deepcopy(extra_acls)
+ wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = wrong_acls
+ self.assertFalse(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ @patch("os.makedirs")
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__)
+ def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs):
+ entry = lxml.etree.Element("Path", name="/test/foo/bar",
+ type="directory")
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+
+ mock_set_perms.return_value = True
+ def path_exists_rv(path):
+ if path == "/test":
+ return True
+ else:
+ return False
+ mock_exists.side_effect = path_exists_rv
+ self.assertTrue(self.ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_makedirs.side_effect = OSError
+ self.assertFalse(self.ptool._makedirs(entry))
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+
+ reset()
+ mock_makedirs.side_effect = None
+ def set_perms_rv(entry, path=None):
+ if path == '/test/foo':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+ self.assertFalse(self.ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))