From d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 4 Sep 2012 09:52:57 -0400 Subject: reorganized testsuite to allow tests on stuff outside of src --- .../TestClient/TestTools/TestPOSIX/TestFile.py | 448 +++++++++++++++++++++ 1 file changed, 448 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py (limited to 'testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py') diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py new file mode 100644 index 000000000..cdf11ce5e --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py @@ -0,0 +1,448 @@ +# -*- coding: utf-8 -*- +import os +import sys +import copy +import difflib +import binascii +import lxml.etree +from Bcfg2.Compat import b64encode, b64decode, u_str +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.File import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from Test__init import get_posix_object +from Testbase import TestPOSIXTool +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore + +def get_file_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXFile(posix.logger, posix.setup, posix.config) + +class TestPOSIXFile(TestPOSIXTool): + test_obj = POSIXFile + + def test_fully_specified(self): + entry = lxml.etree.Element("Path", name="/test", type="file") + self.assertFalse(self.ptool.fully_specified(entry)) + + entry.set("empty", "true") + self.assertTrue(self.ptool.fully_specified(entry)) + + entry.set("empty", "false") + entry.text = "text" + self.assertTrue(self.ptool.fully_specified(entry)) + + def test_is_string(self): + for char in list(range(8)) + list(range(14, 32)): + self.assertFalse(self.ptool._is_string("foo" + chr(char) + "bar", + 'UTF-8')) + for char in list(range(9, 14)) + list(range(33, 128)): + self.assertTrue(self.ptool._is_string("foo" + chr(char) + "bar", + 'UTF-8')) + ustr = 'é' + self.assertTrue(self.ptool._is_string(ustr, 'UTF-8')) + if not inPy3k: + self.assertFalse(self.ptool._is_string("foo" + chr(128) + "bar", + 'ascii')) + self.assertFalse(self.ptool._is_string(ustr, 'ascii')) + + def test_get_data(self): + orig_entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(encoding="ascii", ppath='/', max_copies=5) + ptool = self.get_obj(posix=get_posix_object(setup=setup)) + + entry = copy.deepcopy(orig_entry) + entry.text = b64encode("test") + entry.set("encoding", "base64") + self.assertEqual(ptool._get_data(entry), ("test", True)) + + entry = copy.deepcopy(orig_entry) + entry.set("empty", "true") + self.assertEqual(ptool._get_data(entry), ("", False)) + + entry = copy.deepcopy(orig_entry) + entry.text = "test" + self.assertEqual(ptool._get_data(entry), ("test", False)) + + if inPy3k: + ustr = 'é' + else: + ustr = u_str('é', 'UTF-8') + entry = copy.deepcopy(orig_entry) + entry.text = ustr + self.assertEqual(ptool._get_data(entry), (ustr, False)) + + @patch("%s.open" % builtins) + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + @patch("Bcfg2.Client.Tools.POSIX.File.%s._exists" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_diffs" % test_obj.__name__) + def test_verify(self, mock_get_diffs, mock_get_data, mock_exists, + mock_verify, mock_open): + entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(interactive=False, ppath='/', max_copies=5) + ptool = self.get_obj(posix=get_posix_object(setup=setup)) + + def reset(): + mock_get_diffs.reset_mock() + mock_get_data.reset_mock() + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_open.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_exists.return_value = False + mock_verify.return_value = True + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=False, + content="") + + reset() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = 5 + mock_exists.return_value = exists_rv + mock_get_data.return_value = ("test", True) + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=True, + content=None) + + reset() + mock_get_data.return_value = ("test", False) + exists_rv.__getitem__.return_value = 4 + entry.set("sensitive", "true") + mock_open.return_value.read.return_value = "tart" + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_called_with() + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=True, + is_binary=False, + content="tart") + + reset() + mock_open.return_value.read.return_value = "test" + self.assertTrue(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_called_with() + self.assertFalse(mock_get_diffs.called) + + reset() + mock_open.side_effect = IOError + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_open.assert_called_with(entry.get("name")) + + @patch("os.fdopen") + @patch("tempfile.mkstemp") + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__) + def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + newfile = "/foo/bar" + + def reset(): + mock_get_data.reset_mock() + mock_mkstemp.reset_mock() + mock_fdopen.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_mkstemp.return_value = (5, newfile) + self.assertEqual(self.ptool._write_tmpfile(entry), newfile) + mock_get_data.assert_called_with(entry) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_fdopen.assert_called_with(5, 'w') + mock_fdopen.return_value.write.assert_called_with("test") + + reset() + mock_mkstemp.side_effect = OSError + self.assertFalse(self.ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + + reset() + mock_mkstemp.side_effect = None + mock_fdopen.side_effect = OSError + self.assertFalse(self.ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_get_data.assert_called_with(entry) + mock_fdopen.assert_called_with(5, 'w') + + @patch("os.rename") + @patch("os.unlink") + def test_rename_tmpfile(self, mock_unlink, mock_rename): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + newfile = "/foo/bar" + + self.assertTrue(self.ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_rename.side_effect = OSError + self.assertFalse(self.ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + # even if the unlink fails, return false gracefully + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_unlink.side_effect = OSError + self.assertFalse(self.ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + @patch("%s.open" % builtins) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._diff" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._is_string" % test_obj.__name__) + def test__get_diffs(self, mock_is_string, mock_get_data, mock_diff, + mock_open): + orig_entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', + group='root') + orig_entry.text = "test" + ondisk = "test2" + setup = dict(encoding="utf-8", ppath='/', max_copies=5) + ptool = self.get_obj(posix=get_posix_object(setup=setup)) + + def reset(): + mock_is_string.reset_mock() + mock_get_data.reset_mock() + mock_diff.reset_mock() + mock_open.reset_mock() + return copy.deepcopy(orig_entry) + + mock_is_string.return_value = True + mock_get_data.return_value = (orig_entry.text, False) + mock_open.return_value.read.return_value = ondisk + mock_diff.return_value = ["-test2", "+test"] + + # binary data in the entry + entry = reset() + ptool._get_diffs(entry, is_binary=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + self.assertFalse(mock_diff.called) + self.assertEqual(entry.get("current_bfile"), b64encode(ondisk)) + + # binary data on disk + entry = reset() + mock_is_string.return_value = False + ptool._get_diffs(entry, content=ondisk) + self.assertFalse(mock_open.called) + self.assertFalse(mock_diff.called) + self.assertEqual(entry.get("current_bfile"), b64encode(ondisk)) + + # sensitive, non-interactive -- do nothing + entry = reset() + mock_is_string.return_value = True + ptool._get_diffs(entry, sensitive=True, interactive=False) + self.assertFalse(mock_open.called) + self.assertFalse(mock_diff.called) + self.assertXMLEqual(entry, orig_entry) + + # sensitive, interactive + entry = reset() + ptool._get_diffs(entry, sensitive=True, interactive=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + mock_diff.assert_called_with(ondisk, entry.text, difflib.unified_diff, + filename=entry.get("name")) + self.assertIsNotNone(entry.get("qtext")) + del entry.attrib['qtext'] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + # non-sensitive, non-interactive + entry = reset() + ptool._get_diffs(entry, content=ondisk) + self.assertFalse(mock_open.called) + mock_diff.assert_called_with(ondisk, entry.text, difflib.ndiff, + filename=entry.get("name")) + self.assertIsNone(entry.get("qtext")) + self.assertEqual(entry.get("current_bdiff"), + b64encode("\n".join(mock_diff.return_value))) + del entry.attrib["current_bdiff"] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + # non-sensitive, interactive -- do everything. also test + # appending to qtext + entry = reset() + entry.set("qtext", "test") + ptool._get_diffs(entry, interactive=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + self.assertItemsEqual(mock_diff.call_args_list, + [call(ondisk, entry.text, difflib.unified_diff, + filename=entry.get("name")), + call(ondisk, entry.text, difflib.ndiff, + filename=entry.get("name"))]) + self.assertIsNotNone(entry.get("qtext")) + self.assertTrue(entry.get("qtext").startswith("test\n")) + self.assertEqual(entry.get("current_bdiff"), + b64encode("\n".join(mock_diff.return_value))) + del entry.attrib['qtext'] + del entry.attrib["current_bdiff"] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + # non-sensitive, interactive with unicode data + entry = reset() + entry.text = u("tëst") + encoded = entry.text.encode(setup['encoding']) + mock_diff.return_value = ["-test2", "+tëst"] + mock_get_data.return_value = (encoded, False) + ptool._get_diffs(entry, interactive=True) + mock_open.assert_called_with(entry.get("name")) + mock_open.return_value.read.assert_any_call() + self.assertItemsEqual(mock_diff.call_args_list, + [call(ondisk, encoded, difflib.unified_diff, + filename=entry.get("name")), + call(ondisk, encoded, difflib.ndiff, + filename=entry.get("name"))]) + self.assertIsNotNone(entry.get("qtext")) + self.assertEqual(entry.get("current_bdiff"), + b64encode("\n".join(mock_diff.return_value))) + del entry.attrib['qtext'] + del entry.attrib["current_bdiff"] + self.assertItemsEqual(orig_entry.attrib, entry.attrib) + + @patch("os.path.exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.File.%s._makedirs" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._set_perms" % test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._write_tmpfile" % + test_obj.__name__) + @patch("Bcfg2.Client.Tools.POSIX.File.%s._rename_tmpfile" % + test_obj.__name__) + def test_install(self, mock_rename, mock_write, mock_set_perms, + mock_makedirs, mock_install, mock_exists): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + + def reset(): + mock_rename.reset_mock() + mock_write.reset_mock() + mock_set_perms.reset_mock() + mock_makedirs.reset_mock() + mock_install.reset_mock() + mock_exists.reset_mock() + + mock_exists.return_value = False + mock_makedirs.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + + reset() + mock_makedirs.return_value = True + mock_write.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + + reset() + newfile = '/test.X987yS' + mock_write.return_value = newfile + mock_set_perms.return_value = False + mock_rename.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + + reset() + mock_rename.return_value = True + mock_install.return_value = False + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + mock_install.return_value = True + self.assertFalse(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + mock_set_perms.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + reset() + mock_exists.return_value = True + self.assertTrue(self.ptool.install(entry)) + mock_exists.assert_called_with("/") + self.assertFalse(mock_makedirs.called) + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(self.ptool, entry) + + @patch("time.time") + def test_diff(self, mock_time): + content1 = "line1\nline2" + content2 = "line3" + + self.now = 1345640723 + def time_rv(): + self.now += 1 + return self.now + mock_time.side_effect = time_rv + + rv = ["line1", "line2", "line3"] + func = Mock() + func.return_value = rv + self.assertItemsEqual(self.ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) + + func.reset_mock() + mock_time.reset_mock() + def time_rv(): + self.now += 5 + return self.now + mock_time.side_effect = time_rv + + def slow_diff(content1, content2): + for i in range(1, 10): + yield "line%s" % i + func.side_effect = slow_diff + self.assertFalse(self.ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) -- cgit v1.2.3-1-g7c22