summaryrefslogtreecommitdiffstats
path: root/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py')
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py318
1 files changed, 318 insertions, 0 deletions
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
new file mode 100644
index 000000000..a2cd52dd5
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
@@ -0,0 +1,318 @@
+import os
+import copy
+import binascii
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.File import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_file_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXFile(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXFile(unittest.TestCase):
+ def test_fully_specified(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_file_object()
+ self.assertFalse(ptool.fully_specified(entry))
+
+ entry.set("empty", "true")
+ self.assertTrue(ptool.fully_specified(entry))
+
+ entry.set("empty", "false")
+ entry.text = "text"
+ self.assertTrue(ptool.fully_specified(entry))
+
+ def test_is_string(self):
+ ptool = get_file_object()
+ for char in range(8) + range(14, 32):
+ self.assertFalse(ptool._is_string("foo" + chr(char) + "bar",
+ 'utf_8'))
+ for char in range(9, 14) + range(33, 128):
+ self.assertTrue(ptool._is_string("foo" + chr(char) + "bar",
+ 'utf_8'))
+ self.assertFalse(ptool._is_string("foo" + chr(128) + "bar",
+ 'ascii'))
+ ustr = '\xef\xa3\x91 + \xef\xa3\x92'
+ self.assertTrue(ptool._is_string(ustr, 'utf_8'))
+ self.assertFalse(ptool._is_string(ustr, 'ascii'))
+
+ def test_get_data(self):
+ orig_entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(encoding="ascii", ppath='/', max_copies=5)
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = binascii.b2a_base64("test")
+ entry.set("encoding", "base64")
+ self.assertEqual(ptool._get_data(entry), ("test", True))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.set("empty", "true")
+ self.assertEqual(ptool._get_data(entry), ("", False))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = "test"
+ self.assertEqual(ptool._get_data(entry), ("test", False))
+
+ ustr = u'\uf8d1 + \uf8d2'
+ entry = copy.deepcopy(orig_entry)
+ entry.text = ustr
+ self.assertEqual(ptool._get_data(entry), (ustr, False))
+
+ setup['encoding'] = "utf_8"
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+ entry = copy.deepcopy(orig_entry)
+ entry.text = ustr
+ self.assertEqual(ptool._get_data(entry),
+ ('\xef\xa3\x91 + \xef\xa3\x92', False))
+
+ @patch("__builtin__.open")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_diffs")
+ def test_verify(self, mock_get_diffs, mock_get_data, mock_exists,
+ mock_verify, mock_open):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(interactive=False, ppath='/', max_copies=5)
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+
+ def reset():
+ mock_get_diffs.reset_mock()
+ mock_get_data.reset_mock()
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_open.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_exists.return_value = False
+ mock_verify.return_value = True
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=False,
+ content="")
+
+ reset()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = 5
+ mock_exists.return_value = exists_rv
+ mock_get_data.return_value = ("test", True)
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=True,
+ content=None)
+
+ reset()
+ mock_get_data.return_value = ("test", False)
+ exists_rv.__getitem__.return_value = 4
+ entry.set("sensitive", "true")
+ open_rv = Mock()
+ open_rv.read.return_value = "tart"
+ mock_open.return_value = open_rv
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ open_rv.assert_any_call()
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=True,
+ is_binary=False,
+ content="tart")
+
+ reset()
+ open_rv.read.return_value = "test"
+ mock_open.return_value = open_rv
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ open_rv.assert_any_call()
+ self.assertFalse(mock_get_diffs.called)
+
+ reset()
+ mock_open.side_effect = IOError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_open.assert_called_with(entry.get("name"))
+
+ @patch("os.fdopen")
+ @patch("tempfile.mkstemp")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data")
+ def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+ newfile = "/foo/bar"
+
+ def reset():
+ mock_get_data.reset_mock()
+ mock_mkstemp.reset_mock()
+ mock_fdopen.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_mkstemp.return_value = (5, newfile)
+ self.assertEqual(ptool._write_tmpfile(entry), newfile)
+ mock_get_data.assert_called_with(entry)
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_fdopen.assert_called_with(5, 'w')
+ mock_fdopen.return_value.write.assert_called_with("test")
+
+ reset()
+ mock_mkstemp.side_effect = OSError
+ self.assertFalse(ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+
+ reset()
+ mock_mkstemp.side_effect = None
+ mock_fdopen.side_effect = OSError
+ self.assertFalse(ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_get_data.assert_called_with(entry)
+ mock_fdopen.assert_called_with(5, 'w')
+
+ @patch("os.rename")
+ @patch("os.unlink")
+ def test_rename_tmpfile(self, mock_unlink, mock_rename):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+ newfile = "/foo/bar"
+
+ self.assertTrue(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rename.side_effect = OSError
+ self.assertFalse(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ # even if the unlink fails, return false gracefully
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_unlink.side_effect = OSError
+ self.assertFalse(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._makedirs")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._set_perms")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._write_tmpfile")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._rename_tmpfile")
+ def test_install(self, mock_rename, mock_write, mock_set_perms,
+ mock_makedirs, mock_install, mock_exists):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+
+ def reset():
+ mock_rename.reset_mock()
+ mock_write.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+ mock_install.reset_mock()
+ mock_exists.reset_mock()
+
+ mock_exists.return_value = False
+ mock_makedirs.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+
+ reset()
+ mock_makedirs.return_value = True
+ mock_write.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+
+ reset()
+ newfile = '/test.X987yS'
+ mock_write.return_value = newfile
+ mock_set_perms.return_value = False
+ mock_rename.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+
+ reset()
+ mock_rename.return_value = True
+ mock_install.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_install.return_value = True
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_set_perms.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_exists.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ self.assertFalse(mock_makedirs.called)
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ def test_diff(self):
+ ptool = get_file_object()
+ content1 = "line1\nline2"
+ content2 = "line3"
+ rv = ["line1", "line2", "line3"]
+ func = Mock()
+ func.return_value = rv
+ self.assertItemsEqual(ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])
+
+ func.reset_mock()
+ def slow_diff(content1, content2):
+ for i in range(1, 10):
+ time.sleep(5)
+ yield "line%s" % i
+ func.side_effect = slow_diff
+ self.assertFalse(ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])