summaryrefslogtreecommitdiffstats
path: root/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-04 09:52:57 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-04 09:52:57 -0400
commitd9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 (patch)
treecf59b7bcef389ca76f09c7f804db9d893b918e3b /testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
parent6697481f7bed646b4c66c54c9a46d11aa075af4a (diff)
downloadbcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.gz
bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.bz2
bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.zip
reorganized testsuite to allow tests on stuff outside of src
Diffstat (limited to 'testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py')
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py448
1 files changed, 0 insertions, 448 deletions
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
deleted file mode 100644
index cdf11ce5e..000000000
--- a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
+++ /dev/null
@@ -1,448 +0,0 @@
-# -*- coding: utf-8 -*-
-import os
-import sys
-import copy
-import difflib
-import binascii
-import lxml.etree
-from Bcfg2.Compat import b64encode, b64decode, u_str
-from mock import Mock, MagicMock, patch
-from Bcfg2.Client.Tools.POSIX.File import *
-
-# add all parent testsuite directories to sys.path to allow (most)
-# relative imports in python 2.4
-path = os.path.dirname(__file__)
-while path != "/":
- if os.path.basename(path).lower().startswith("test"):
- sys.path.append(path)
- if os.path.basename(path) == "testsuite":
- break
- path = os.path.dirname(path)
-from Test__init import get_posix_object
-from Testbase import TestPOSIXTool
-from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
- skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
- patchIf, datastore
-
-def get_file_object(posix=None):
- if posix is None:
- posix = get_posix_object()
- return POSIXFile(posix.logger, posix.setup, posix.config)
-
-class TestPOSIXFile(TestPOSIXTool):
- test_obj = POSIXFile
-
- def test_fully_specified(self):
- entry = lxml.etree.Element("Path", name="/test", type="file")
- self.assertFalse(self.ptool.fully_specified(entry))
-
- entry.set("empty", "true")
- self.assertTrue(self.ptool.fully_specified(entry))
-
- entry.set("empty", "false")
- entry.text = "text"
- self.assertTrue(self.ptool.fully_specified(entry))
-
- def test_is_string(self):
- for char in list(range(8)) + list(range(14, 32)):
- self.assertFalse(self.ptool._is_string("foo" + chr(char) + "bar",
- 'UTF-8'))
- for char in list(range(9, 14)) + list(range(33, 128)):
- self.assertTrue(self.ptool._is_string("foo" + chr(char) + "bar",
- 'UTF-8'))
- ustr = 'é'
- self.assertTrue(self.ptool._is_string(ustr, 'UTF-8'))
- if not inPy3k:
- self.assertFalse(self.ptool._is_string("foo" + chr(128) + "bar",
- 'ascii'))
- self.assertFalse(self.ptool._is_string(ustr, 'ascii'))
-
- def test_get_data(self):
- orig_entry = lxml.etree.Element("Path", name="/test", type="file")
- setup = dict(encoding="ascii", ppath='/', max_copies=5)
- ptool = self.get_obj(posix=get_posix_object(setup=setup))
-
- entry = copy.deepcopy(orig_entry)
- entry.text = b64encode("test")
- entry.set("encoding", "base64")
- self.assertEqual(ptool._get_data(entry), ("test", True))
-
- entry = copy.deepcopy(orig_entry)
- entry.set("empty", "true")
- self.assertEqual(ptool._get_data(entry), ("", False))
-
- entry = copy.deepcopy(orig_entry)
- entry.text = "test"
- self.assertEqual(ptool._get_data(entry), ("test", False))
-
- if inPy3k:
- ustr = 'é'
- else:
- ustr = u_str('é', 'UTF-8')
- entry = copy.deepcopy(orig_entry)
- entry.text = ustr
- self.assertEqual(ptool._get_data(entry), (ustr, False))
-
- @patch("%s.open" % builtins)
- @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._exists" % test_obj.__name__)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_diffs" % test_obj.__name__)
- def test_verify(self, mock_get_diffs, mock_get_data, mock_exists,
- mock_verify, mock_open):
- entry = lxml.etree.Element("Path", name="/test", type="file")
- setup = dict(interactive=False, ppath='/', max_copies=5)
- ptool = self.get_obj(posix=get_posix_object(setup=setup))
-
- def reset():
- mock_get_diffs.reset_mock()
- mock_get_data.reset_mock()
- mock_exists.reset_mock()
- mock_verify.reset_mock()
- mock_open.reset_mock()
-
- mock_get_data.return_value = ("test", False)
- mock_exists.return_value = False
- mock_verify.return_value = True
- self.assertFalse(ptool.verify(entry, []))
- mock_exists.assert_called_with(entry)
- mock_verify.assert_called_with(ptool, entry, [])
- mock_get_diffs.assert_called_with(entry, interactive=False,
- sensitive=False,
- is_binary=False,
- content="")
-
- reset()
- exists_rv = MagicMock()
- exists_rv.__getitem__.return_value = 5
- mock_exists.return_value = exists_rv
- mock_get_data.return_value = ("test", True)
- self.assertFalse(ptool.verify(entry, []))
- mock_exists.assert_called_with(entry)
- mock_verify.assert_called_with(ptool, entry, [])
- mock_get_diffs.assert_called_with(entry, interactive=False,
- sensitive=False,
- is_binary=True,
- content=None)
-
- reset()
- mock_get_data.return_value = ("test", False)
- exists_rv.__getitem__.return_value = 4
- entry.set("sensitive", "true")
- mock_open.return_value.read.return_value = "tart"
- self.assertFalse(ptool.verify(entry, []))
- mock_exists.assert_called_with(entry)
- mock_verify.assert_called_with(ptool, entry, [])
- mock_open.assert_called_with(entry.get("name"))
- mock_open.return_value.read.assert_called_with()
- mock_get_diffs.assert_called_with(entry, interactive=False,
- sensitive=True,
- is_binary=False,
- content="tart")
-
- reset()
- mock_open.return_value.read.return_value = "test"
- self.assertTrue(ptool.verify(entry, []))
- mock_exists.assert_called_with(entry)
- mock_verify.assert_called_with(ptool, entry, [])
- mock_open.assert_called_with(entry.get("name"))
- mock_open.return_value.read.assert_called_with()
- self.assertFalse(mock_get_diffs.called)
-
- reset()
- mock_open.side_effect = IOError
- self.assertFalse(ptool.verify(entry, []))
- mock_exists.assert_called_with(entry)
- mock_open.assert_called_with(entry.get("name"))
-
- @patch("os.fdopen")
- @patch("tempfile.mkstemp")
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__)
- def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen):
- entry = lxml.etree.Element("Path", name="/test", type="file",
- perms='0644', owner='root', group='root')
- newfile = "/foo/bar"
-
- def reset():
- mock_get_data.reset_mock()
- mock_mkstemp.reset_mock()
- mock_fdopen.reset_mock()
-
- mock_get_data.return_value = ("test", False)
- mock_mkstemp.return_value = (5, newfile)
- self.assertEqual(self.ptool._write_tmpfile(entry), newfile)
- mock_get_data.assert_called_with(entry)
- mock_mkstemp.assert_called_with(prefix='test', dir='/')
- mock_fdopen.assert_called_with(5, 'w')
- mock_fdopen.return_value.write.assert_called_with("test")
-
- reset()
- mock_mkstemp.side_effect = OSError
- self.assertFalse(self.ptool._write_tmpfile(entry))
- mock_mkstemp.assert_called_with(prefix='test', dir='/')
-
- reset()
- mock_mkstemp.side_effect = None
- mock_fdopen.side_effect = OSError
- self.assertFalse(self.ptool._write_tmpfile(entry))
- mock_mkstemp.assert_called_with(prefix='test', dir='/')
- mock_get_data.assert_called_with(entry)
- mock_fdopen.assert_called_with(5, 'w')
-
- @patch("os.rename")
- @patch("os.unlink")
- def test_rename_tmpfile(self, mock_unlink, mock_rename):
- entry = lxml.etree.Element("Path", name="/test", type="file",
- perms='0644', owner='root', group='root')
- newfile = "/foo/bar"
-
- self.assertTrue(self.ptool._rename_tmpfile(newfile, entry))
- mock_rename.assert_called_with(newfile, entry.get("name"))
-
- mock_rename.reset_mock()
- mock_unlink.reset_mock()
- mock_rename.side_effect = OSError
- self.assertFalse(self.ptool._rename_tmpfile(newfile, entry))
- mock_rename.assert_called_with(newfile, entry.get("name"))
- mock_unlink.assert_called_with(newfile)
-
- # even if the unlink fails, return false gracefully
- mock_rename.reset_mock()
- mock_unlink.reset_mock()
- mock_unlink.side_effect = OSError
- self.assertFalse(self.ptool._rename_tmpfile(newfile, entry))
- mock_rename.assert_called_with(newfile, entry.get("name"))
- mock_unlink.assert_called_with(newfile)
-
- @patch("%s.open" % builtins)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._diff" % test_obj.__name__)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._is_string" % test_obj.__name__)
- def test__get_diffs(self, mock_is_string, mock_get_data, mock_diff,
- mock_open):
- orig_entry = lxml.etree.Element("Path", name="/test", type="file",
- perms='0644', owner='root',
- group='root')
- orig_entry.text = "test"
- ondisk = "test2"
- setup = dict(encoding="utf-8", ppath='/', max_copies=5)
- ptool = self.get_obj(posix=get_posix_object(setup=setup))
-
- def reset():
- mock_is_string.reset_mock()
- mock_get_data.reset_mock()
- mock_diff.reset_mock()
- mock_open.reset_mock()
- return copy.deepcopy(orig_entry)
-
- mock_is_string.return_value = True
- mock_get_data.return_value = (orig_entry.text, False)
- mock_open.return_value.read.return_value = ondisk
- mock_diff.return_value = ["-test2", "+test"]
-
- # binary data in the entry
- entry = reset()
- ptool._get_diffs(entry, is_binary=True)
- mock_open.assert_called_with(entry.get("name"))
- mock_open.return_value.read.assert_any_call()
- self.assertFalse(mock_diff.called)
- self.assertEqual(entry.get("current_bfile"), b64encode(ondisk))
-
- # binary data on disk
- entry = reset()
- mock_is_string.return_value = False
- ptool._get_diffs(entry, content=ondisk)
- self.assertFalse(mock_open.called)
- self.assertFalse(mock_diff.called)
- self.assertEqual(entry.get("current_bfile"), b64encode(ondisk))
-
- # sensitive, non-interactive -- do nothing
- entry = reset()
- mock_is_string.return_value = True
- ptool._get_diffs(entry, sensitive=True, interactive=False)
- self.assertFalse(mock_open.called)
- self.assertFalse(mock_diff.called)
- self.assertXMLEqual(entry, orig_entry)
-
- # sensitive, interactive
- entry = reset()
- ptool._get_diffs(entry, sensitive=True, interactive=True)
- mock_open.assert_called_with(entry.get("name"))
- mock_open.return_value.read.assert_any_call()
- mock_diff.assert_called_with(ondisk, entry.text, difflib.unified_diff,
- filename=entry.get("name"))
- self.assertIsNotNone(entry.get("qtext"))
- del entry.attrib['qtext']
- self.assertItemsEqual(orig_entry.attrib, entry.attrib)
-
- # non-sensitive, non-interactive
- entry = reset()
- ptool._get_diffs(entry, content=ondisk)
- self.assertFalse(mock_open.called)
- mock_diff.assert_called_with(ondisk, entry.text, difflib.ndiff,
- filename=entry.get("name"))
- self.assertIsNone(entry.get("qtext"))
- self.assertEqual(entry.get("current_bdiff"),
- b64encode("\n".join(mock_diff.return_value)))
- del entry.attrib["current_bdiff"]
- self.assertItemsEqual(orig_entry.attrib, entry.attrib)
-
- # non-sensitive, interactive -- do everything. also test
- # appending to qtext
- entry = reset()
- entry.set("qtext", "test")
- ptool._get_diffs(entry, interactive=True)
- mock_open.assert_called_with(entry.get("name"))
- mock_open.return_value.read.assert_any_call()
- self.assertItemsEqual(mock_diff.call_args_list,
- [call(ondisk, entry.text, difflib.unified_diff,
- filename=entry.get("name")),
- call(ondisk, entry.text, difflib.ndiff,
- filename=entry.get("name"))])
- self.assertIsNotNone(entry.get("qtext"))
- self.assertTrue(entry.get("qtext").startswith("test\n"))
- self.assertEqual(entry.get("current_bdiff"),
- b64encode("\n".join(mock_diff.return_value)))
- del entry.attrib['qtext']
- del entry.attrib["current_bdiff"]
- self.assertItemsEqual(orig_entry.attrib, entry.attrib)
-
- # non-sensitive, interactive with unicode data
- entry = reset()
- entry.text = u("tëst")
- encoded = entry.text.encode(setup['encoding'])
- mock_diff.return_value = ["-test2", "+tëst"]
- mock_get_data.return_value = (encoded, False)
- ptool._get_diffs(entry, interactive=True)
- mock_open.assert_called_with(entry.get("name"))
- mock_open.return_value.read.assert_any_call()
- self.assertItemsEqual(mock_diff.call_args_list,
- [call(ondisk, encoded, difflib.unified_diff,
- filename=entry.get("name")),
- call(ondisk, encoded, difflib.ndiff,
- filename=entry.get("name"))])
- self.assertIsNotNone(entry.get("qtext"))
- self.assertEqual(entry.get("current_bdiff"),
- b64encode("\n".join(mock_diff.return_value)))
- del entry.attrib['qtext']
- del entry.attrib["current_bdiff"]
- self.assertItemsEqual(orig_entry.attrib, entry.attrib)
-
- @patch("os.path.exists")
- @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._makedirs" % test_obj.__name__)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._set_perms" % test_obj.__name__)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._write_tmpfile" %
- test_obj.__name__)
- @patch("Bcfg2.Client.Tools.POSIX.File.%s._rename_tmpfile" %
- test_obj.__name__)
- def test_install(self, mock_rename, mock_write, mock_set_perms,
- mock_makedirs, mock_install, mock_exists):
- entry = lxml.etree.Element("Path", name="/test", type="file",
- perms='0644', owner='root', group='root')
-
- def reset():
- mock_rename.reset_mock()
- mock_write.reset_mock()
- mock_set_perms.reset_mock()
- mock_makedirs.reset_mock()
- mock_install.reset_mock()
- mock_exists.reset_mock()
-
- mock_exists.return_value = False
- mock_makedirs.return_value = False
- self.assertFalse(self.ptool.install(entry))
- mock_exists.assert_called_with("/")
- mock_makedirs.assert_called_with(entry, path="/")
-
- reset()
- mock_makedirs.return_value = True
- mock_write.return_value = False
- self.assertFalse(self.ptool.install(entry))
- mock_exists.assert_called_with("/")
- mock_makedirs.assert_called_with(entry, path="/")
- mock_write.assert_called_with(entry)
-
- reset()
- newfile = '/test.X987yS'
- mock_write.return_value = newfile
- mock_set_perms.return_value = False
- mock_rename.return_value = False
- self.assertFalse(self.ptool.install(entry))
- mock_exists.assert_called_with("/")
- mock_makedirs.assert_called_with(entry, path="/")
- mock_write.assert_called_with(entry)
- mock_set_perms.assert_called_with(entry, path=newfile)
- mock_rename.assert_called_with(newfile, entry)
-
- reset()
- mock_rename.return_value = True
- mock_install.return_value = False
- self.assertFalse(self.ptool.install(entry))
- mock_exists.assert_called_with("/")
- mock_makedirs.assert_called_with(entry, path="/")
- mock_write.assert_called_with(entry)
- mock_set_perms.assert_called_with(entry, path=newfile)
- mock_rename.assert_called_with(newfile, entry)
- mock_install.assert_called_with(self.ptool, entry)
-
- reset()
- mock_install.return_value = True
- self.assertFalse(self.ptool.install(entry))
- mock_exists.assert_called_with("/")
- mock_makedirs.assert_called_with(entry, path="/")
- mock_write.assert_called_with(entry)
- mock_set_perms.assert_called_with(entry, path=newfile)
- mock_rename.assert_called_with(newfile, entry)
- mock_install.assert_called_with(self.ptool, entry)
-
- reset()
- mock_set_perms.return_value = True
- self.assertTrue(self.ptool.install(entry))
- mock_exists.assert_called_with("/")
- mock_makedirs.assert_called_with(entry, path="/")
- mock_write.assert_called_with(entry)
- mock_set_perms.assert_called_with(entry, path=newfile)
- mock_rename.assert_called_with(newfile, entry)
- mock_install.assert_called_with(self.ptool, entry)
-
- reset()
- mock_exists.return_value = True
- self.assertTrue(self.ptool.install(entry))
- mock_exists.assert_called_with("/")
- self.assertFalse(mock_makedirs.called)
- mock_write.assert_called_with(entry)
- mock_set_perms.assert_called_with(entry, path=newfile)
- mock_rename.assert_called_with(newfile, entry)
- mock_install.assert_called_with(self.ptool, entry)
-
- @patch("time.time")
- def test_diff(self, mock_time):
- content1 = "line1\nline2"
- content2 = "line3"
-
- self.now = 1345640723
- def time_rv():
- self.now += 1
- return self.now
- mock_time.side_effect = time_rv
-
- rv = ["line1", "line2", "line3"]
- func = Mock()
- func.return_value = rv
- self.assertItemsEqual(self.ptool._diff(content1, content2, func), rv)
- func.assert_called_with(["line1", "line2"], ["line3"])
-
- func.reset_mock()
- mock_time.reset_mock()
- def time_rv():
- self.now += 5
- return self.now
- mock_time.side_effect = time_rv
-
- def slow_diff(content1, content2):
- for i in range(1, 10):
- yield "line%s" % i
- func.side_effect = slow_diff
- self.assertFalse(self.ptool._diff(content1, content2, func), rv)
- func.assert_called_with(["line1", "line2"], ["line3"])