From d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 4 Sep 2012 09:52:57 -0400 Subject: reorganized testsuite to allow tests on stuff outside of src --- .../TestClient/TestTools/TestPOSIX/TestDevice.py | 144 +++ .../TestTools/TestPOSIX/TestDirectory.py | 159 ++++ .../TestClient/TestTools/TestPOSIX/TestFile.py | 448 ++++++++++ .../TestClient/TestTools/TestPOSIX/TestHardlink.py | 85 ++ .../TestTools/TestPOSIX/TestNonexistent.py | 91 ++ .../TestTools/TestPOSIX/TestPermissions.py | 5 + .../TestClient/TestTools/TestPOSIX/TestSymlink.py | 99 ++ .../TestClient/TestTools/TestPOSIX/Test__init.py | 252 ++++++ .../TestClient/TestTools/TestPOSIX/Testbase.py | 991 +++++++++++++++++++++ .../TestClient/TestTools/TestPOSIX/__init__.py | 0 .../Testlib/TestClient/TestTools/__init__.py | 0 testsuite/Testsrc/Testlib/TestClient/__init__.py | 0 12 files changed, 2274 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/__init__.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/__init__.py create mode 100644 testsuite/Testsrc/Testlib/TestClient/__init__.py (limited to 'testsuite/Testsrc/Testlib/TestClient') diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py new file mode 100644 index 000000000..a18327fe0 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py @@ -0,0 +1,144 @@ +import os +import sys +import copy +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Device import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from Testbase import TestPOSIXTool +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +class TestPOSIXDevice(TestPOSIXTool): + test_obj = POSIXDevice + + def test_fully_specified(self): + ptool = self.get_obj() + orig_entry = lxml.etree.Element("Path", name="/test", type="device", + dev_type="fifo") + self.assertTrue(ptool.fully_specified(orig_entry)) + for dtype in ["block", "char"]: + for attr in ["major", "minor"]: + entry = copy.deepcopy(orig_entry) + entry.set("dev_type", dtype) + entry.set(attr, "0") + self.assertFalse(ptool.fully_specified(entry)) + entry = copy.deepcopy(orig_entry) + entry.set("dev_type", dtype) + entry.set("major", "0") + entry.set("minor", "0") + self.assertTrue(ptool.fully_specified(entry)) + + @patch("os.major") + @patch("os.minor") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + def test_verify(self, mock_verify, mock_exists, mock_minor, mock_major): + entry = lxml.etree.Element("Path", name="/test", type="device", + mode='0644', owner='root', group='root', + dev_type="block", major="0", minor="10") + ptool = self.get_obj() + + def reset(): + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_minor.reset_mock() + mock_major.reset_mock() + + mock_exists.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + + reset() + mock_exists.return_value = MagicMock() + mock_major.return_value = 0 + mock_minor.return_value = 10 + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_verify.assert_called_with(ptool, entry, []) + mock_exists.assert_called_with(entry) + mock_major.assert_called_with(mock_exists.return_value.st_rdev) + mock_minor.assert_called_with(mock_exists.return_value.st_rdev) + + reset() + mock_exists.return_value = MagicMock() + mock_major.return_value = 0 + mock_minor.return_value = 10 + mock_verify.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_verify.assert_called_with(ptool, entry, []) + mock_exists.assert_called_with(entry) + mock_major.assert_called_with(mock_exists.return_value.st_rdev) + mock_minor.assert_called_with(mock_exists.return_value.st_rdev) + + reset() + mock_verify.return_value = True + entry = lxml.etree.Element("Path", name="/test", type="device", + mode='0644', owner='root', group='root', + dev_type="fifo") + self.assertTrue(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + self.assertFalse(mock_major.called) + self.assertFalse(mock_minor.called) + + @patch("os.makedev") + @patch("os.mknod") + @patch("Bcfg2.Client.Tools.POSIX.Device.%s._exists" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + def test_install(self, mock_install, mock_exists, mock_mknod, mock_makedev): + entry = lxml.etree.Element("Path", name="/test", type="device", + mode='0644', owner='root', group='root', + dev_type="block", major="0", minor="10") + ptool = self.get_obj() + + mock_exists.return_value = False + mock_makedev.return_value = Mock() + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_makedev.assert_called_with(0, 10) + mock_mknod.assert_called_with(entry.get("name"), # 0o644 + device_map[entry.get("dev_type")] | 420, + mock_makedev.return_value) + mock_install.assert_called_with(ptool, entry) + + mock_makedev.reset_mock() + mock_mknod.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_makedev.side_effect = OSError + self.assertFalse(ptool.install(entry)) + + mock_makedev.reset_mock() + mock_mknod.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_mknod.side_effect = OSError + self.assertFalse(ptool.install(entry)) + + mock_makedev.reset_mock() + mock_mknod.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_mknod.side_effect = None + entry = lxml.etree.Element("Path", name="/test", type="device", + mode='0644', owner='root', group='root', + dev_type="fifo") + + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_mknod.assert_called_with(entry.get("name"), # 0o644 + device_map[entry.get("dev_type")] | 420) + mock_install.assert_called_with(ptool, entry) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py new file mode 100644 index 000000000..e01bd7453 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py @@ -0,0 +1,159 @@ +import os +import sys +import stat +import copy +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Directory import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from Testbase import TestPOSIXTool +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +class TestPOSIXDirectory(TestPOSIXTool): + test_obj = POSIXDirectory + + @patch("os.listdir") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + @patch("Bcfg2.Client.Tools.POSIX.Directory.%s._exists" % test_obj.__name__) + def test_verify(self, mock_exists, mock_verify, mock_listdir): + entry = lxml.etree.Element("Path", name="/test", type="directory", + perms='0644', owner='root', group='root') + + mock_exists.return_value = False + self.assertFalse(self.ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + + mock_exists.reset_mock() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = stat.S_IFREG | 420 # 0o644 + mock_exists.return_value = exists_rv + self.assertFalse(self.ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + + mock_exists.reset_mock() + mock_verify.return_value = False + exists_rv.__getitem__.return_value = stat.S_IFDIR | 420 # 0o644 + self.assertFalse(self.ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(self.ptool, entry, []) + + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(self.ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(self.ptool, entry, []) + + mock_exists.reset_mock() + mock_verify.reset_mock() + entry.set("prune", "true") + orig_entry = copy.deepcopy(entry) + + entries = ["foo", "bar", "bar/baz"] + mock_listdir.return_value = entries + modlist = [os.path.join(entry.get("name"), entries[0])] + self.assertFalse(self.ptool.verify(entry, modlist)) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(self.ptool, entry, modlist) + mock_listdir.assert_called_with(entry.get("name")) + expected = [os.path.join(entry.get("name"), e) + for e in entries + if os.path.join(entry.get("name"), e) not in modlist] + actual = [e.get("path") for e in entry.findall("Prune")] + self.assertItemsEqual(expected, actual) + + mock_verify.reset_mock() + mock_exists.reset_mock() + mock_listdir.reset_mock() + entry = copy.deepcopy(orig_entry) + modlist = [os.path.join(entry.get("name"), e) + for e in entries] + self.assertTrue(self.ptool.verify(entry, modlist)) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(self.ptool, entry, modlist) + mock_listdir.assert_called_with(entry.get("name")) + self.assertEqual(len(entry.findall("Prune")), 0) + + @patch("os.unlink") + @patch("os.path.isdir") + @patch("shutil.rmtree") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.Directory.%s._exists" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.Directory.%s._makedirs" % + test_obj.__name__) + def test_install(self, mock_makedirs, mock_exists, mock_install, + mock_rmtree, mock_isdir, mock_unlink): + entry = lxml.etree.Element("Path", name="/test/foo/bar", + type="directory", perms='0644', + owner='root', group='root') + + def reset(): + mock_exists.reset_mock() + mock_install.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_rmtree.mock_makedirs() + + mock_makedirs.return_value = True + mock_exists.return_value = False + mock_install.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_exists.assert_called_with(entry) + mock_install.assert_called_with(self.ptool, entry) + mock_makedirs.assert_called_with(entry) + + reset() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = stat.S_IFREG | 420 # 0o644 + mock_exists.return_value = exists_rv + self.assertTrue(self.ptool.install(entry)) + mock_unlink.assert_called_with(entry.get("name")) + mock_exists.assert_called_with(entry) + mock_makedirs.assert_called_with(entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + exists_rv.__getitem__.return_value = stat.S_IFDIR | 420 # 0o644 + mock_install.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_exists.assert_called_with(entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + mock_install.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_install.assert_called_with(self.ptool, entry) + + entry.set("prune", "true") + prune = ["/test/foo/bar/prune1", "/test/foo/bar/prune2"] + for path in prune: + lxml.etree.SubElement(entry, "Prune", path=path) + + reset() + mock_install.return_value = True + + def isdir_rv(path): + if path.endswith("prune2"): + return True + else: + return False + mock_isdir.side_effect = isdir_rv + self.assertTrue(self.ptool.install(entry)) + mock_exists.assert_called_with(entry) + mock_install.assert_called_with(self.ptool, entry) + self.assertItemsEqual(mock_isdir.call_args_list, + [call(p) for p in prune]) + mock_unlink.assert_called_with("/test/foo/bar/prune1") + mock_rmtree.assert_called_with("/test/foo/bar/prune2") diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py new file mode 100644 index 000000000..cdf11ce5e --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py @@ -0,0 +1,448 @@ +# -*- coding: utf-8 -*- +import os +import sys +import copy +import difflib +import binascii +import lxml.etree +from Bcfg2.Compat import b64encode, b64decode, u_str +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.File import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from Testbase import TestPOSIXTool +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +def get_file_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXFile(posix.logger, posix.setup, posix.config) + +class TestPOSIXFile(TestPOSIXTool): + test_obj = POSIXFile + + def test_fully_specified(self): + entry = lxml.etree.Element("Path", name="/test", type="file") + self.assertFalse(self.ptool.fully_specified(entry)) + + entry.set("empty", "true") + self.assertTrue(self.ptool.fully_specified(entry)) + + entry.set("empty", "false") + entry.text = "text" + self.assertTrue(self.ptool.fully_specified(entry)) + + def test_is_string(self): + for char in list(range(8)) + list(range(14, 32)): + self.assertFalse(self.ptool._is_string("foo" + chr(char) + "bar", + 'UTF-8')) + for char in list(range(9, 14)) + list(range(33, 128)): + self.assertTrue(self.ptool._is_string("foo" + chr(char) + "bar", + 'UTF-8')) + ustr = 'é' + self.assertTrue(self.ptool._is_string(ustr, 'UTF-8')) + if not inPy3k: + self.assertFalse(self.ptool._is_string("foo" + chr(128) + "bar", + 'ascii')) + self.assertFalse(self.ptool._is_string(ustr, 'ascii')) + + def test_get_data(self): + orig_entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(encoding="ascii", ppath='/', max_copies=5) + ptool = self.get_obj(posix=get_posix_object(setup=setup)) + + entry = copy.deepcopy(orig_entry) + entry.text = b64encode("test") + entry.set("encoding", "base64") + self.assertEqual(ptool._get_data(entry), ("test", True)) + + entry = copy.deepcopy(orig_entry) + entry.set("empty", "true") + self.assertEqual(ptool._get_data(entry), ("", False)) + + entry = copy.deepcopy(orig_entry) + entry.text = "test" + self.assertEqual(ptool._get_data(entry), ("test", False)) + + if inPy3k: + ustr = 'é' + else: + ustr = u_str('é', 'UTF-8') + entry = copy.deepcopy(orig_entry) + entry.text = ustr + self.assertEqual(ptool._get_data(entry), (ustr, False)) + + @patch("%s.open" % builtins) + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + @patch("Bcfg2.Client.Tools.POSIX.File.%s._exists" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_diffs" % test_obj.__name__) + def test_verify(self, mock_get_diffs, mock_get_data, mock_exists, + mock_verify, mock_open): + entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(interactive=False, ppath='/', max_copies=5) + ptool = self.get_obj(posix=get_posix_object(setup=setup)) + + def reset(): + mock_get_diffs.reset_mock() + mock_get_data.reset_mock() + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_open.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_exists.return_value = False + mock_verify.return_value = True + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=False, + content="") + + reset() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = 5 + mock_exists.return_value = exists_rv + mock_get_data.return_value = ("test", True) + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=True, + content=None) + + reset() + mock_get_data.return_value = ("test", False) + exists_rv.__getitem__.return_value = 4 + entry.set("sensitive", "true") + mock_open.return_value.read.return_value = "tart" + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_called_with() + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=True, + is_binary=False, + content="tart") + + reset() + mock_open.return_value.read.return_value = "test" + self.assertTrue(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_called_with() + self.assertFalse(mock_get_diffs.called) + + reset() + mock_open.side_effect = IOError + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_open.assert_called_with(entry.get("name")) + + @patch("os.fdopen") + @patch("tempfile.mkstemp") + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__) + def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + newfile = "/foo/bar" + + def reset(): + mock_get_data.reset_mock() + mock_mkstemp.reset_mock() + mock_fdopen.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_mkstemp.return_value = (5, newfile) + self.assertEqual(self.ptool._write_tmpfile(entry), newfile) + mock_get_data.assert_called_with(entry) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_fdopen.assert_called_with(5, 'w') + mock_fdopen.return_value.write.assert_called_with("test") + + reset() + mock_mkstemp.side_effect = OSError + self.assertFalse(self.ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + + reset() + mock_mkstemp.side_effect = None + mock_fdopen.side_effect = OSError + self.assertFalse(self.ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_get_data.assert_called_with(entry) + mock_fdopen.assert_called_with(5, 'w') + + @patch("os.rename") + @patch("os.unlink") + def test_rename_tmpfile(self, mock_unlink, mock_rename): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + newfile = "/foo/bar" + + self.assertTrue(self.ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_rename.side_effect = OSError + self.assertFalse(self.ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + # even if the unlink fails, return false gracefully + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_unlink.side_effect = OSError + self.assertFalse(self.ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + @patch("%s.open" % builtins) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._diff" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._is_string" % test_obj.__name__) + def test__get_diffs(self, mock_is_string, mock_get_data, mock_diff, + mock_open): + orig_entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', + group='root') + orig_entry.text = "test" + ondisk = "test2" + setup = dict(encoding="utf-8", ppath='/', max_copies=5) + ptool = self.get_obj(posix=get_posix_object(setup=setup)) + + def reset(): + mock_is_string.reset_mock() + mock_get_data.reset_mock() + mock_diff.reset_mock() + mock_open.reset_mock() + return copy.deepcopy(orig_entry) + + mock_is_string.return_value = True + mock_get_data.return_value = (orig_entry.text, False) + mock_open.return_value.read.return_value = ondisk + mock_diff.return_value = ["-test2", "+test"] + + # binary data in the entry + entry = reset() + ptool._get_diffs(entry, is_binary=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + self.assertFalse(mock_diff.called) + self.assertEqual(entry.get("current_bfile"), b64encode(ondisk)) + + # binary data on disk + entry = reset() + mock_is_string.return_value = False + ptool._get_diffs(entry, content=ondisk) + self.assertFalse(mock_open.called) + self.assertFalse(mock_diff.called) + self.assertEqual(entry.get("current_bfile"), b64encode(ondisk)) + + # sensitive, non-interactive -- do nothing + entry = reset() + mock_is_string.return_value = True + ptool._get_diffs(entry, sensitive=True, interactive=False) + self.assertFalse(mock_open.called) + self.assertFalse(mock_diff.called) + self.assertXMLEqual(entry, orig_entry) + + # sensitive, interactive + entry = reset() + ptool._get_diffs(entry, sensitive=True, interactive=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + mock_diff.assert_called_with(ondisk, entry.text, difflib.unified_diff, + filename=entry.get("name")) + self.assertIsNotNone(entry.get("qtext")) + del entry.attrib['qtext'] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + # non-sensitive, non-interactive + entry = reset() + ptool._get_diffs(entry, content=ondisk) + self.assertFalse(mock_open.called) + mock_diff.assert_called_with(ondisk, entry.text, difflib.ndiff, + filename=entry.get("name")) + self.assertIsNone(entry.get("qtext")) + self.assertEqual(entry.get("current_bdiff"), + b64encode("\n".join(mock_diff.return_value))) + del entry.attrib["current_bdiff"] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + # non-sensitive, interactive -- do everything. also test + # appending to qtext + entry = reset() + entry.set("qtext", "test") + ptool._get_diffs(entry, interactive=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + self.assertItemsEqual(mock_diff.call_args_list, + [call(ondisk, entry.text, difflib.unified_diff, + filename=entry.get("name")), + call(ondisk, entry.text, difflib.ndiff, + filename=entry.get("name"))]) + self.assertIsNotNone(entry.get("qtext")) + self.assertTrue(entry.get("qtext").startswith("test\n")) + self.assertEqual(entry.get("current_bdiff"), + b64encode("\n".join(mock_diff.return_value))) + del entry.attrib['qtext'] + del entry.attrib["current_bdiff"] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + # non-sensitive, interactive with unicode data + entry = reset() + entry.text = u("tëst") + encoded = entry.text.encode(setup['encoding']) + mock_diff.return_value = ["-test2", "+tëst"] + mock_get_data.return_value = (encoded, False) + ptool._get_diffs(entry, interactive=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + self.assertItemsEqual(mock_diff.call_args_list, + [call(ondisk, encoded, difflib.unified_diff, + filename=entry.get("name")), + call(ondisk, encoded, difflib.ndiff, + filename=entry.get("name"))]) + self.assertIsNotNone(entry.get("qtext")) + self.assertEqual(entry.get("current_bdiff"), + b64encode("\n".join(mock_diff.return_value))) + del entry.attrib['qtext'] + del entry.attrib["current_bdiff"] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + @patch("os.path.exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.File.%s._makedirs" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._set_perms" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._write_tmpfile" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._rename_tmpfile" % + test_obj.__name__) + def test_install(self, mock_rename, mock_write, mock_set_perms, + mock_makedirs, mock_install, mock_exists): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + + def reset(): + mock_rename.reset_mock() + mock_write.reset_mock() + mock_set_perms.reset_mock() + mock_makedirs.reset_mock() + mock_install.reset_mock() + mock_exists.reset_mock() + + mock_exists.return_value = False + mock_makedirs.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + + reset() + mock_makedirs.return_value = True + mock_write.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + + reset() + newfile = '/test.X987yS' + mock_write.return_value = newfile + mock_set_perms.return_value = False + mock_rename.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + + reset() + mock_rename.return_value = True + mock_install.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + mock_install.return_value = True + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + mock_set_perms.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + mock_exists.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + self.assertFalse(mock_makedirs.called) + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + @patch("time.time") + def test_diff(self, mock_time): + content1 = "line1\nline2" + content2 = "line3" + + self.now = 1345640723 + def time_rv(): + self.now += 1 + return self.now + mock_time.side_effect = time_rv + + rv = ["line1", "line2", "line3"] + func = Mock() + func.return_value = rv + self.assertItemsEqual(self.ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) + + func.reset_mock() + mock_time.reset_mock() + def time_rv(): + self.now += 5 + return self.now + mock_time.side_effect = time_rv + + def slow_diff(content1, content2): + for i in range(1, 10): + yield "line%s" % i + func.side_effect = slow_diff + self.assertFalse(self.ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py new file mode 100644 index 000000000..d68e15837 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py @@ -0,0 +1,85 @@ +import os +import sys +import copy +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Hardlink import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from Testbase import TestPOSIXTool +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +class TestPOSIXHardlink(TestPOSIXTool): + test_obj = POSIXHardlink + + @patch("os.path.samefile") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + def test_verify(self, mock_verify, mock_samefile): + entry = lxml.etree.Element("Path", name="/test", type="hardlink", + to="/dest") + ptool = self.get_obj() + + mock_samefile.return_value = True + mock_verify.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_samefile.reset_mock() + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_samefile.reset_mock() + mock_verify.reset_mock() + mock_samefile.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_samefile.reset_mock() + mock_verify.reset_mock() + mock_samefile.side_effect = OSError + self.assertFalse(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + + @patch("os.link") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.Hardlink.%s._exists" % test_obj.__name__) + def test_install(self, mock_exists, mock_install, mock_link): + entry = lxml.etree.Element("Path", name="/test", type="hardlink", + to="/dest") + ptool = self.get_obj() + + mock_exists.return_value = False + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_link.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) + + mock_link.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_link.side_effect = OSError + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_link.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py new file mode 100644 index 000000000..375ff00eb --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py @@ -0,0 +1,91 @@ +import os +import sys +import copy +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Nonexistent import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_config, get_posix_object +from Testbase import TestPOSIXTool +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +class TestPOSIXNonexistent(TestPOSIXTool): + test_obj = POSIXNonexistent + + @patch("os.path.lexists") + def test_verify(self, mock_lexists): + entry = lxml.etree.Element("Path", name="/test", type="nonexistent") + + for val in [True, False]: + mock_lexists.reset_mock() + mock_lexists.return_value = val + self.assertEqual(self.ptool.verify(entry, []), not val) + mock_lexists.assert_called_with(entry.get("name")) + + @patch("os.rmdir") + @patch("os.remove") + @patch("os.path.isdir") + @patch("shutil.rmtree") + def test_install(self, mock_rmtree, mock_isdir, mock_remove, mock_rmdir): + entry = lxml.etree.Element("Path", name="/test", type="nonexistent") + + def reset(): + mock_isdir.reset_mock() + mock_remove.reset_mock() + mock_rmdir.reset_mock() + mock_rmtree.reset_mock() + + mock_isdir.return_value = False + self.assertTrue(self.ptool.install(entry)) + mock_remove.assert_called_with(entry.get("name")) + + reset() + mock_remove.side_effect = OSError + self.assertFalse(self.ptool.install(entry)) + mock_remove.assert_called_with(entry.get("name")) + + reset() + mock_isdir.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_rmdir.assert_called_with(entry.get("name")) + + reset() + mock_rmdir.side_effect = OSError + self.assertFalse(self.ptool.install(entry)) + mock_rmdir.assert_called_with(entry.get("name")) + + reset() + entry.set("recursive", "true") + self.assertTrue(self.ptool.install(entry)) + mock_rmtree.assert_called_with(entry.get("name")) + + reset() + mock_rmtree.side_effect = OSError + self.assertFalse(self.ptool.install(entry)) + mock_rmtree.assert_called_with(entry.get("name")) + + reset() + child_entry = lxml.etree.Element("Path", name="/test/foo", + type="nonexistent") + ptool = self.get_obj(posix=get_posix_object(config=get_config([child_entry]))) + mock_rmtree.side_effect = None + self.assertTrue(ptool.install(entry)) + mock_rmtree.assert_called_with(entry.get("name")) + + reset() + child_entry = lxml.etree.Element("Path", name="/test/foo", + type="file") + ptool = self.get_obj(posix=get_posix_object(config=get_config([child_entry]))) + mock_rmtree.side_effect = None + self.assertFalse(ptool.install(entry)) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py new file mode 100644 index 000000000..565857437 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py @@ -0,0 +1,5 @@ +from Bcfg2.Client.Tools.POSIX.Permissions import * +from Testbase import TestPOSIXTool + +class TestPOSIXPermissions(TestPOSIXTool): + test_obj = POSIXPermissions diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py new file mode 100644 index 000000000..b02f7b3c3 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py @@ -0,0 +1,99 @@ +import os +import sys +import copy +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Symlink import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from Testbase import TestPOSIXTool +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +class TestPOSIXSymlink(TestPOSIXTool): + test_obj = POSIXSymlink + + @patch("os.readlink") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + def test_verify(self, mock_verify, mock_readlink): + entry = lxml.etree.Element("Path", name="/test", type="symlink", + to="/dest") + ptool = self.get_obj() + + mock_readlink.return_value = entry.get("to") + mock_verify.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_readlink.reset_mock() + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_readlink.reset_mock() + mock_verify.reset_mock() + mock_readlink.return_value = "/bogus" + self.assertFalse(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + mock_verify.assert_called_with(ptool, entry, []) + + # relative symlink + mock_readlink.reset_mock() + mock_verify.reset_mock() + entry = lxml.etree.Element("Path", name="/test", type="symlink", + to="dest") + mock_readlink.return_value = entry.get("to") + self.assertTrue(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_readlink.reset_mock() + mock_verify.reset_mock() + mock_readlink.side_effect = OSError + self.assertFalse(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + + @patch("os.symlink") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.Symlink.%s._exists" % test_obj.__name__) + def test_install(self, mock_exists, mock_install, mock_symlink): + entry = lxml.etree.Element("Path", name="/test", type="symlink", + to="/dest") + ptool = self.get_obj() + + mock_exists.return_value = False + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_symlink.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) + + # relative symlink + entry = lxml.etree.Element("Path", name="/test", type="symlink", + to="dest") + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_symlink.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) + + mock_symlink.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_symlink.side_effect = OSError + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_symlink.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py new file mode 100644 index 000000000..14a2520df --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py @@ -0,0 +1,252 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +import Bcfg2.Client.Tools +import Bcfg2.Client.Tools.POSIX + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +def get_config(entries): + config = lxml.etree.Element("Configuration") + bundle = lxml.etree.SubElement(config, "Bundle", name="test") + bundle.extend(entries) + return config + +def get_posix_object(logger=None, setup=None, config=None): + if config is None: + config = lxml.etree.Element("Configuration") + if not logger: + def print_msg(msg): + print(msg) + logger = Mock() + logger.error = Mock(side_effect=print_msg) + logger.warning = Mock(side_effect=print_msg) + logger.info = Mock(side_effect=print_msg) + logger.debug = Mock(side_effect=print_msg) + if not setup: + setup = MagicMock() + return Bcfg2.Client.Tools.POSIX.POSIX(logger, setup, config) + + +class TestPOSIX(Bcfg2TestCase): + def setUp(self): + self.posix = get_posix_object() + + def tearDown(self): + # just to guarantee that we start fresh each time + self.posix = None + + def test__init(self): + entries = [lxml.etree.Element("Path", name="test", type="file")] + posix = get_posix_object(config=get_config(entries)) + self.assertIsInstance(posix, Bcfg2.Client.Tools.Tool) + self.assertIsInstance(posix, Bcfg2.Client.Tools.POSIX.POSIX) + self.assertIn('Path', posix.__req__) + self.assertGreater(len(posix.__req__['Path']), 0) + self.assertGreater(len(posix.__handles__), 0) + self.assertItemsEqual(posix.handled, entries) + + @patch("Bcfg2.Client.Tools.Tool.canVerify") + def test_canVerify(self, mock_canVerify): + entry = lxml.etree.Element("Path", name="test", type="file") + + # first, test superclass canVerify failure + mock_canVerify.return_value = False + self.assertFalse(self.posix.canVerify(entry)) + mock_canVerify.assert_called_with(self.posix, entry) + + # next, test fully_specified failure + self.posix.logger.error.reset_mock() + mock_canVerify.reset_mock() + mock_canVerify.return_value = True + mock_fully_spec = Mock() + mock_fully_spec.return_value = False + self.posix._handlers[entry.get("type")].fully_specified = \ + mock_fully_spec + self.assertFalse(self.posix.canVerify(entry)) + mock_canVerify.assert_called_with(self.posix, entry) + mock_fully_spec.assert_called_with(entry) + self.assertTrue(self.posix.logger.error.called) + + # finally, test success + self.posix.logger.error.reset_mock() + mock_canVerify.reset_mock() + mock_fully_spec.reset_mock() + mock_fully_spec.return_value = True + self.assertTrue(self.posix.canVerify(entry)) + mock_canVerify.assert_called_with(self.posix, entry) + mock_fully_spec.assert_called_with(entry) + self.assertFalse(self.posix.logger.error.called) + + @patch("Bcfg2.Client.Tools.Tool.canInstall") + def test_canInstall(self, mock_canInstall): + entry = lxml.etree.Element("Path", name="test", type="file") + + # first, test superclass canInstall failure + mock_canInstall.return_value = False + self.assertFalse(self.posix.canInstall(entry)) + mock_canInstall.assert_called_with(self.posix, entry) + + # next, test fully_specified failure + self.posix.logger.error.reset_mock() + mock_canInstall.reset_mock() + mock_canInstall.return_value = True + mock_fully_spec = Mock() + mock_fully_spec.return_value = False + self.posix._handlers[entry.get("type")].fully_specified = \ + mock_fully_spec + self.assertFalse(self.posix.canInstall(entry)) + mock_canInstall.assert_called_with(self.posix, entry) + mock_fully_spec.assert_called_with(entry) + self.assertTrue(self.posix.logger.error.called) + + # finally, test success + self.posix.logger.error.reset_mock() + mock_canInstall.reset_mock() + mock_fully_spec.reset_mock() + mock_fully_spec.return_value = True + self.assertTrue(self.posix.canInstall(entry)) + mock_canInstall.assert_called_with(self.posix, entry) + mock_fully_spec.assert_called_with(entry) + self.assertFalse(self.posix.logger.error.called) + + def test_InstallPath(self): + entry = lxml.etree.Element("Path", name="test", type="file") + + mock_install = Mock() + mock_install.return_value = True + self.posix._handlers[entry.get("type")].install = mock_install + self.assertTrue(self.posix.InstallPath(entry)) + mock_install.assert_called_with(entry) + + def test_VerifyPath(self): + entry = lxml.etree.Element("Path", name="test", type="file") + modlist = [] + + mock_verify = Mock() + mock_verify.return_value = True + self.posix._handlers[entry.get("type")].verify = mock_verify + self.assertTrue(self.posix.VerifyPath(entry, modlist)) + mock_verify.assert_called_with(entry, modlist) + + mock_verify.reset_mock() + mock_verify.return_value = False + self.posix.setup.__getitem__.return_value = True + self.assertFalse(self.posix.VerifyPath(entry, modlist)) + self.assertIsNotNone(entry.get('qtext')) + + @patch('os.remove') + def test_prune_old_backups(self, mock_remove): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + setup = dict(ppath='/', max_copies=5, paranoid=True) + posix = get_posix_object(setup=setup) + + remove = ["_etc_foo_2012-07-20T04:13:22.364989", + "_etc_foo_2012-07-31T04:13:23.894958", + "_etc_foo_2012-07-17T04:13:22.493316",] + keep = ["_etc_foo_bar_2011-08-07T04:13:22.519978", + "_etc_foo_2012-08-04T04:13:22.519978", + "_etc_Foo_2011-08-07T04:13:22.519978", + "_etc_foo_2012-08-06T04:13:22.519978", + "_etc_foo_2012-08-03T04:13:22.191895", + "_etc_test_2011-08-07T04:13:22.519978", + "_etc_foo_2012-08-07T04:13:22.519978",] + + @patch('os.listdir') + def inner(mock_listdir): + mock_listdir.side_effect = OSError + posix._prune_old_backups(entry) + self.assertTrue(posix.logger.error.called) + self.assertFalse(mock_remove.called) + mock_listdir.assert_called_with(setup['ppath']) + + mock_listdir.reset_mock() + mock_remove.reset_mock() + mock_listdir.side_effect = None + mock_listdir.return_value = keep + remove + + posix._prune_old_backups(entry) + mock_listdir.assert_called_with(setup['ppath']) + self.assertItemsEqual(mock_remove.call_args_list, + [call(os.path.join(setup['ppath'], p)) + for p in remove]) + + mock_listdir.reset_mock() + mock_remove.reset_mock() + mock_remove.side_effect = OSError + posix.logger.error.reset_mock() + # test to ensure that we call os.remove() for all files that + # need to be removed even if we get an error + posix._prune_old_backups(entry) + mock_listdir.assert_called_with(setup['ppath']) + self.assertItemsEqual(mock_remove.call_args_list, + [call(os.path.join(setup['ppath'], p)) + for p in remove]) + self.assertTrue(posix.logger.error.called) + + inner() + + @patch("shutil.copy") + @patch("os.path.isdir") + @patch("Bcfg2.Client.Tools.POSIX.POSIX._prune_old_backups") + def test_paranoid_backup(self, mock_prune, mock_isdir, mock_copy): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + setup = dict(ppath='/', max_copies=5, paranoid=False) + posix = get_posix_object(setup=setup) + + # paranoid false globally + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + + # paranoid false on the entry + mock_prune.reset_mock() + setup['paranoid'] = True + posix = get_posix_object(setup=setup) + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + + # entry does not exist on filesystem + mock_prune.reset_mock() + entry.set("paranoid", "true") + entry.set("current_exists", "false") + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + + # entry is a directory on the filesystem + mock_prune.reset_mock() + entry.set("current_exists", "true") + mock_isdir.return_value = True + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + mock_isdir.assert_called_with(entry.get("name")) + + # test the actual backup now + mock_prune.reset_mock() + mock_isdir.return_value = False + posix._paranoid_backup(entry) + mock_isdir.assert_called_with(entry.get("name")) + mock_prune.assert_called_with(entry) + # it's basically impossible to test the shutil.copy() call + # exactly because the destination includes microseconds, so we + # just test it good enough + self.assertEqual(mock_copy.call_args[0][0], + entry.get("name")) + bkupnam = os.path.join(setup['ppath'], + entry.get('name').replace('/', '_')) + '_' + self.assertEqual(bkupnam, mock_copy.call_args[0][1][:len(bkupnam)]) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py new file mode 100644 index 000000000..b447ab642 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py @@ -0,0 +1,991 @@ +import os +import sys +import copy +import stat +import lxml.etree +from mock import Mock, MagicMock, patch +import Bcfg2.Client.Tools +from Bcfg2.Client.Tools.POSIX.base import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +try: + import selinux + has_selinux = True +except ImportError: + has_selinux = False + +try: + import posix1e + has_acls = True +except ImportError: + has_acls = False + +class TestPOSIXTool(Bcfg2TestCase): + test_obj = POSIXTool + + def get_obj(self, posix=None): + if posix is None: + posix = get_posix_object() + return self.test_obj(posix.logger, posix.setup, posix.config) + + def setUp(self): + self.ptool = self.get_obj() + + def tearDown(self): + # just to guarantee that we start fresh each time + self.ptool = None + + def test_fully_specified(self): + # fully_specified should do no checking on the abstract + # POSIXTool object + self.assertTrue(self.ptool.fully_specified(Mock())) + + @patch('os.stat') + @patch('os.walk') + @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_metadata" % + test_obj.__name__) + def test_verify(self, mock_verify, mock_walk, mock_stat): + entry = lxml.etree.Element("Path", name="/test", type="file") + + mock_stat.return_value = MagicMock() + mock_verify.return_value = False + self.assertFalse(self.ptool.verify(entry, [])) + mock_verify.assert_called_with(entry) + + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(self.ptool.verify(entry, [])) + mock_verify.assert_called_with(entry) + + mock_verify.reset_mock() + entry.set("recursive", "true") + walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), + ("/dir1", ["dir3"], []), + ("/dir2", [], ["file3", "file4"])] + mock_walk.return_value = walk_rv + self.assertTrue(self.ptool.verify(entry, [])) + mock_walk.assert_called_with(entry.get("name")) + all_verifies = [call(entry)] + for root, dirs, files in walk_rv: + all_verifies.extend([call(entry, path=os.path.join(root, p)) + for p in dirs + files]) + self.assertItemsEqual(mock_verify.call_args_list, all_verifies) + + @patch('os.walk') + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__) + def test_install(self, mock_set_perms, mock_walk): + entry = lxml.etree.Element("Path", name="/test", type="file") + + mock_set_perms.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_set_perms.assert_called_with(entry) + + mock_set_perms.reset_mock() + entry.set("recursive", "true") + walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), + ("/dir1", ["dir3"], []), + ("/dir2", [], ["file3", "file4"])] + mock_walk.return_value = walk_rv + + mock_set_perms.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_walk.assert_called_with(entry.get("name")) + all_set_perms = [call(entry)] + for root, dirs, files in walk_rv: + all_set_perms.extend([call(entry, path=os.path.join(root, p)) + for p in dirs + files]) + self.assertItemsEqual(mock_set_perms.call_args_list, + all_set_perms) + + mock_walk.reset_mock() + mock_set_perms.reset_mock() + + def set_perms_rv(entry, path=None): + if path == '/dir2/file3': + return False + else: + return True + mock_set_perms.side_effect = set_perms_rv + + self.assertFalse(self.ptool.install(entry)) + mock_walk.assert_called_with(entry.get("name")) + self.assertItemsEqual(mock_set_perms.call_args_list, all_set_perms) + + @patch('os.lstat') + @patch("os.unlink") + @patch("os.path.isdir") + @patch("shutil.rmtree") + def test_exists(self, mock_rmtree, mock_isdir, mock_unlink, mock_lstat): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + + mock_lstat.side_effect = OSError + self.assertFalse(self.ptool._exists(entry)) + mock_lstat.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_lstat.reset_mock() + mock_unlink.reset_mock() + rv = MagicMock() + mock_lstat.return_value = rv + mock_lstat.side_effect = None + self.assertEqual(self.ptool._exists(entry), rv) + mock_lstat.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_lstat.reset_mock() + mock_unlink.reset_mock() + mock_isdir.return_value = False + self.assertFalse(self.ptool._exists(entry, remove=True)) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_unlink.assert_called_with(entry.get('name')) + self.assertFalse(mock_rmtree.called) + + mock_lstat.reset_mock() + mock_isdir.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_isdir.return_value = True + self.assertFalse(self.ptool._exists(entry, remove=True)) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_rmtree.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_isdir.reset_mock() + mock_lstat.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_rmtree.side_effect = OSError + self.assertEqual(self.ptool._exists(entry, remove=True), rv) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_rmtree.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + @patch("os.chown") + @patch("os.chmod") + @patch("os.utime") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_acls" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_secontext" % + test_obj.__name__) + def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid, + mock_norm_uid, mock_utime, mock_chmod, mock_chown): + def reset(): + mock_set_secontext.reset_mock() + mock_set_acls.reset_mock() + mock_norm_gid.reset_mock() + mock_norm_uid.reset_mock() + mock_chmod.reset_mock() + mock_chown.reset_mock() + mock_utime.reset_mock() + + entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar", + type="symlink") + mock_set_acls.return_value = True + mock_set_secontext.return_value = True + self.assertTrue(self.ptool._set_perms(entry)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner", + group="group", perms="644", type="file") + mock_norm_uid.return_value = 10 + mock_norm_gid.return_value = 100 + + reset() + self.assertTrue(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + self.assertFalse(mock_utime.called) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + reset() + mtime = 1344459042 + entry.set("mtime", str(mtime)) + self.assertTrue(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + reset() + self.assertTrue(self.ptool._set_perms(entry, path='/etc/bar')) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with('/etc/bar', 10, 100) + mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path='/etc/bar') + mock_set_acls.assert_called_with(entry, path='/etc/bar') + + # test dev_type modification of perms, failure of chown + reset() + def chown_rv(path, owner, group): + if owner == 0 and group == 0: + return True + else: + raise KeyError + os.chown.side_effect = chown_rv + entry.set("type", "device") + entry.set("dev_type", list(device_map.keys())[0]) + self.assertFalse(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 0, 0) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8) | list(device_map.values())[0]) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + # test failure of chmod + reset() + os.chown.side_effect = None + os.chmod.side_effect = OSError + entry.set("type", "file") + del entry.attrib["dev_type"] + self.assertFalse(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + # test that even when everything fails, we try to do it all. + # e.g., when chmod fails, we still try to apply acls, set + # selinux context, etc. + reset() + os.chown.side_effect = OSError + os.utime.side_effect = OSError + mock_set_acls.return_value = False + mock_set_secontext.return_value = False + self.assertFalse(self.ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + @skipUnless(has_acls, "ACLS not found, skipping") + @patchIf(has_acls, "posix1e.ACL") + @patchIf(has_acls, "posix1e.Entry") + @patch("os.path.isdir") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" % + test_obj.__name__) + def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid, + mock_isdir, mock_Entry, mock_ACL): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + + # disable acls for the initial test + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertTrue(self.ptool._set_acls(entry)) + Bcfg2.Client.Tools.POSIX.base.has_acls = True + + # build a set of file ACLs to return from posix1e.ACL(file=...) + file_acls = [] + acl = Mock() + acl.tag_type = posix1e.ACL_USER + acl.name = "remove" + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_GROUP + acl.name = "remove" + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_MASK + acl.name = "keep" + file_acls.append(acl) + remove_acls = [a for a in file_acls if a.name == "remove"] + + # build a set of ACLs listed on the entry as returned by + # _list_entry_acls() + entry_acls = {("default", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5} + mock_list_entry_acls.return_value = entry_acls + mock_norm_uid.return_value = 10 + mock_norm_gid.return_value = 100 + + # set up the unreasonably complex return value for + # posix1e.ACL(), which has three separate uses + fileacl_rv = MagicMock() + fileacl_rv.valid.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv = MagicMock() + filedef_rv.valid.return_value = True + filedef_rv.__iter__.return_value = iter(file_acls) + acl_rv = MagicMock() + def mock_acl_rv(file=None, filedef=None, acl=None): + if file: + return fileacl_rv + elif filedef: + return filedef_rv + elif acl: + return acl_rv + + # set up the equally unreasonably complex return value for + # posix1e.Entry, which returns a new entry and adds it to + # an ACL, so we have to track the Mock objects it returns. + # why can't they just have an acl.add_entry() method?!? + acl_entries = [] + def mock_entry_rv(acl): + rv = MagicMock() + rv.acl = acl + rv.permset = set() + acl_entries.append(rv) + return rv + mock_Entry.side_effect = mock_entry_rv + + def reset(): + mock_isdir.reset_mock() + mock_ACL.reset_mock() + mock_Entry.reset_mock() + fileacl_rv.reset_mock() + + # test fs mounted noacl + mock_ACL.side_effect = IOError(95, "Operation not permitted") + self.assertFalse(self.ptool._set_acls(entry)) + + # test other error + reset() + mock_ACL.side_effect = IOError + self.assertFalse(self.ptool._set_acls(entry)) + + reset() + mock_ACL.side_effect = mock_acl_rv + mock_isdir.return_value = True + self.assertTrue(self.ptool._set_acls(entry)) + self.assertItemsEqual(mock_ACL.call_args_list, + [call(file=entry.get("name")), + call(filedef=entry.get("name"))]) + self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + self.assertItemsEqual(filedef_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + mock_list_entry_acls.assert_called_with(entry) + mock_norm_uid.assert_called_with("user") + mock_norm_gid.assert_called_with("group") + fileacl_rv.calc_mask.assert_any_call() + fileacl_rv.applyto.assert_called_with(entry.get("name"), + posix1e.ACL_TYPE_ACCESS) + filedef_rv.calc_mask.assert_any_call() + filedef_rv.applyto.assert_called_with(entry.get("name"), + posix1e.ACL_TYPE_DEFAULT) + + # build tuples of the Entry objects that were added to acl + # and defaacl so they're easier to compare for equality + added_acls = [] + for acl in acl_entries: + added_acls.append((acl.acl, acl.tag_type, acl.qualifier, + sum(acl.permset))) + self.assertItemsEqual(added_acls, + [(filedef_rv, posix1e.ACL_USER, 10, 7), + (fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) + + reset() + # have to reassign these because they're iterators, and + # they've already been iterated over once + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv.__iter__.return_value = iter(file_acls) + mock_list_entry_acls.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + mock_isdir.return_value = False + acl_entries = [] + self.assertTrue(self.ptool._set_acls(entry, path="/bin/bar")) + mock_ACL.assert_called_with(file="/bin/bar") + self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + mock_list_entry_acls.assert_called_with(entry) + mock_norm_gid.assert_called_with("group") + fileacl_rv.calc_mask.assert_any_call() + fileacl_rv.applyto.assert_called_with("/bin/bar", + posix1e.ACL_TYPE_ACCESS) + + added_acls = [] + for acl in acl_entries: + added_acls.append((acl.acl, acl.tag_type, acl.qualifier, + sum(acl.permset))) + self.assertItemsEqual(added_acls, + [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) + + @skipUnless(has_selinux, "SELinux not found, skipping") + @patchIf(has_selinux, "selinux.restorecon") + @patchIf(has_selinux, "selinux.lsetfilecon") + def test_set_secontext(self, mock_lsetfilecon, mock_restorecon): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + + # disable selinux for the initial test + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + self.assertTrue(self.ptool._set_secontext(entry)) + Bcfg2.Client.Tools.POSIX.base.has_selinux = True + + # no context given + self.assertTrue(self.ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + self.assertFalse(mock_lsetfilecon.called) + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + entry.set("secontext", "__default__") + self.assertTrue(self.ptool._set_secontext(entry)) + mock_restorecon.assert_called_with(entry.get("name")) + self.assertFalse(mock_lsetfilecon.called) + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + mock_lsetfilecon.return_value = 0 + entry.set("secontext", "foo_t") + self.assertTrue(self.ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + mock_lsetfilecon.return_value = 1 + self.assertFalse(self.ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") + + @patch("grp.getgrnam") + def test_norm_gid(self, mock_getgrnam): + self.assertEqual(5, self.ptool._norm_gid("5")) + self.assertFalse(mock_getgrnam.called) + + mock_getgrnam.reset_mock() + mock_getgrnam.return_value = ("group", "x", 5, []) + self.assertEqual(5, self.ptool._norm_gid("group")) + mock_getgrnam.assert_called_with("group") + + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__) + def test_norm_entry_gid(self, mock_norm_gid): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user") + mock_norm_gid.return_value = 10 + self.assertEqual(10, self.ptool._norm_entry_gid(entry)) + mock_norm_gid.assert_called_with(entry.get("group")) + + mock_norm_gid.reset_mock() + mock_norm_gid.side_effect = KeyError + self.assertEqual(0, self.ptool._norm_entry_gid(entry)) + mock_norm_gid.assert_called_with(entry.get("group")) + + @patch("pwd.getpwnam") + def test_norm_uid(self, mock_getpwnam): + self.assertEqual(5, self.ptool._norm_uid("5")) + self.assertFalse(mock_getpwnam.called) + + mock_getpwnam.reset_mock() + mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user", + "/bin/zsh") + self.assertEqual(5, self.ptool._norm_uid("user")) + mock_getpwnam.assert_called_with("user") + + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__) + def test_norm_entry_uid(self, mock_norm_uid): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user") + mock_norm_uid.return_value = 10 + self.assertEqual(10, self.ptool._norm_entry_uid(entry)) + mock_norm_uid.assert_called_with(entry.get("owner")) + + mock_norm_uid.reset_mock() + mock_norm_uid.side_effect = KeyError + self.assertEqual(0, self.ptool._norm_entry_uid(entry)) + mock_norm_uid.assert_called_with(entry.get("owner")) + + def test_norm_acl_perms(self): + # there's basically no reasonably way to test the Permset + # object parsing feature without writing our own Mock object + # that re-implements Permset.test(). silly pylibacl won't let + # us create standalone Entry or Permset objects. + self.assertEqual(5, self.ptool._norm_acl_perms("5")) + self.assertEqual(0, self.ptool._norm_acl_perms("55")) + self.assertEqual(5, self.ptool._norm_acl_perms("rx")) + self.assertEqual(5, self.ptool._norm_acl_perms("r-x")) + self.assertEqual(6, self.ptool._norm_acl_perms("wr-")) + self.assertEqual(0, self.ptool._norm_acl_perms("rwrw")) + self.assertEqual(0, self.ptool._norm_acl_perms("-")) + self.assertEqual(0, self.ptool._norm_acl_perms("a")) + self.assertEqual(6, self.ptool._norm_acl_perms("rwa")) + self.assertEqual(4, self.ptool._norm_acl_perms("rr")) + + @patch('os.stat') + def test__gather_data(self, mock_stat): + path = '/test' + mock_stat.side_effect = OSError + self.assertFalse(self.ptool._gather_data(path)[0]) + mock_stat.assert_called_with(path) + + mock_stat.reset_mock() + mock_stat.side_effect = None + # create a return value + stat_rv = MagicMock() + def stat_getitem(key): + if int(key) == stat.ST_UID: + return 0 + elif int(key) == stat.ST_GID: + return 10 + elif int(key) == stat.ST_MODE: + # return extra bits in the mode to emulate a device + # and ensure that they're stripped + return int('060660', 8) + stat_rv.__getitem__ = Mock(side_effect=stat_getitem) + mock_stat.return_value = stat_rv + + # disable selinux and acls for this call -- we test them + # separately so that we can skip those tests as appropriate + states = (Bcfg2.Client.Tools.POSIX.base.has_selinux, + Bcfg2.Client.Tools.POSIX.base.has_acls) + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertEqual(self.ptool._gather_data(path), + (stat_rv, '0', '10', '0660', None, None)) + Bcfg2.Client.Tools.POSIX.base.has_selinux, \ + Bcfg2.Client.Tools.POSIX.base.has_acls = states + mock_stat.assert_called_with(path) + + @skipUnless(has_selinux, "SELinux not found, skipping") + def test__gather_data_selinux(self): + context = 'system_u:object_r:root_t:s0' + path = '/test' + + @patch('os.stat') + @patchIf(has_selinux, "selinux.getfilecon") + def inner(mock_getfilecon, mock_stat): + mock_getfilecon.return_value = [len(context) + 1, context] + mock_stat.return_value = MagicMock() + # disable acls for this call and test them separately + state = Bcfg2.Client.Tools.POSIX.base.has_acls + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertEqual(self.ptool._gather_data(path)[4], 'root_t') + Bcfg2.Client.Tools.POSIX.base.has_acls = state + mock_getfilecon.assert_called_with(path) + + inner() + + @skipUnless(has_acls, "ACLS not found, skipping") + @patch('os.stat') + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" % + test_obj.__name__) + def test__gather_data_acls(self, mock_list_file_acls, mock_stat): + acls = {("default", posix1e.ACL_USER, "testuser"): "rwx", + ("access", posix1e.ACL_GROUP, "testgroup"): "rx"} + mock_list_file_acls.return_value = acls + path = '/test' + mock_stat.return_value = MagicMock() + # disable selinux for this call and test it separately + state = Bcfg2.Client.Tools.POSIX.base.has_selinux + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + self.assertItemsEqual(self.ptool._gather_data(path)[5], acls) + Bcfg2.Client.Tools.POSIX.base.has_selinux = state + mock_list_file_acls.assert_called_with(path) + + @patchIf(has_selinux, "selinux.matchpathcon") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_acls" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._gather_data" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" % + test_obj.__name__) + def test_verify_metadata(self, mock_norm_gid, mock_norm_uid, + mock_gather_data, mock_verify_acls, + mock_matchpathcon): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user", perms="664", + secontext='etc_t') + # _verify_metadata() mutates the entry, so we keep a backup so we + # can start fresh every time + orig_entry = copy.deepcopy(entry) + + def reset(): + mock_gather_data.reset_mock() + mock_verify_acls.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + return copy.deepcopy(orig_entry) + + # test nonexistent file + mock_gather_data.return_value = (False, None, None, None, None, None) + self.assertFalse(self.ptool._verify_metadata(entry)) + self.assertEqual(entry.get("current_exists", "").lower(), "false") + mock_gather_data.assert_called_with(entry.get("name")) + + # expected data. tuple of attr, return value index, value + expected = [('current_owner', 1, '0'), + ('current_group', 2, '10'), + ('current_perms', 3, '0664'), + ('current_secontext', 4, 'etc_t')] + mock_norm_uid.return_value = 0 + mock_norm_gid.return_value = 10 + gather_data_rv = [MagicMock(), None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + + entry = reset() + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + + # test when secontext is None + entry = reset() + gather_data_rv[4] = None + sestate = Bcfg2.Client.Tools.POSIX.base.has_selinux + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + if attr != 'current_secontext': + self.assertEqual(entry.get(attr), val) + Bcfg2.Client.Tools.POSIX.base.has_selinux = sestate + + gather_data_rv = [MagicMock(), None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + + mtime = 1344430414 + entry = reset() + entry.set("mtime", str(mtime)) + stat_rv = MagicMock() + stat_rv.__getitem__.return_value = mtime + gather_data_rv[0] = stat_rv + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + # failure modes for each checked datum. tuple of changed attr, + # return value index, new (failing) value + failures = [('current_owner', 1, '10'), + ('current_group', 2, '100'), + ('current_perms', 3, '0660')] + if has_selinux: + failures.append(('current_secontext', 4, 'root_t')) + + for fail_attr, fail_idx, fail_val in failures: + entry = reset() + entry.set("mtime", str(mtime)) + gather_data_rv = [stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + gather_data_rv[fail_idx] = fail_val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertFalse(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + self.assertEqual(entry.get(fail_attr), fail_val) + for attr, idx, val in expected: + if attr != fail_attr: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + # failure mode for mtime + fail_mtime = 1344431162 + entry = reset() + entry.set("mtime", str(mtime)) + fail_stat_rv = MagicMock() + fail_stat_rv.__getitem__.return_value = fail_mtime + gather_data_rv = [fail_stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertFalse(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(fail_mtime)) + + if has_selinux: + # test success and failure for __default__ secontext + entry = reset() + entry.set("mtime", str(mtime)) + entry.set("secontext", "__default__") + + context1 = "system_u:object_r:etc_t:s0" + context2 = "system_u:object_r:root_t:s0" + mock_matchpathcon.return_value = [1 + len(context1), + context1] + gather_data_rv = [stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, + path=entry.get("name")) + mock_matchpathcon.assert_called_with(entry.get("name"), 0) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + entry = reset() + entry.set("mtime", str(mtime)) + entry.set("secontext", "__default__") + mock_matchpathcon.return_value = [1 + len(context2), + context2] + self.assertFalse(self.ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, + path=entry.get("name")) + mock_matchpathcon.assert_called_with(entry.get("name"), 0) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + @skipUnless(has_acls, "ACLS not found, skipping") + def test_list_entry_acls(self): + entry = lxml.etree.Element("Path", name="/test", type="file") + lxml.etree.SubElement(entry, "ACL", scope="user", type="default", + user="user", perms="rwx") + lxml.etree.SubElement(entry, "ACL", scope="group", type="access", + group="group", perms="5") + self.assertItemsEqual(self.ptool._list_entry_acls(entry), + {("default", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5}) + + @skipUnless(has_acls, "ACLS not found, skipping") + @patch("pwd.getpwuid") + @patch("grp.getgrgid") + @patch("os.path.isdir") + def test_list_file_acls(self, mock_isdir, mock_getgrgid, mock_getpwuid, + mock_ACL): + path = '/test' + + # build a set of file ACLs to return from posix1e.ACL(file=...) + file_acls = [] + acl = Mock() + acl.tag_type = posix1e.ACL_USER + acl.qualifier = 10 + # yes, this is a bogus permset. thanks to _norm_acl_perms + # it works and is easier than many of the alternatives. + acl.permset = 'rwx' + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_GROUP + acl.qualifier = 100 + acl.permset = 'rx' + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_MASK + file_acls.append(acl) + acls = {("access", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5} + + # set up the unreasonably complex return value for + # posix1e.ACL(), which has two separate uses + fileacl_rv = MagicMock() + fileacl_rv.valid.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv = MagicMock() + filedef_rv.valid.return_value = True + filedef_rv.__iter__.return_value = iter(file_acls) + def mock_acl_rv(file=None, filedef=None): + if file: + return fileacl_rv + elif filedef: + return filedef_rv + # other return values + mock_isdir.return_value = False + mock_getgrgid.return_value = ("group", "x", 5, []) + mock_getpwuid.return_value = ("user", "x", 5, 5, "User", + "/home/user", "/bin/zsh") + + def reset(): + mock_isdir.reset_mock() + mock_getgrgid.reset_mock() + mock_getpwuid.reset_mock() + mock_ACL.reset_mock() + + mock_ACL.side_effect = IOError(95, "Operation not supported") + self.assertItemsEqual(self.ptool._list_file_acls(path), dict()) + + reset() + mock_ACL.side_effect = IOError + self.assertItemsEqual(self.ptool._list_file_acls(path), dict()) + + reset() + mock_ACL.side_effect = mock_acl_rv + self.assertItemsEqual(self.ptool._list_file_acls(path), acls) + mock_isdir.assert_called_with(path) + mock_getgrgid.assert_called_with(100) + mock_getpwuid.assert_called_with(10) + mock_ACL.assert_called_with(file=path) + + reset() + mock_isdir.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv.__iter__.return_value = iter(file_acls) + + defacls = acls + for akey, perms in acls.items(): + defacls[('default', akey[1], akey[2])] = perms + self.assertItemsEqual(self.ptool._list_file_acls(path), defacls) + mock_isdir.assert_called_with(path) + self.assertItemsEqual(mock_getgrgid.call_args_list, + [call(100), call(100)]) + self.assertItemsEqual(mock_getpwuid.call_args_list, + [call(10), call(10)]) + self.assertItemsEqual(mock_ACL.call_args_list, + [call(file=path), call(filedef=path)]) + + if has_acls: + # python 2.6 applies decorators at compile-time, not at + # run-time, so we can't do these as decorators because + # pylibacl might not be installed. (If it's not, this test + # will be skipped, so as long as this is done at run-time + # we're safe.) + test_list_file_acls = patch("posix1e.ACL")(test_list_file_acls) + + @skipUnless(has_acls, "ACLS not found, skipping") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" % + test_obj.__name__) + def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls): + entry = lxml.etree.Element("Path", name="/test", type="file") + # we can't test to make sure that errors get properly sorted + # into (missing, extra, wrong) without refactoring the + # _verify_acls code, and I don't feel like doing that, so eff + # it. let's just test to make sure that failures are + # identified at all for now. + + acls = {("access", posix1e.ACL_USER, "user"): 7, + ("default", posix1e.ACL_GROUP, "group"): 5} + extra_acls = copy.deepcopy(acls) + extra_acls[("access", posix1e.ACL_USER, "user2")] = 4 + + mock_list_entry_acls.return_value = acls + mock_list_file_acls.return_value = acls + self.assertTrue(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test missing + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_file_acls.return_value = extra_acls + self.assertFalse(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test extra + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_entry_acls.return_value = extra_acls + mock_list_file_acls.return_value = acls + self.assertFalse(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test wrong + wrong_acls = copy.deepcopy(extra_acls) + wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5 + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_entry_acls.return_value = extra_acls + mock_list_file_acls.return_value = wrong_acls + self.assertFalse(self.ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + @patch("os.makedirs") + @patch("os.path.exists") + @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__) + def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs): + entry = lxml.etree.Element("Path", name="/test/foo/bar", + type="directory") + + def reset(): + mock_exists.reset_mock() + mock_set_perms.reset_mock() + mock_makedirs.reset_mock() + + mock_set_perms.return_value = True + def path_exists_rv(path): + if path == "/test": + return True + else: + return False + mock_exists.side_effect = path_exists_rv + self.assertTrue(self.ptool._makedirs(entry)) + self.assertItemsEqual(mock_exists.call_args_list, + [call("/test"), call("/test/foo"), + call("/test/foo/bar")]) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + mock_makedirs.assert_called_with(entry.get("name")) + + reset() + mock_makedirs.side_effect = OSError + self.assertFalse(self.ptool._makedirs(entry)) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + + reset() + mock_makedirs.side_effect = None + def set_perms_rv(entry, path=None): + if path == '/test/foo': + return False + else: + return True + mock_set_perms.side_effect = set_perms_rv + self.assertFalse(self.ptool._makedirs(entry)) + self.assertItemsEqual(mock_exists.call_args_list, + [call("/test"), call("/test/foo"), + call("/test/foo/bar")]) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + mock_makedirs.assert_called_with(entry.get("name")) diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/__init__.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/__init__.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/testsuite/Testsrc/Testlib/TestClient/__init__.py b/testsuite/Testsrc/Testlib/TestClient/__init__.py new file mode 100644 index 000000000..e69de29bb -- cgit v1.2.3-1-g7c22