summaryrefslogtreecommitdiffstats
path: root/testsuite
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-15 09:06:43 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-15 09:06:43 -0400
commitb862090945322d5ba4b42e180bba92afb860df21 (patch)
tree1c482adfa9561bad14d82fc442f8f319b33b1d4f /testsuite
parent7890fd0aa5331541c71b893c313553765ca1628e (diff)
downloadbcfg2-b862090945322d5ba4b42e180bba92afb860df21.tar.gz
bcfg2-b862090945322d5ba4b42e180bba92afb860df21.tar.bz2
bcfg2-b862090945322d5ba4b42e180bba92afb860df21.zip
POSIX:
refactored POSIX tool into multiple files to make it more manageable Added unit tests for POSIX tool and sub-tools fixed ACL handling for filesystems mounted noacl
Diffstat (limited to 'testsuite')
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py139
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py154
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py318
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py80
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py88
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py21
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py76
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py238
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py966
9 files changed, 2080 insertions, 0 deletions
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py
new file mode 100644
index 000000000..7d64c5a2e
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py
@@ -0,0 +1,139 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Device import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_device_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXDevice(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXDevice(unittest.TestCase):
+ def test_fully_specified(self):
+ ptool = get_device_object()
+ orig_entry = lxml.etree.Element("Path", name="/test", type="device",
+ dev_type="fifo")
+ self.assertTrue(ptool.fully_specified(orig_entry))
+ for dtype in ["block", "char"]:
+ for attr in ["major", "minor"]:
+ entry = copy.deepcopy(orig_entry)
+ entry.set("dev_type", dtype)
+ entry.set(attr, "0")
+ self.assertFalse(ptool.fully_specified(entry))
+ entry = copy.deepcopy(orig_entry)
+ entry.set("dev_type", dtype)
+ entry.set("major", "0")
+ entry.set("minor", "0")
+ self.assertTrue(ptool.fully_specified(entry))
+
+ @patch("os.major")
+ @patch("os.minor")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_exists, mock_minor, mock_major):
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="block", major="0", minor="10")
+ ptool = get_device_object()
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_minor.reset_mock()
+ mock_major.reset_mock()
+
+ mock_exists.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ reset()
+ mock_exists.return_value = MagicMock()
+ mock_major.return_value = 0
+ mock_minor.return_value = 10
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_exists.assert_called_with(entry)
+ mock_major.assert_called_with(mock_exists.return_value.st_rdev)
+ mock_minor.assert_called_with(mock_exists.return_value.st_rdev)
+
+ reset()
+ mock_exists.return_value = MagicMock()
+ mock_major.return_value = 0
+ mock_minor.return_value = 10
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_exists.assert_called_with(entry)
+ mock_major.assert_called_with(mock_exists.return_value.st_rdev)
+ mock_minor.assert_called_with(mock_exists.return_value.st_rdev)
+
+ reset()
+ mock_verify.return_value = True
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="fifo")
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ self.assertFalse(mock_major.called)
+ self.assertFalse(mock_minor.called)
+
+ @patch("os.makedev")
+ @patch("os.mknod")
+ @patch("Bcfg2.Client.Tools.POSIX.Device.POSIXDevice._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ def test_install(self, mock_install, mock_exists, mock_mknod, mock_makedev):
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="block", major="0", minor="10")
+ ptool = get_device_object()
+
+ mock_exists.return_value = False
+ mock_makedev.return_value = Mock()
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_makedev.assert_called_with(0, 10)
+ mock_mknod.assert_called_with(entry.get("name"),
+ device_map[entry.get("dev_type")] | 0644,
+ mock_makedev.return_value)
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_makedev.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_mknod.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_mknod.side_effect = None
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="fifo")
+
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_mknod.assert_called_with(entry.get("name"),
+ device_map[entry.get("dev_type")] | 0644)
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py
new file mode 100644
index 000000000..021ed8113
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py
@@ -0,0 +1,154 @@
+import os
+import stat
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Directory import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_directory_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXDirectory(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXDirectory(unittest.TestCase):
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._exists")
+ def test_verify(self, mock_exists, mock_verify):
+ entry = lxml.etree.Element("Path", name="/test", type="directory",
+ perms='0644', owner='root', group='root')
+ ptool = get_directory_object()
+
+ mock_exists.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ mock_exists.reset_mock()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = stat.S_IFREG | 0644
+ mock_exists.return_value = exists_rv
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ mock_exists.reset_mock()
+ mock_verify.return_value = False
+ exists_rv.__getitem__.return_value = stat.S_IFDIR | 0644
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ with patch("os.listdir") as mock_listdir:
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ entry.set("prune", "true")
+ orig_entry = copy.deepcopy(entry)
+
+ entries = ["foo", "bar", "bar/baz"]
+ mock_listdir.return_value = entries
+ modlist = [os.path.join(entry.get("name"), entries[0])]
+ self.assertFalse(ptool.verify(entry, modlist))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, modlist)
+ mock_listdir.assert_called_with(entry.get("name"))
+ expected = [os.path.join(entry.get("name"), e)
+ for e in entries
+ if os.path.join(entry.get("name"), e) not in modlist]
+ actual = [e.get("path") for e in entry.findall("Prune")]
+ self.assertItemsEqual(expected, actual)
+
+ mock_verify.reset_mock()
+ mock_exists.reset_mock()
+ mock_listdir.reset_mock()
+ entry = copy.deepcopy(orig_entry)
+ modlist = [os.path.join(entry.get("name"), e)
+ for e in entries]
+ self.assertTrue(ptool.verify(entry, modlist))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, modlist)
+ mock_listdir.assert_called_with(entry.get("name"))
+ self.assertEqual(len(entry.findall("Prune")), 0)
+
+ @patch("os.unlink")
+ @patch("shutil.rmtree")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._makedirs")
+ def test_install(self, mock_makedirs, mock_exists, mock_install,
+ mock_rmtree, mock_unlink):
+ entry = lxml.etree.Element("Path", name="/test/foo/bar",
+ type="directory", perms='0644',
+ owner='root', group='root')
+ ptool = get_directory_object()
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_rmtree.mock_makedirs()
+
+ mock_makedirs.return_value = True
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+ mock_makedirs.assert_called_with(entry)
+
+ reset()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = stat.S_IFREG | 0644
+ mock_exists.return_value = exists_rv
+ self.assertTrue(ptool.install(entry))
+ mock_unlink.assert_called_with(entry.get("name"))
+ mock_exists.assert_called_with(entry)
+ mock_makedirs.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ exists_rv.__getitem__.return_value = stat.S_IFDIR | 0644
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_install.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_install.assert_called_with(ptool, entry)
+
+ entry.set("prune", "true")
+ prune = ["/test/foo/bar/prune1", "/test/foo/bar/prune2"]
+ for path in prune:
+ lxml.etree.SubElement(entry, "Prune", path=path)
+
+ reset()
+ mock_install.return_value = True
+ with patch("os.path.isdir") as mock_isdir:
+ def isdir_rv(path):
+ if path.endswith("prune2"):
+ return True
+ else:
+ return False
+ mock_isdir.side_effect = isdir_rv
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+ self.assertItemsEqual(mock_isdir.call_args_list,
+ [call(p) for p in prune])
+ mock_unlink.assert_called_with("/test/foo/bar/prune1")
+ mock_rmtree.assert_called_with("/test/foo/bar/prune2")
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
new file mode 100644
index 000000000..a2cd52dd5
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
@@ -0,0 +1,318 @@
+import os
+import copy
+import binascii
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.File import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_file_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXFile(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXFile(unittest.TestCase):
+ def test_fully_specified(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_file_object()
+ self.assertFalse(ptool.fully_specified(entry))
+
+ entry.set("empty", "true")
+ self.assertTrue(ptool.fully_specified(entry))
+
+ entry.set("empty", "false")
+ entry.text = "text"
+ self.assertTrue(ptool.fully_specified(entry))
+
+ def test_is_string(self):
+ ptool = get_file_object()
+ for char in range(8) + range(14, 32):
+ self.assertFalse(ptool._is_string("foo" + chr(char) + "bar",
+ 'utf_8'))
+ for char in range(9, 14) + range(33, 128):
+ self.assertTrue(ptool._is_string("foo" + chr(char) + "bar",
+ 'utf_8'))
+ self.assertFalse(ptool._is_string("foo" + chr(128) + "bar",
+ 'ascii'))
+ ustr = '\xef\xa3\x91 + \xef\xa3\x92'
+ self.assertTrue(ptool._is_string(ustr, 'utf_8'))
+ self.assertFalse(ptool._is_string(ustr, 'ascii'))
+
+ def test_get_data(self):
+ orig_entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(encoding="ascii", ppath='/', max_copies=5)
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = binascii.b2a_base64("test")
+ entry.set("encoding", "base64")
+ self.assertEqual(ptool._get_data(entry), ("test", True))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.set("empty", "true")
+ self.assertEqual(ptool._get_data(entry), ("", False))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = "test"
+ self.assertEqual(ptool._get_data(entry), ("test", False))
+
+ ustr = u'\uf8d1 + \uf8d2'
+ entry = copy.deepcopy(orig_entry)
+ entry.text = ustr
+ self.assertEqual(ptool._get_data(entry), (ustr, False))
+
+ setup['encoding'] = "utf_8"
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+ entry = copy.deepcopy(orig_entry)
+ entry.text = ustr
+ self.assertEqual(ptool._get_data(entry),
+ ('\xef\xa3\x91 + \xef\xa3\x92', False))
+
+ @patch("__builtin__.open")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_diffs")
+ def test_verify(self, mock_get_diffs, mock_get_data, mock_exists,
+ mock_verify, mock_open):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(interactive=False, ppath='/', max_copies=5)
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+
+ def reset():
+ mock_get_diffs.reset_mock()
+ mock_get_data.reset_mock()
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_open.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_exists.return_value = False
+ mock_verify.return_value = True
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=False,
+ content="")
+
+ reset()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = 5
+ mock_exists.return_value = exists_rv
+ mock_get_data.return_value = ("test", True)
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=True,
+ content=None)
+
+ reset()
+ mock_get_data.return_value = ("test", False)
+ exists_rv.__getitem__.return_value = 4
+ entry.set("sensitive", "true")
+ open_rv = Mock()
+ open_rv.read.return_value = "tart"
+ mock_open.return_value = open_rv
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ open_rv.assert_any_call()
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=True,
+ is_binary=False,
+ content="tart")
+
+ reset()
+ open_rv.read.return_value = "test"
+ mock_open.return_value = open_rv
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ open_rv.assert_any_call()
+ self.assertFalse(mock_get_diffs.called)
+
+ reset()
+ mock_open.side_effect = IOError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_open.assert_called_with(entry.get("name"))
+
+ @patch("os.fdopen")
+ @patch("tempfile.mkstemp")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data")
+ def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+ newfile = "/foo/bar"
+
+ def reset():
+ mock_get_data.reset_mock()
+ mock_mkstemp.reset_mock()
+ mock_fdopen.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_mkstemp.return_value = (5, newfile)
+ self.assertEqual(ptool._write_tmpfile(entry), newfile)
+ mock_get_data.assert_called_with(entry)
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_fdopen.assert_called_with(5, 'w')
+ mock_fdopen.return_value.write.assert_called_with("test")
+
+ reset()
+ mock_mkstemp.side_effect = OSError
+ self.assertFalse(ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+
+ reset()
+ mock_mkstemp.side_effect = None
+ mock_fdopen.side_effect = OSError
+ self.assertFalse(ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_get_data.assert_called_with(entry)
+ mock_fdopen.assert_called_with(5, 'w')
+
+ @patch("os.rename")
+ @patch("os.unlink")
+ def test_rename_tmpfile(self, mock_unlink, mock_rename):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+ newfile = "/foo/bar"
+
+ self.assertTrue(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rename.side_effect = OSError
+ self.assertFalse(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ # even if the unlink fails, return false gracefully
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_unlink.side_effect = OSError
+ self.assertFalse(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._makedirs")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._set_perms")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._write_tmpfile")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._rename_tmpfile")
+ def test_install(self, mock_rename, mock_write, mock_set_perms,
+ mock_makedirs, mock_install, mock_exists):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+
+ def reset():
+ mock_rename.reset_mock()
+ mock_write.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+ mock_install.reset_mock()
+ mock_exists.reset_mock()
+
+ mock_exists.return_value = False
+ mock_makedirs.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+
+ reset()
+ mock_makedirs.return_value = True
+ mock_write.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+
+ reset()
+ newfile = '/test.X987yS'
+ mock_write.return_value = newfile
+ mock_set_perms.return_value = False
+ mock_rename.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+
+ reset()
+ mock_rename.return_value = True
+ mock_install.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_install.return_value = True
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_set_perms.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_exists.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ self.assertFalse(mock_makedirs.called)
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ def test_diff(self):
+ ptool = get_file_object()
+ content1 = "line1\nline2"
+ content2 = "line3"
+ rv = ["line1", "line2", "line3"]
+ func = Mock()
+ func.return_value = rv
+ self.assertItemsEqual(ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])
+
+ func.reset_mock()
+ def slow_diff(content1, content2):
+ for i in range(1, 10):
+ time.sleep(5)
+ yield "line%s" % i
+ func.side_effect = slow_diff
+ self.assertFalse(ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py
new file mode 100644
index 000000000..e663973c7
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py
@@ -0,0 +1,80 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Hardlink import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_hardlink_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXHardlink(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXHardlink(unittest.TestCase):
+ @patch("os.path.samefile")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_samefile):
+ entry = lxml.etree.Element("Path", name="/test", type="hardlink",
+ to="/dest")
+ ptool = get_hardlink_object()
+
+ mock_samefile.return_value = True
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_samefile.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_samefile.side_effect = OSError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+
+ @patch("os.link")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Hardlink.POSIXHardlink._exists")
+ def test_install(self, mock_exists, mock_install, mock_link):
+ entry = lxml.etree.Element("Path", name="/test", type="hardlink",
+ to="/dest")
+ ptool = get_hardlink_object()
+
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_link.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_link.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_link.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_link.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py
new file mode 100644
index 000000000..38f3b6ee3
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py
@@ -0,0 +1,88 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Nonexistent import *
+from Test__init import get_config, get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_nonexistent_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXNonexistent(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXNonexistent(unittest.TestCase):
+ @patch("os.path.lexists")
+ def test_verify(self, mock_lexists):
+ entry = lxml.etree.Element("Path", name="/test", type="nonexistent")
+ ptool = get_nonexistent_object()
+
+ for val in [True, False]:
+ mock_lexists.reset_mock()
+ mock_lexists.return_value = val
+ self.assertEqual(ptool.verify(entry, []), not val)
+ mock_lexists.assert_called_with(entry.get("name"))
+
+ @patch("os.rmdir")
+ @patch("os.remove")
+ @patch("shutil.rmtree")
+ def test_install(self, mock_rmtree, mock_remove, mock_rmdir):
+ entry = lxml.etree.Element("Path", name="/test", type="nonexistent")
+ ptool = get_nonexistent_object()
+
+ with patch("os.path.isdir") as mock_isdir:
+ def reset():
+ mock_isdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_rmdir.reset_mock()
+ mock_rmtree.reset_mock()
+
+ mock_isdir.return_value = False
+ self.assertTrue(ptool.install(entry))
+ mock_remove.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_remove.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_remove.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_isdir.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_rmdir.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_rmdir.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_rmdir.assert_called_with(entry.get("name"))
+
+ reset()
+ entry.set("recursive", "true")
+ self.assertTrue(ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_rmtree.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ child_entry = lxml.etree.Element("Path", name="/test/foo",
+ type="nonexistent")
+ ptool = get_nonexistent_object(posix=get_posix_object(config=get_config([child_entry])))
+ mock_rmtree.side_effect = None
+ self.assertTrue(ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ child_entry = lxml.etree.Element("Path", name="/test/foo",
+ type="file")
+ ptool = get_nonexistent_object(posix=get_posix_object(config=get_config([child_entry])))
+ mock_rmtree.side_effect = None
+ self.assertFalse(ptool.install(entry))
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py
new file mode 100644
index 000000000..94b74dd13
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py
@@ -0,0 +1,21 @@
+import os
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Permissions import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_permissions_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXPermissions(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXPermissions(unittest.TestCase):
+ # nothing to test!
+ pass
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py
new file mode 100644
index 000000000..a3ed9f68d
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py
@@ -0,0 +1,76 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Symlink import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_symlink_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXSymlink(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXSymlink(unittest.TestCase):
+ @patch("os.readlink")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_readlink):
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="/dest")
+ ptool = get_symlink_object()
+
+ mock_readlink.return_value = entry.get("to")
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_readlink.return_value = "/bogus"
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_readlink.side_effect = OSError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+
+ @patch("os.symlink")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Symlink.POSIXSymlink._exists")
+ def test_install(self, mock_exists, mock_install, mock_symlink):
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="/dest")
+ ptool = get_symlink_object()
+
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_symlink.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_symlink.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_symlink.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_symlink.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py
new file mode 100644
index 000000000..952bb02dd
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py
@@ -0,0 +1,238 @@
+import os
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+import Bcfg2.Client.Tools
+import Bcfg2.Client.Tools.POSIX
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_config(entries):
+ config = lxml.etree.Element("Configuration")
+ bundle = lxml.etree.SubElement(config, "Bundle", name="test")
+ bundle.extend(entries)
+ return config
+
+def get_posix_object(logger=None, setup=None, config=None):
+ if config is None:
+ config = lxml.etree.Element("Configuration")
+ if not logger:
+ def print_msg(msg):
+ print(msg)
+ logger = Mock()
+ logger.error = Mock(side_effect=print_msg)
+ logger.warning = Mock(side_effect=print_msg)
+ logger.info = Mock(side_effect=print_msg)
+ logger.debug = Mock(side_effect=print_msg)
+ if not setup:
+ setup = MagicMock()
+ return Bcfg2.Client.Tools.POSIX.POSIX(logger, setup, config)
+
+
+class TestPOSIX(unittest.TestCase):
+ def test__init(self):
+ entries = [lxml.etree.Element("Path", name="test", type="file")]
+ p = get_posix_object(config=get_config(entries))
+ self.assertIsInstance(p, Bcfg2.Client.Tools.Tool)
+ self.assertIsInstance(p, Bcfg2.Client.Tools.POSIX.POSIX)
+ self.assertIn('Path', p.__req__)
+ self.assertGreater(len(p.__req__['Path']), 0)
+ self.assertGreater(len(p.__handles__), 0)
+ self.assertItemsEqual(p.handled, entries)
+
+ @patch("Bcfg2.Client.Tools.Tool.canVerify")
+ def test_canVerify(self, mock_canVerify):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ p = get_posix_object()
+
+ # first, test superclass canVerify failure
+ mock_canVerify.return_value = False
+ self.assertFalse(p.canVerify(entry))
+ mock_canVerify.assert_called_with(p, entry)
+
+ # next, test fully_specified failure
+ p.logger.error.reset_mock()
+ mock_canVerify.reset_mock()
+ mock_canVerify.return_value = True
+ mock_fully_spec = Mock()
+ mock_fully_spec.return_value = False
+ p._handlers[entry.get("type")].fully_specified = mock_fully_spec
+ self.assertFalse(p.canVerify(entry))
+ mock_canVerify.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertTrue(p.logger.error.called)
+
+ # finally, test success
+ p.logger.error.reset_mock()
+ mock_canVerify.reset_mock()
+ mock_fully_spec.reset_mock()
+ mock_fully_spec.return_value = True
+ self.assertTrue(p.canVerify(entry))
+ mock_canVerify.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertFalse(p.logger.error.called)
+
+ @patch("Bcfg2.Client.Tools.Tool.canInstall")
+ def test_canInstall(self, mock_canInstall):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ p = get_posix_object()
+
+ # first, test superclass canInstall failure
+ mock_canInstall.return_value = False
+ self.assertFalse(p.canInstall(entry))
+ mock_canInstall.assert_called_with(p, entry)
+
+ # next, test fully_specified failure
+ p.logger.error.reset_mock()
+ mock_canInstall.reset_mock()
+ mock_canInstall.return_value = True
+ mock_fully_spec = Mock()
+ mock_fully_spec.return_value = False
+ p._handlers[entry.get("type")].fully_specified = mock_fully_spec
+ self.assertFalse(p.canInstall(entry))
+ mock_canInstall.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertTrue(p.logger.error.called)
+
+ # finally, test success
+ p.logger.error.reset_mock()
+ mock_canInstall.reset_mock()
+ mock_fully_spec.reset_mock()
+ mock_fully_spec.return_value = True
+ self.assertTrue(p.canInstall(entry))
+ mock_canInstall.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertFalse(p.logger.error.called)
+
+ def test_InstallPath(self):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ p = get_posix_object()
+
+ mock_install = Mock()
+ mock_install.return_value = True
+ p._handlers[entry.get("type")].install = mock_install
+ self.assertTrue(p.InstallPath(entry))
+ mock_install.assert_called_with(entry)
+
+ def test_VerifyPath(self):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ modlist = []
+ p = get_posix_object()
+
+ mock_verify = Mock()
+ mock_verify.return_value = True
+ p._handlers[entry.get("type")].verify = mock_verify
+ self.assertTrue(p.VerifyPath(entry, modlist))
+ mock_verify.assert_called_with(entry, modlist)
+
+ mock_verify.reset_mock()
+ mock_verify.return_value = False
+ p.setup.__getitem__.return_value = True
+ self.assertFalse(p.VerifyPath(entry, modlist))
+ self.assertIsNotNone(entry.get('qtext'))
+
+ @patch('os.remove')
+ def test_prune_old_backups(self, mock_remove):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ setup = dict(ppath='/', max_copies=5, paranoid=True)
+ posix = get_posix_object(setup=setup)
+
+ remove = ["_etc_foo_2012-07-20T04:13:22.364989",
+ "_etc_foo_2012-07-31T04:13:23.894958",
+ "_etc_foo_2012-07-17T04:13:22.493316",]
+ keep = ["_etc_foo_bar_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-04T04:13:22.519978",
+ "_etc_Foo_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-06T04:13:22.519978",
+ "_etc_foo_2012-08-03T04:13:22.191895",
+ "_etc_test_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-07T04:13:22.519978",]
+
+ with patch('os.listdir') as mock_listdir:
+ mock_listdir.side_effect = OSError
+ posix._prune_old_backups(entry)
+ self.assertTrue(posix.logger.error.called)
+ self.assertFalse(mock_remove.called)
+ mock_listdir.assert_called_with(setup['ppath'])
+
+ mock_listdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_listdir.side_effect = None
+ mock_listdir.return_value = keep + remove
+
+ posix._prune_old_backups(entry)
+ mock_listdir.assert_called_with(setup['ppath'])
+ self.assertItemsEqual(mock_remove.call_args_list,
+ [call(os.path.join(setup['ppath'], p))
+ for p in remove])
+
+ mock_listdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_remove.side_effect = OSError
+ posix.logger.error.reset_mock()
+ # test to ensure that we call os.remove() for all files that
+ # need to be removed even if we get an error
+ posix._prune_old_backups(entry)
+ mock_listdir.assert_called_with(setup['ppath'])
+ self.assertItemsEqual(mock_remove.call_args_list,
+ [call(os.path.join(setup['ppath'], p))
+ for p in remove])
+ self.assertTrue(posix.logger.error.called)
+
+ @patch("shutil.copy")
+ @patch("Bcfg2.Client.Tools.POSIX.POSIX._prune_old_backups")
+ def test_paranoid_backup(self, mock_prune, mock_copy):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ setup = dict(ppath='/', max_copies=5, paranoid=False)
+ posix = get_posix_object(setup=setup)
+
+ # paranoid false globally
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ # paranoid false on the entry
+ mock_prune.reset_mock()
+ setup['paranoid'] = True
+ posix = get_posix_object(setup=setup)
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ # entry does not exist on filesystem
+ mock_prune.reset_mock()
+ entry.set("paranoid", "true")
+ entry.set("current_exists", "false")
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ with patch("os.path.isdir") as mock_isdir:
+ # entry is a directory on the filesystem
+ mock_prune.reset_mock()
+ entry.set("current_exists", "true")
+ mock_isdir.return_value = True
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+ mock_isdir.assert_called_with(entry.get("name"))
+
+ # test the actual backup now
+ mock_prune.reset_mock()
+ mock_isdir.return_value = False
+ posix._paranoid_backup(entry)
+ mock_isdir.assert_called_with(entry.get("name"))
+ mock_prune.assert_called_with(entry)
+ # it's basically impossible to test the shutil.copy() call
+ # exactly because the destination includes microseconds,
+ # so we just test it good enough
+ self.assertEqual(mock_copy.call_args[0][0],
+ entry.get("name"))
+ bkupnam = os.path.join(setup['ppath'],
+ entry.get('name').replace('/', '_')) + '_'
+ self.assertEqual(bkupnam,
+ mock_copy.call_args[0][1][:len(bkupnam)])
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
new file mode 100644
index 000000000..68007ca8e
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
@@ -0,0 +1,966 @@
+import os
+import copy
+import stat
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+import Bcfg2.Client.Tools
+from Bcfg2.Client.Tools.POSIX.base import *
+from Test__init import get_posix_object
+
+try:
+ import selinux
+ has_selinux = True
+except ImportError:
+ has_selinux = False
+
+try:
+ import posix1e
+ has_acls = True
+except ImportError:
+ has_acls = False
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_posixtool_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXTool(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXTool(unittest.TestCase):
+ def test_fully_specified(self):
+ # fully_specified should do no checking on the abstract
+ # POSIXTool object
+ ptool = get_posixtool_object()
+ self.assertTrue(ptool.fully_specified(Mock()))
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._verify_metadata")
+ def test_verify(self, mock_verify):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_posixtool_object()
+ with patch('os.stat') as mock_stat, patch('os.walk') as mock_walk:
+ mock_stat.return_value = MagicMock()
+
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ entry.set("recursive", "true")
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+ self.assertTrue(ptool.verify(entry, []))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_verifies = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_verifies.extend([call(entry, path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_verify.call_args_list, all_verifies)
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_perms")
+ def test_install(self, mock_set_perms):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_posixtool_object()
+
+ mock_set_perms.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_set_perms.assert_called_with(entry)
+
+ mock_set_perms.reset_mock()
+ entry.set("recursive", "true")
+ with patch('os.walk') as mock_walk:
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+
+ mock_set_perms.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_set_perms = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_set_perms.extend([call(entry,
+ path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ all_set_perms)
+
+ mock_walk.reset_mock()
+ mock_set_perms.reset_mock()
+
+ def set_perms_rv(entry, path=None):
+ if path == '/dir2/file3':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+
+ self.assertFalse(ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ all_set_perms)
+
+ @patch("os.unlink")
+ @patch("shutil.rmtree")
+ def test_exists(self, mock_rmtree, mock_unlink):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ ptool = get_posixtool_object()
+ with patch('os.lstat') as mock_lstat, \
+ patch("os.path.isdir") as mock_isdir:
+ mock_lstat.side_effect = OSError
+ self.assertFalse(ptool._exists(entry))
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ rv = MagicMock()
+ mock_lstat.return_value = rv
+ mock_lstat.side_effect = None
+ self.assertEqual(ptool._exists(entry), rv)
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_isdir.return_value = False
+ self.assertFalse(ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_unlink.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_rmtree.called)
+
+ mock_lstat.reset_mock()
+ mock_isdir.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_isdir.return_value = True
+ self.assertFalse(ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_isdir.reset_mock()
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_rmtree.side_effect = OSError
+ self.assertEqual(ptool._exists(entry, remove=True), rv)
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ @patch("os.chown")
+ @patch("os.chmod")
+ @patch("os.utime")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_uid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_gid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_acls")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_secontext")
+ def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid,
+ mock_norm_uid, mock_utime, mock_chmod, mock_chown):
+ ptool = get_posixtool_object()
+
+ def reset():
+ mock_set_secontext.reset_mock()
+ mock_set_acls.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_chmod.reset_mock()
+ mock_chown.reset_mock()
+ mock_utime.reset_mock()
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar",
+ type="symlink")
+ mock_set_acls.return_value = True
+ mock_set_secontext.return_value = True
+ self.assertTrue(ptool._set_perms(entry))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner",
+ group="group", perms="644", type="file")
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ reset()
+ self.assertTrue(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ self.assertFalse(mock_utime.called)
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ mtime = 1344459042
+ entry.set("mtime", str(mtime))
+ self.assertTrue(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ self.assertTrue(ptool._set_perms(entry, path='/etc/bar'))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with('/etc/bar', 10, 100)
+ mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path='/etc/bar')
+ mock_set_acls.assert_called_with(entry, path='/etc/bar')
+
+ # test dev_type modification of perms, failure of chown
+ reset()
+ def chown_rv(path, owner, group):
+ if owner == 0 and group == 0:
+ return True
+ else:
+ raise KeyError
+ os.chown.side_effect = chown_rv
+ entry.set("type", "device")
+ entry.set("dev_type", device_map.keys()[0])
+ self.assertFalse(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 0, 0)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8) | device_map.values()[0])
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test failure of chmod
+ reset()
+ os.chown.side_effect = None
+ os.chmod.side_effect = OSError
+ entry.set("type", "file")
+ del entry.attrib["dev_type"]
+ self.assertFalse(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test that even when everything fails, we try to do it all.
+ # e.g., when chmod fails, we still try to apply acls, set
+ # selinux context, etc.
+ reset()
+ os.chown.side_effect = OSError
+ os.utime.side_effect = OSError
+ mock_set_acls.return_value = False
+ mock_set_secontext.return_value = False
+ self.assertFalse(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_uid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_gid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_entry_acls")
+ def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ ptool = get_posixtool_object()
+
+ # disable acls for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertTrue(ptool._set_acls(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_acls = True
+
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ acl.name = "keep"
+ file_acls.append(acl)
+ remove_acls = [a for a in file_acls if a.name == "remove"]
+
+ # build a set of ACLs listed on the entry as returned by
+ # _list_entry_acls()
+ entry_acls = {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+ mock_list_entry_acls.return_value = entry_acls
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ with patch("posix1e.ACL") as mock_ACL, \
+ patch("posix1e.Entry") as mock_Entry, \
+ patch("os.path.isdir") as mock_isdir:
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has three separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ acl_rv = MagicMock()
+ def mock_acl_rv(file=None, filedef=None, acl=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ elif acl:
+ return acl_rv
+
+ # set up the equally unreasonably complex return value for
+ # posix1e.Entry, which returns a new entry and adds it to
+ # an ACL, so we have to track the Mock objects it returns.
+ # why can't they just have an acl.add_entry() method?!?
+ acl_entries = []
+ def mock_entry_rv(acl):
+ rv = MagicMock()
+ rv.acl = acl
+ rv.permset = set()
+ acl_entries.append(rv)
+ return rv
+ mock_Entry.side_effect = mock_entry_rv
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_ACL.reset_mock()
+ mock_Entry.reset_mock()
+ fileacl_rv.reset_mock()
+
+ # test fs mounted noacl
+ mock_ACL.side_effect = IOError(95, "Operation not permitted")
+ self.assertFalse(ptool._set_acls(entry))
+
+ # test other error
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertFalse(ptool._set_acls(entry))
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ mock_isdir.return_value = True
+ self.assertTrue(ptool._set_acls(entry))
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=entry.get("name")),
+ call(filedef=entry.get("name"))])
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ self.assertItemsEqual(filedef_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_uid.assert_called_with("user")
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_ACCESS)
+ filedef_rv.calc_mask.assert_any_call()
+ filedef_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_DEFAULT)
+
+ # build tuples of the Entry objects that were added to acl
+ # and defaacl so they're easier to compare for equality
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(filedef_rv, posix1e.ACL_USER, 10, 7),
+ (fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ reset()
+ # have to reassign these because they're iterators, and
+ # they've already been iterated over once
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ mock_list_entry_acls.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_isdir.return_value = False
+ acl_entries = []
+ self.assertTrue(ptool._set_acls(entry, path="/bin/bar"))
+ mock_ACL.assert_called_with(file="/bin/bar")
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with("/bin/bar",
+ posix1e.ACL_TYPE_ACCESS)
+
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ @unittest.skipUnless(has_selinux, "SELinux not found, skipping")
+ def test_set_secontext(self):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ ptool = get_posixtool_object()
+
+ # disable selinux for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertTrue(ptool._set_secontext(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = True
+
+ with patch("selinux.restorecon") as mock_restorecon, \
+ patch("selinux.lsetfilecon") as mock_lsetfilecon:
+ # no context given
+ self.assertTrue(ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ entry.set("secontext", "__default__")
+ self.assertTrue(ptool._set_secontext(entry))
+ mock_restorecon.assert_called_with(entry.get("name"))
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 0
+ entry.set("secontext", "foo_t")
+ self.assertTrue(ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 1
+ self.assertFalse(ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ @patch("grp.getgrnam")
+ def test_norm_gid(self, mock_getgrnam):
+ ptool = get_posixtool_object()
+ self.assertEqual(5, ptool._norm_gid("5"))
+ self.assertFalse(mock_getgrnam.called)
+
+ mock_getgrnam.reset_mock()
+ mock_getgrnam.return_value = ("group", "x", 5, [])
+ self.assertEqual(5, ptool._norm_gid("group"))
+ mock_getgrnam.assert_called_with("group")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_gid")
+ def test_norm_entry_gid(self, mock_norm_gid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ ptool = get_posixtool_object()
+ mock_norm_gid.return_value = 10
+ self.assertEqual(10, ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ mock_norm_gid.reset_mock()
+ mock_norm_gid.side_effect = KeyError
+ self.assertEqual(0, ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ @patch("pwd.getpwnam")
+ def test_norm_uid(self, mock_getpwnam):
+ ptool = get_posixtool_object()
+ self.assertEqual(5, ptool._norm_uid("5"))
+ self.assertFalse(mock_getpwnam.called)
+
+ mock_getpwnam.reset_mock()
+ mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user",
+ "/bin/zsh")
+ self.assertEqual(5, ptool._norm_uid("user"))
+ mock_getpwnam.assert_called_with("user")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_uid")
+ def test_norm_entry_uid(self, mock_norm_uid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ ptool = get_posixtool_object()
+ mock_norm_uid.return_value = 10
+ self.assertEqual(10, ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ mock_norm_uid.reset_mock()
+ mock_norm_uid.side_effect = KeyError
+ self.assertEqual(0, ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ def test_norm_acl_perms(self):
+ ptool = get_posixtool_object()
+ # there's basically no reasonably way to test the Permset
+ # object parsing feature without writing our own Mock object
+ # that re-implements Permset.test(). silly pylibacl won't let
+ # us create standalone Entry or Permset objects.
+ ptool = get_posixtool_object()
+ self.assertEqual(5, ptool._norm_acl_perms("5"))
+ self.assertEqual(0, ptool._norm_acl_perms("55"))
+ self.assertEqual(5, ptool._norm_acl_perms("rx"))
+ self.assertEqual(5, ptool._norm_acl_perms("r-x"))
+ self.assertEqual(6, ptool._norm_acl_perms("wr-"))
+ self.assertEqual(0, ptool._norm_acl_perms("rwrw"))
+ self.assertEqual(0, ptool._norm_acl_perms("-"))
+ self.assertEqual(0, ptool._norm_acl_perms("a"))
+ self.assertEqual(6, ptool._norm_acl_perms("rwa"))
+ self.assertEqual(4, ptool._norm_acl_perms("rr"))
+
+ def test__gather_data(self):
+ path = '/test'
+ ptool = get_posixtool_object()
+
+ # have to use context manager version of patch here because
+ # os.stat must be unpatched when we instantiate the object to
+ # make pkgutil.walk_packages() work
+ with patch('os.stat') as mock_stat:
+ mock_stat.side_effect = OSError
+ self.assertFalse(ptool._gather_data(path)[0])
+ mock_stat.assert_called_with(path)
+
+ mock_stat.reset_mock()
+ mock_stat.side_effect = None
+ # create a return value
+ stat_rv = MagicMock()
+ def stat_getitem(key):
+ if int(key) == stat.ST_UID:
+ return 0
+ elif int(key) == stat.ST_GID:
+ return 10
+ elif int(key) == stat.ST_MODE:
+ # return extra bits in the mode to emulate a device
+ # and ensure that they're stripped
+ return int('060660', 8)
+ stat_rv.__getitem__ = Mock(side_effect=stat_getitem)
+ mock_stat.return_value = stat_rv
+
+ # disable selinux and acls for this call -- we test them
+ # separately so that we can skip those tests as appropriate
+ states = (Bcfg2.Client.Tools.POSIX.base.has_selinux,
+ Bcfg2.Client.Tools.POSIX.base.has_acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(ptool._gather_data(path),
+ (stat_rv, '0', '10', '0660', None, None))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux, \
+ Bcfg2.Client.Tools.POSIX.base.has_acls = states
+ mock_stat.assert_called_with(path)
+
+ @unittest.skipUnless(has_selinux, "SELinux not found, skipping")
+ def test__gather_data_selinux(self):
+ context = 'system_u:object_r:root_t:s0'
+ path = '/test'
+ ptool = get_posixtool_object()
+ with patch("selinux.getfilecon") as mock_getfilecon, \
+ patch('os.stat') as mock_stat:
+ mock_getfilecon.return_value = [len(context) + 1, context]
+ mock_stat.return_value = MagicMock()
+ # disable acls for this call and test them separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_acls
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(ptool._gather_data(path)[4], 'root_t')
+ Bcfg2.Client.Tools.POSIX.base.has_acls = state
+ mock_getfilecon.assert_called_with(path)
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_file_acls")
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ def test__gather_data_acls(self, mock_list_file_acls):
+ acls = {("default", posix1e.ACL_USER, "testuser"): "rwx",
+ ("access", posix1e.ACL_GROUP, "testgroup"): "rx"}
+ mock_list_file_acls.return_value = acls
+ path = '/test'
+ ptool = get_posixtool_object()
+ with patch('os.stat') as mock_stat:
+ mock_stat.return_value = MagicMock()
+ # disable selinux for this call and test it separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_selinux
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertItemsEqual(ptool._gather_data(path)[5], acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = state
+ mock_list_file_acls.assert_called_with(path)
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._verify_acls")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._gather_data")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_uid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_gid")
+ def test_verify_metadata(self, mock_norm_gid, mock_norm_uid,
+ mock_gather_data, mock_verify_acls):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user", perms="664",
+ secontext='etc_t')
+ # _verify_metadata() mutates the entry, so we keep a backup so we
+ # can start fresh every time
+ orig_entry = copy.deepcopy(entry)
+
+ ptool = get_posixtool_object()
+
+ mock_gather_data.return_value = (False, None, None, None, None, None)
+ self.assertFalse(ptool._verify_metadata(entry))
+ self.assertEqual(entry.get("current_exists", "").lower(), "false")
+ mock_gather_data.assert_called_with(entry.get("name"))
+
+ # expected data. tuple of attr, return value index, value
+ expected = [('current_owner', 1, '0'),
+ ('current_group', 2, '10'),
+ ('current_perms', 3, '0664'),
+ ('current_secontext', 4, 'etc_t')]
+ mock_norm_uid.return_value = 0
+ mock_norm_gid.return_value = 10
+ gather_data_rv = [MagicMock(), None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+
+ entry = copy.deepcopy(orig_entry)
+ mock_gather_data.reset_mock()
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ self.assertTrue(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+
+ mtime = 1344430414
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ stat_rv = MagicMock()
+ stat_rv.__getitem__.return_value = mtime
+ gather_data_rv[0] = stat_rv
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure modes for each checked datum. tuple of changed attr,
+ # return value index, new (failing) value
+ failures = [('current_owner', 1, '10'),
+ ('current_group', 2, '100'),
+ ('current_perms', 3, '0660')]
+ if has_selinux:
+ failures.append(('current_secontext', 4, 'root_t'))
+
+ for fail_attr, fail_idx, fail_val in failures:
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ gather_data_rv[fail_idx] = fail_val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ self.assertEqual(entry.get(fail_attr), fail_val)
+ for attr, idx, val in expected:
+ if attr != fail_attr:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure mode for mtime
+ fail_mtime = 1344431162
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ fail_stat_rv = MagicMock()
+ fail_stat_rv.__getitem__.return_value = fail_mtime
+ gather_data_rv = [fail_stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(fail_mtime))
+
+ if has_selinux:
+ # test success and failure for __default__ secontext
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ with patch("selinux.matchpathcon") as mock_matchpathcon:
+ context1 = "system_u:object_r:etc_t:s0"
+ context2 = "system_u:object_r:root_t:s0"
+ mock_matchpathcon.return_value = [1 + len(context1),
+ context1]
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_matchpathcon.return_value = [1 + len(context2),
+ context2]
+ self.assertFalse(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ def test_list_entry_acls(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ lxml.etree.SubElement(entry, "ACL", scope="user", type="default",
+ user="user", perms="rwx")
+ lxml.etree.SubElement(entry, "ACL", scope="group", type="access",
+ group="group", perms="5")
+ ptool = get_posixtool_object()
+ self.assertItemsEqual(ptool._list_entry_acls(entry),
+ {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5})
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("pwd.getpwuid")
+ @patch("grp.getgrgid")
+ def test_list_file_acls(self, mock_getgrgid, mock_getpwuid):
+ path = '/test'
+ ptool = get_posixtool_object()
+ with patch("posix1e.ACL") as mock_ACL, \
+ patch("os.path.isdir") as mock_isdir:
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.qualifier = 10
+ # yes, this is a bogus permset. thanks to _norm_acl_perms
+ # it works and is easier than many of the alternatives.
+ acl.permset = 'rwx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.qualifier = 100
+ acl.permset = 'rx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ file_acls.append(acl)
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has two separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ def mock_acl_rv(file=None, filedef=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ # other return values
+ mock_isdir.return_value = False
+ mock_getgrgid.return_value = ("group", "x", 5, [])
+ mock_getpwuid.return_value = ("user", "x", 5, 5, "User",
+ "/home/user", "/bin/zsh")
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_getgrgid.reset_mock()
+ mock_getpwuid.reset_mock()
+ mock_ACL.reset_mock()
+
+ mock_ACL.side_effect = IOError(95, "Operation not supported")
+ self.assertItemsEqual(ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertItemsEqual(ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ self.assertItemsEqual(ptool._list_file_acls(path), acls)
+ mock_isdir.assert_called_with(path)
+ mock_getgrgid.assert_called_with(100)
+ mock_getpwuid.assert_called_with(10)
+ mock_ACL.assert_called_with(file=path)
+
+ reset()
+ mock_isdir.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+
+ defacls = acls
+ for akey, perms in acls.items():
+ defacls[('default', akey[1], akey[2])] = perms
+ self.assertItemsEqual(ptool._list_file_acls(path), defacls)
+ mock_isdir.assert_called_with(path)
+ self.assertItemsEqual(mock_getgrgid.call_args_list,
+ [call(100), call(100)])
+ self.assertItemsEqual(mock_getpwuid.call_args_list,
+ [call(10), call(10)])
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=path), call(filedef=path)])
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_file_acls")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_entry_acls")
+ def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_posixtool_object()
+ # we can't test to make sure that errors get properly sorted
+ # into (missing, extra, wrong) without refactoring the
+ # _verify_acls code, and I don't feel like doing that, so eff
+ # it. let's just test to make sure that failures are
+ # identified at all for now.
+
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("default", posix1e.ACL_GROUP, "group"): 5}
+ extra_acls = copy.deepcopy(acls)
+ extra_acls[("access", posix1e.ACL_USER, "user2")] = 4
+
+ mock_list_entry_acls.return_value = acls
+ mock_list_file_acls.return_value = acls
+ self.assertTrue(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test missing
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_file_acls.return_value = extra_acls
+ self.assertFalse(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test extra
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = acls
+ self.assertFalse(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test wrong
+ wrong_acls = copy.deepcopy(extra_acls)
+ wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = wrong_acls
+ self.assertFalse(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ @patch("os.makedirs")
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_perms")
+ def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs):
+ entry = lxml.etree.Element("Path", name="/test/foo/bar",
+ type="directory")
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+
+ ptool = get_posixtool_object()
+ mock_set_perms.return_value = True
+ def path_exists_rv(path):
+ if path == "/test":
+ return True
+ else:
+ return False
+ mock_exists.side_effect = path_exists_rv
+ self.assertTrue(ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_makedirs.side_effect = OSError
+ self.assertFalse(ptool._makedirs(entry))
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+
+ reset()
+ mock_makedirs.side_effect = None
+ def set_perms_rv(entry, path=None):
+ if path == '/test/foo':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+ self.assertFalse(ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))