From b862090945322d5ba4b42e180bba92afb860df21 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 15 Aug 2012 09:06:43 -0400 Subject: POSIX: refactored POSIX tool into multiple files to make it more manageable Added unit tests for POSIX tool and sub-tools fixed ACL handling for filesystems mounted noacl --- .../TestClient/TestTools/TestPOSIX/TestFile.py | 318 +++++++++++++++++++++ 1 file changed, 318 insertions(+) create mode 100644 testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py (limited to 'testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py') diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py new file mode 100644 index 000000000..a2cd52dd5 --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py @@ -0,0 +1,318 @@ +import os +import copy +import binascii +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.File import * +from Test__init import get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_file_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXFile(posix.logger, posix.setup, posix.config) + +class TestPOSIXFile(unittest.TestCase): + def test_fully_specified(self): + entry = lxml.etree.Element("Path", name="/test", type="file") + ptool = get_file_object() + self.assertFalse(ptool.fully_specified(entry)) + + entry.set("empty", "true") + self.assertTrue(ptool.fully_specified(entry)) + + entry.set("empty", "false") + entry.text = "text" + self.assertTrue(ptool.fully_specified(entry)) + + def test_is_string(self): + ptool = get_file_object() + for char in range(8) + range(14, 32): + self.assertFalse(ptool._is_string("foo" + chr(char) + "bar", + 'utf_8')) + for char in range(9, 14) + range(33, 128): + self.assertTrue(ptool._is_string("foo" + chr(char) + "bar", + 'utf_8')) + self.assertFalse(ptool._is_string("foo" + chr(128) + "bar", + 'ascii')) + ustr = '\xef\xa3\x91 + \xef\xa3\x92' + self.assertTrue(ptool._is_string(ustr, 'utf_8')) + self.assertFalse(ptool._is_string(ustr, 'ascii')) + + def test_get_data(self): + orig_entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(encoding="ascii", ppath='/', max_copies=5) + ptool = get_file_object(posix=get_posix_object(setup=setup)) + + entry = copy.deepcopy(orig_entry) + entry.text = binascii.b2a_base64("test") + entry.set("encoding", "base64") + self.assertEqual(ptool._get_data(entry), ("test", True)) + + entry = copy.deepcopy(orig_entry) + entry.set("empty", "true") + self.assertEqual(ptool._get_data(entry), ("", False)) + + entry = copy.deepcopy(orig_entry) + entry.text = "test" + self.assertEqual(ptool._get_data(entry), ("test", False)) + + ustr = u'\uf8d1 + \uf8d2' + entry = copy.deepcopy(orig_entry) + entry.text = ustr + self.assertEqual(ptool._get_data(entry), (ustr, False)) + + setup['encoding'] = "utf_8" + ptool = get_file_object(posix=get_posix_object(setup=setup)) + entry = copy.deepcopy(orig_entry) + entry.text = ustr + self.assertEqual(ptool._get_data(entry), + ('\xef\xa3\x91 + \xef\xa3\x92', False)) + + @patch("__builtin__.open") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._exists") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_diffs") + def test_verify(self, mock_get_diffs, mock_get_data, mock_exists, + mock_verify, mock_open): + entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(interactive=False, ppath='/', max_copies=5) + ptool = get_file_object(posix=get_posix_object(setup=setup)) + + def reset(): + mock_get_diffs.reset_mock() + mock_get_data.reset_mock() + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_open.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_exists.return_value = False + mock_verify.return_value = True + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=False, + content="") + + reset() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = 5 + mock_exists.return_value = exists_rv + mock_get_data.return_value = ("test", True) + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=True, + content=None) + + reset() + mock_get_data.return_value = ("test", False) + exists_rv.__getitem__.return_value = 4 + entry.set("sensitive", "true") + open_rv = Mock() + open_rv.read.return_value = "tart" + mock_open.return_value = open_rv + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + open_rv.assert_any_call() + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=True, + is_binary=False, + content="tart") + + reset() + open_rv.read.return_value = "test" + mock_open.return_value = open_rv + self.assertTrue(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + open_rv.assert_any_call() + self.assertFalse(mock_get_diffs.called) + + reset() + mock_open.side_effect = IOError + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_open.assert_called_with(entry.get("name")) + + @patch("os.fdopen") + @patch("tempfile.mkstemp") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data") + def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + ptool = get_file_object() + newfile = "/foo/bar" + + def reset(): + mock_get_data.reset_mock() + mock_mkstemp.reset_mock() + mock_fdopen.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_mkstemp.return_value = (5, newfile) + self.assertEqual(ptool._write_tmpfile(entry), newfile) + mock_get_data.assert_called_with(entry) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_fdopen.assert_called_with(5, 'w') + mock_fdopen.return_value.write.assert_called_with("test") + + reset() + mock_mkstemp.side_effect = OSError + self.assertFalse(ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + + reset() + mock_mkstemp.side_effect = None + mock_fdopen.side_effect = OSError + self.assertFalse(ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_get_data.assert_called_with(entry) + mock_fdopen.assert_called_with(5, 'w') + + @patch("os.rename") + @patch("os.unlink") + def test_rename_tmpfile(self, mock_unlink, mock_rename): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + ptool = get_file_object() + newfile = "/foo/bar" + + self.assertTrue(ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_rename.side_effect = OSError + self.assertFalse(ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + # even if the unlink fails, return false gracefully + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_unlink.side_effect = OSError + self.assertFalse(ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + @patch("os.path.exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._makedirs") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._set_perms") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._write_tmpfile") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._rename_tmpfile") + def test_install(self, mock_rename, mock_write, mock_set_perms, + mock_makedirs, mock_install, mock_exists): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + ptool = get_file_object() + + def reset(): + mock_rename.reset_mock() + mock_write.reset_mock() + mock_set_perms.reset_mock() + mock_makedirs.reset_mock() + mock_install.reset_mock() + mock_exists.reset_mock() + + mock_exists.return_value = False + mock_makedirs.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + + reset() + mock_makedirs.return_value = True + mock_write.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + + reset() + newfile = '/test.X987yS' + mock_write.return_value = newfile + mock_set_perms.return_value = False + mock_rename.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + + reset() + mock_rename.return_value = True + mock_install.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + reset() + mock_install.return_value = True + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + reset() + mock_set_perms.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + reset() + mock_exists.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with("/") + self.assertFalse(mock_makedirs.called) + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + def test_diff(self): + ptool = get_file_object() + content1 = "line1\nline2" + content2 = "line3" + rv = ["line1", "line2", "line3"] + func = Mock() + func.return_value = rv + self.assertItemsEqual(ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) + + func.reset_mock() + def slow_diff(content1, content2): + for i in range(1, 10): + time.sleep(5) + yield "line%s" % i + func.side_effect = slow_diff + self.assertFalse(ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) -- cgit v1.2.3-1-g7c22