summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-04 09:52:57 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-04 09:52:57 -0400
commitd9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 (patch)
treecf59b7bcef389ca76f09c7f804db9d893b918e3b /testsuite/Testsrc
parent6697481f7bed646b4c66c54c9a46d11aa075af4a (diff)
downloadbcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.gz
bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.bz2
bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.zip
reorganized testsuite to allow tests on stuff outside of src
Diffstat (limited to 'testsuite/Testsrc')
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py144
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py159
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py448
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py85
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py91
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py5
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py99
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py252
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py991
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/__init__.py0
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/TestTools/__init__.py0
-rw-r--r--testsuite/Testsrc/Testlib/TestClient/__init__.py0
-rw-r--r--testsuite/Testsrc/Testlib/TestOptions.py238
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin.py2296
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py1470
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py549
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py109
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py120
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/__init__.py0
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/__init__.py0
-rw-r--r--testsuite/Testsrc/Testlib/__init__.py0
21 files changed, 7056 insertions, 0 deletions
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py
new file mode 100644
index 000000000..a18327fe0
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py
@@ -0,0 +1,144 @@
+import os
+import sys
+import copy
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Device import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_posix_object
+from Testbase import TestPOSIXTool
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+class TestPOSIXDevice(TestPOSIXTool):
+ test_obj = POSIXDevice
+
+ def test_fully_specified(self):
+ ptool = self.get_obj()
+ orig_entry = lxml.etree.Element("Path", name="/test", type="device",
+ dev_type="fifo")
+ self.assertTrue(ptool.fully_specified(orig_entry))
+ for dtype in ["block", "char"]:
+ for attr in ["major", "minor"]:
+ entry = copy.deepcopy(orig_entry)
+ entry.set("dev_type", dtype)
+ entry.set(attr, "0")
+ self.assertFalse(ptool.fully_specified(entry))
+ entry = copy.deepcopy(orig_entry)
+ entry.set("dev_type", dtype)
+ entry.set("major", "0")
+ entry.set("minor", "0")
+ self.assertTrue(ptool.fully_specified(entry))
+
+ @patch("os.major")
+ @patch("os.minor")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_exists, mock_minor, mock_major):
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ mode='0644', owner='root', group='root',
+ dev_type="block", major="0", minor="10")
+ ptool = self.get_obj()
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_minor.reset_mock()
+ mock_major.reset_mock()
+
+ mock_exists.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ reset()
+ mock_exists.return_value = MagicMock()
+ mock_major.return_value = 0
+ mock_minor.return_value = 10
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_exists.assert_called_with(entry)
+ mock_major.assert_called_with(mock_exists.return_value.st_rdev)
+ mock_minor.assert_called_with(mock_exists.return_value.st_rdev)
+
+ reset()
+ mock_exists.return_value = MagicMock()
+ mock_major.return_value = 0
+ mock_minor.return_value = 10
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_exists.assert_called_with(entry)
+ mock_major.assert_called_with(mock_exists.return_value.st_rdev)
+ mock_minor.assert_called_with(mock_exists.return_value.st_rdev)
+
+ reset()
+ mock_verify.return_value = True
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ mode='0644', owner='root', group='root',
+ dev_type="fifo")
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ self.assertFalse(mock_major.called)
+ self.assertFalse(mock_minor.called)
+
+ @patch("os.makedev")
+ @patch("os.mknod")
+ @patch("Bcfg2.Client.Tools.POSIX.Device.%s._exists" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ def test_install(self, mock_install, mock_exists, mock_mknod, mock_makedev):
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ mode='0644', owner='root', group='root',
+ dev_type="block", major="0", minor="10")
+ ptool = self.get_obj()
+
+ mock_exists.return_value = False
+ mock_makedev.return_value = Mock()
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_makedev.assert_called_with(0, 10)
+ mock_mknod.assert_called_with(entry.get("name"), # 0o644
+ device_map[entry.get("dev_type")] | 420,
+ mock_makedev.return_value)
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_makedev.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_mknod.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_mknod.side_effect = None
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ mode='0644', owner='root', group='root',
+ dev_type="fifo")
+
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_mknod.assert_called_with(entry.get("name"), # 0o644
+ device_map[entry.get("dev_type")] | 420)
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py
new file mode 100644
index 000000000..e01bd7453
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py
@@ -0,0 +1,159 @@
+import os
+import sys
+import stat
+import copy
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Directory import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_posix_object
+from Testbase import TestPOSIXTool
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+class TestPOSIXDirectory(TestPOSIXTool):
+ test_obj = POSIXDirectory
+
+ @patch("os.listdir")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.%s._exists" % test_obj.__name__)
+ def test_verify(self, mock_exists, mock_verify, mock_listdir):
+ entry = lxml.etree.Element("Path", name="/test", type="directory",
+ perms='0644', owner='root', group='root')
+
+ mock_exists.return_value = False
+ self.assertFalse(self.ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ mock_exists.reset_mock()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = stat.S_IFREG | 420 # 0o644
+ mock_exists.return_value = exists_rv
+ self.assertFalse(self.ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ mock_exists.reset_mock()
+ mock_verify.return_value = False
+ exists_rv.__getitem__.return_value = stat.S_IFDIR | 420 # 0o644
+ self.assertFalse(self.ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(self.ptool, entry, [])
+
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(self.ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(self.ptool, entry, [])
+
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ entry.set("prune", "true")
+ orig_entry = copy.deepcopy(entry)
+
+ entries = ["foo", "bar", "bar/baz"]
+ mock_listdir.return_value = entries
+ modlist = [os.path.join(entry.get("name"), entries[0])]
+ self.assertFalse(self.ptool.verify(entry, modlist))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(self.ptool, entry, modlist)
+ mock_listdir.assert_called_with(entry.get("name"))
+ expected = [os.path.join(entry.get("name"), e)
+ for e in entries
+ if os.path.join(entry.get("name"), e) not in modlist]
+ actual = [e.get("path") for e in entry.findall("Prune")]
+ self.assertItemsEqual(expected, actual)
+
+ mock_verify.reset_mock()
+ mock_exists.reset_mock()
+ mock_listdir.reset_mock()
+ entry = copy.deepcopy(orig_entry)
+ modlist = [os.path.join(entry.get("name"), e)
+ for e in entries]
+ self.assertTrue(self.ptool.verify(entry, modlist))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(self.ptool, entry, modlist)
+ mock_listdir.assert_called_with(entry.get("name"))
+ self.assertEqual(len(entry.findall("Prune")), 0)
+
+ @patch("os.unlink")
+ @patch("os.path.isdir")
+ @patch("shutil.rmtree")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.%s._exists" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.%s._makedirs" %
+ test_obj.__name__)
+ def test_install(self, mock_makedirs, mock_exists, mock_install,
+ mock_rmtree, mock_isdir, mock_unlink):
+ entry = lxml.etree.Element("Path", name="/test/foo/bar",
+ type="directory", perms='0644',
+ owner='root', group='root')
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_rmtree.mock_makedirs()
+
+ mock_makedirs.return_value = True
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(self.ptool, entry)
+ mock_makedirs.assert_called_with(entry)
+
+ reset()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = stat.S_IFREG | 420 # 0o644
+ mock_exists.return_value = exists_rv
+ self.assertTrue(self.ptool.install(entry))
+ mock_unlink.assert_called_with(entry.get("name"))
+ mock_exists.assert_called_with(entry)
+ mock_makedirs.assert_called_with(entry)
+ mock_install.assert_called_with(self.ptool, entry)
+
+ reset()
+ exists_rv.__getitem__.return_value = stat.S_IFDIR | 420 # 0o644
+ mock_install.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(self.ptool, entry)
+
+ reset()
+ mock_install.return_value = False
+ self.assertFalse(self.ptool.install(entry))
+ mock_install.assert_called_with(self.ptool, entry)
+
+ entry.set("prune", "true")
+ prune = ["/test/foo/bar/prune1", "/test/foo/bar/prune2"]
+ for path in prune:
+ lxml.etree.SubElement(entry, "Prune", path=path)
+
+ reset()
+ mock_install.return_value = True
+
+ def isdir_rv(path):
+ if path.endswith("prune2"):
+ return True
+ else:
+ return False
+ mock_isdir.side_effect = isdir_rv
+ self.assertTrue(self.ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(self.ptool, entry)
+ self.assertItemsEqual(mock_isdir.call_args_list,
+ [call(p) for p in prune])
+ mock_unlink.assert_called_with("/test/foo/bar/prune1")
+ mock_rmtree.assert_called_with("/test/foo/bar/prune2")
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
new file mode 100644
index 000000000..cdf11ce5e
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
@@ -0,0 +1,448 @@
+# -*- coding: utf-8 -*-
+import os
+import sys
+import copy
+import difflib
+import binascii
+import lxml.etree
+from Bcfg2.Compat import b64encode, b64decode, u_str
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.File import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_posix_object
+from Testbase import TestPOSIXTool
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+def get_file_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXFile(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXFile(TestPOSIXTool):
+ test_obj = POSIXFile
+
+ def test_fully_specified(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ self.assertFalse(self.ptool.fully_specified(entry))
+
+ entry.set("empty", "true")
+ self.assertTrue(self.ptool.fully_specified(entry))
+
+ entry.set("empty", "false")
+ entry.text = "text"
+ self.assertTrue(self.ptool.fully_specified(entry))
+
+ def test_is_string(self):
+ for char in list(range(8)) + list(range(14, 32)):
+ self.assertFalse(self.ptool._is_string("foo" + chr(char) + "bar",
+ 'UTF-8'))
+ for char in list(range(9, 14)) + list(range(33, 128)):
+ self.assertTrue(self.ptool._is_string("foo" + chr(char) + "bar",
+ 'UTF-8'))
+ ustr = 'é'
+ self.assertTrue(self.ptool._is_string(ustr, 'UTF-8'))
+ if not inPy3k:
+ self.assertFalse(self.ptool._is_string("foo" + chr(128) + "bar",
+ 'ascii'))
+ self.assertFalse(self.ptool._is_string(ustr, 'ascii'))
+
+ def test_get_data(self):
+ orig_entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(encoding="ascii", ppath='/', max_copies=5)
+ ptool = self.get_obj(posix=get_posix_object(setup=setup))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = b64encode("test")
+ entry.set("encoding", "base64")
+ self.assertEqual(ptool._get_data(entry), ("test", True))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.set("empty", "true")
+ self.assertEqual(ptool._get_data(entry), ("", False))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = "test"
+ self.assertEqual(ptool._get_data(entry), ("test", False))
+
+ if inPy3k:
+ ustr = 'é'
+ else:
+ ustr = u_str('é', 'UTF-8')
+ entry = copy.deepcopy(orig_entry)
+ entry.text = ustr
+ self.assertEqual(ptool._get_data(entry), (ustr, False))
+
+ @patch("%s.open" % builtins)
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._exists" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_diffs" % test_obj.__name__)
+ def test_verify(self, mock_get_diffs, mock_get_data, mock_exists,
+ mock_verify, mock_open):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(interactive=False, ppath='/', max_copies=5)
+ ptool = self.get_obj(posix=get_posix_object(setup=setup))
+
+ def reset():
+ mock_get_diffs.reset_mock()
+ mock_get_data.reset_mock()
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_open.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_exists.return_value = False
+ mock_verify.return_value = True
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=False,
+ content="")
+
+ reset()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = 5
+ mock_exists.return_value = exists_rv
+ mock_get_data.return_value = ("test", True)
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=True,
+ content=None)
+
+ reset()
+ mock_get_data.return_value = ("test", False)
+ exists_rv.__getitem__.return_value = 4
+ entry.set("sensitive", "true")
+ mock_open.return_value.read.return_value = "tart"
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ mock_open.return_value.read.assert_called_with()
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=True,
+ is_binary=False,
+ content="tart")
+
+ reset()
+ mock_open.return_value.read.return_value = "test"
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ mock_open.return_value.read.assert_called_with()
+ self.assertFalse(mock_get_diffs.called)
+
+ reset()
+ mock_open.side_effect = IOError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_open.assert_called_with(entry.get("name"))
+
+ @patch("os.fdopen")
+ @patch("tempfile.mkstemp")
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__)
+ def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ newfile = "/foo/bar"
+
+ def reset():
+ mock_get_data.reset_mock()
+ mock_mkstemp.reset_mock()
+ mock_fdopen.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_mkstemp.return_value = (5, newfile)
+ self.assertEqual(self.ptool._write_tmpfile(entry), newfile)
+ mock_get_data.assert_called_with(entry)
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_fdopen.assert_called_with(5, 'w')
+ mock_fdopen.return_value.write.assert_called_with("test")
+
+ reset()
+ mock_mkstemp.side_effect = OSError
+ self.assertFalse(self.ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+
+ reset()
+ mock_mkstemp.side_effect = None
+ mock_fdopen.side_effect = OSError
+ self.assertFalse(self.ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_get_data.assert_called_with(entry)
+ mock_fdopen.assert_called_with(5, 'w')
+
+ @patch("os.rename")
+ @patch("os.unlink")
+ def test_rename_tmpfile(self, mock_unlink, mock_rename):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ newfile = "/foo/bar"
+
+ self.assertTrue(self.ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rename.side_effect = OSError
+ self.assertFalse(self.ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ # even if the unlink fails, return false gracefully
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_unlink.side_effect = OSError
+ self.assertFalse(self.ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ @patch("%s.open" % builtins)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._diff" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._get_data" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._is_string" % test_obj.__name__)
+ def test__get_diffs(self, mock_is_string, mock_get_data, mock_diff,
+ mock_open):
+ orig_entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root',
+ group='root')
+ orig_entry.text = "test"
+ ondisk = "test2"
+ setup = dict(encoding="utf-8", ppath='/', max_copies=5)
+ ptool = self.get_obj(posix=get_posix_object(setup=setup))
+
+ def reset():
+ mock_is_string.reset_mock()
+ mock_get_data.reset_mock()
+ mock_diff.reset_mock()
+ mock_open.reset_mock()
+ return copy.deepcopy(orig_entry)
+
+ mock_is_string.return_value = True
+ mock_get_data.return_value = (orig_entry.text, False)
+ mock_open.return_value.read.return_value = ondisk
+ mock_diff.return_value = ["-test2", "+test"]
+
+ # binary data in the entry
+ entry = reset()
+ ptool._get_diffs(entry, is_binary=True)
+ mock_open.assert_called_with(entry.get("name"))
+ mock_open.return_value.read.assert_any_call()
+ self.assertFalse(mock_diff.called)
+ self.assertEqual(entry.get("current_bfile"), b64encode(ondisk))
+
+ # binary data on disk
+ entry = reset()
+ mock_is_string.return_value = False
+ ptool._get_diffs(entry, content=ondisk)
+ self.assertFalse(mock_open.called)
+ self.assertFalse(mock_diff.called)
+ self.assertEqual(entry.get("current_bfile"), b64encode(ondisk))
+
+ # sensitive, non-interactive -- do nothing
+ entry = reset()
+ mock_is_string.return_value = True
+ ptool._get_diffs(entry, sensitive=True, interactive=False)
+ self.assertFalse(mock_open.called)
+ self.assertFalse(mock_diff.called)
+ self.assertXMLEqual(entry, orig_entry)
+
+ # sensitive, interactive
+ entry = reset()
+ ptool._get_diffs(entry, sensitive=True, interactive=True)
+ mock_open.assert_called_with(entry.get("name"))
+ mock_open.return_value.read.assert_any_call()
+ mock_diff.assert_called_with(ondisk, entry.text, difflib.unified_diff,
+ filename=entry.get("name"))
+ self.assertIsNotNone(entry.get("qtext"))
+ del entry.attrib['qtext']
+ self.assertItemsEqual(orig_entry.attrib, entry.attrib)
+
+ # non-sensitive, non-interactive
+ entry = reset()
+ ptool._get_diffs(entry, content=ondisk)
+ self.assertFalse(mock_open.called)
+ mock_diff.assert_called_with(ondisk, entry.text, difflib.ndiff,
+ filename=entry.get("name"))
+ self.assertIsNone(entry.get("qtext"))
+ self.assertEqual(entry.get("current_bdiff"),
+ b64encode("\n".join(mock_diff.return_value)))
+ del entry.attrib["current_bdiff"]
+ self.assertItemsEqual(orig_entry.attrib, entry.attrib)
+
+ # non-sensitive, interactive -- do everything. also test
+ # appending to qtext
+ entry = reset()
+ entry.set("qtext", "test")
+ ptool._get_diffs(entry, interactive=True)
+ mock_open.assert_called_with(entry.get("name"))
+ mock_open.return_value.read.assert_any_call()
+ self.assertItemsEqual(mock_diff.call_args_list,
+ [call(ondisk, entry.text, difflib.unified_diff,
+ filename=entry.get("name")),
+ call(ondisk, entry.text, difflib.ndiff,
+ filename=entry.get("name"))])
+ self.assertIsNotNone(entry.get("qtext"))
+ self.assertTrue(entry.get("qtext").startswith("test\n"))
+ self.assertEqual(entry.get("current_bdiff"),
+ b64encode("\n".join(mock_diff.return_value)))
+ del entry.attrib['qtext']
+ del entry.attrib["current_bdiff"]
+ self.assertItemsEqual(orig_entry.attrib, entry.attrib)
+
+ # non-sensitive, interactive with unicode data
+ entry = reset()
+ entry.text = u("tëst")
+ encoded = entry.text.encode(setup['encoding'])
+ mock_diff.return_value = ["-test2", "+tëst"]
+ mock_get_data.return_value = (encoded, False)
+ ptool._get_diffs(entry, interactive=True)
+ mock_open.assert_called_with(entry.get("name"))
+ mock_open.return_value.read.assert_any_call()
+ self.assertItemsEqual(mock_diff.call_args_list,
+ [call(ondisk, encoded, difflib.unified_diff,
+ filename=entry.get("name")),
+ call(ondisk, encoded, difflib.ndiff,
+ filename=entry.get("name"))])
+ self.assertIsNotNone(entry.get("qtext"))
+ self.assertEqual(entry.get("current_bdiff"),
+ b64encode("\n".join(mock_diff.return_value)))
+ del entry.attrib['qtext']
+ del entry.attrib["current_bdiff"]
+ self.assertItemsEqual(orig_entry.attrib, entry.attrib)
+
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._makedirs" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._set_perms" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._write_tmpfile" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.File.%s._rename_tmpfile" %
+ test_obj.__name__)
+ def test_install(self, mock_rename, mock_write, mock_set_perms,
+ mock_makedirs, mock_install, mock_exists):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+
+ def reset():
+ mock_rename.reset_mock()
+ mock_write.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+ mock_install.reset_mock()
+ mock_exists.reset_mock()
+
+ mock_exists.return_value = False
+ mock_makedirs.return_value = False
+ self.assertFalse(self.ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+
+ reset()
+ mock_makedirs.return_value = True
+ mock_write.return_value = False
+ self.assertFalse(self.ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+
+ reset()
+ newfile = '/test.X987yS'
+ mock_write.return_value = newfile
+ mock_set_perms.return_value = False
+ mock_rename.return_value = False
+ self.assertFalse(self.ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+
+ reset()
+ mock_rename.return_value = True
+ mock_install.return_value = False
+ self.assertFalse(self.ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(self.ptool, entry)
+
+ reset()
+ mock_install.return_value = True
+ self.assertFalse(self.ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(self.ptool, entry)
+
+ reset()
+ mock_set_perms.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(self.ptool, entry)
+
+ reset()
+ mock_exists.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ self.assertFalse(mock_makedirs.called)
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(self.ptool, entry)
+
+ @patch("time.time")
+ def test_diff(self, mock_time):
+ content1 = "line1\nline2"
+ content2 = "line3"
+
+ self.now = 1345640723
+ def time_rv():
+ self.now += 1
+ return self.now
+ mock_time.side_effect = time_rv
+
+ rv = ["line1", "line2", "line3"]
+ func = Mock()
+ func.return_value = rv
+ self.assertItemsEqual(self.ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])
+
+ func.reset_mock()
+ mock_time.reset_mock()
+ def time_rv():
+ self.now += 5
+ return self.now
+ mock_time.side_effect = time_rv
+
+ def slow_diff(content1, content2):
+ for i in range(1, 10):
+ yield "line%s" % i
+ func.side_effect = slow_diff
+ self.assertFalse(self.ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py
new file mode 100644
index 000000000..d68e15837
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py
@@ -0,0 +1,85 @@
+import os
+import sys
+import copy
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Hardlink import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_posix_object
+from Testbase import TestPOSIXTool
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+class TestPOSIXHardlink(TestPOSIXTool):
+ test_obj = POSIXHardlink
+
+ @patch("os.path.samefile")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_samefile):
+ entry = lxml.etree.Element("Path", name="/test", type="hardlink",
+ to="/dest")
+ ptool = self.get_obj()
+
+ mock_samefile.return_value = True
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_samefile.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_samefile.side_effect = OSError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+
+ @patch("os.link")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Hardlink.%s._exists" % test_obj.__name__)
+ def test_install(self, mock_exists, mock_install, mock_link):
+ entry = lxml.etree.Element("Path", name="/test", type="hardlink",
+ to="/dest")
+ ptool = self.get_obj()
+
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_link.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_link.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_link.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_link.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py
new file mode 100644
index 000000000..375ff00eb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py
@@ -0,0 +1,91 @@
+import os
+import sys
+import copy
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Nonexistent import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_config, get_posix_object
+from Testbase import TestPOSIXTool
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+class TestPOSIXNonexistent(TestPOSIXTool):
+ test_obj = POSIXNonexistent
+
+ @patch("os.path.lexists")
+ def test_verify(self, mock_lexists):
+ entry = lxml.etree.Element("Path", name="/test", type="nonexistent")
+
+ for val in [True, False]:
+ mock_lexists.reset_mock()
+ mock_lexists.return_value = val
+ self.assertEqual(self.ptool.verify(entry, []), not val)
+ mock_lexists.assert_called_with(entry.get("name"))
+
+ @patch("os.rmdir")
+ @patch("os.remove")
+ @patch("os.path.isdir")
+ @patch("shutil.rmtree")
+ def test_install(self, mock_rmtree, mock_isdir, mock_remove, mock_rmdir):
+ entry = lxml.etree.Element("Path", name="/test", type="nonexistent")
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_rmdir.reset_mock()
+ mock_rmtree.reset_mock()
+
+ mock_isdir.return_value = False
+ self.assertTrue(self.ptool.install(entry))
+ mock_remove.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_remove.side_effect = OSError
+ self.assertFalse(self.ptool.install(entry))
+ mock_remove.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_isdir.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_rmdir.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_rmdir.side_effect = OSError
+ self.assertFalse(self.ptool.install(entry))
+ mock_rmdir.assert_called_with(entry.get("name"))
+
+ reset()
+ entry.set("recursive", "true")
+ self.assertTrue(self.ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_rmtree.side_effect = OSError
+ self.assertFalse(self.ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ child_entry = lxml.etree.Element("Path", name="/test/foo",
+ type="nonexistent")
+ ptool = self.get_obj(posix=get_posix_object(config=get_config([child_entry])))
+ mock_rmtree.side_effect = None
+ self.assertTrue(ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ child_entry = lxml.etree.Element("Path", name="/test/foo",
+ type="file")
+ ptool = self.get_obj(posix=get_posix_object(config=get_config([child_entry])))
+ mock_rmtree.side_effect = None
+ self.assertFalse(ptool.install(entry))
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py
new file mode 100644
index 000000000..565857437
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py
@@ -0,0 +1,5 @@
+from Bcfg2.Client.Tools.POSIX.Permissions import *
+from Testbase import TestPOSIXTool
+
+class TestPOSIXPermissions(TestPOSIXTool):
+ test_obj = POSIXPermissions
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py
new file mode 100644
index 000000000..b02f7b3c3
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py
@@ -0,0 +1,99 @@
+import os
+import sys
+import copy
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Symlink import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_posix_object
+from Testbase import TestPOSIXTool
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+class TestPOSIXSymlink(TestPOSIXTool):
+ test_obj = POSIXSymlink
+
+ @patch("os.readlink")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_readlink):
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="/dest")
+ ptool = self.get_obj()
+
+ mock_readlink.return_value = entry.get("to")
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_readlink.return_value = "/bogus"
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ # relative symlink
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="dest")
+ mock_readlink.return_value = entry.get("to")
+ self.assertTrue(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_readlink.side_effect = OSError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+
+ @patch("os.symlink")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Symlink.%s._exists" % test_obj.__name__)
+ def test_install(self, mock_exists, mock_install, mock_symlink):
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="/dest")
+ ptool = self.get_obj()
+
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_symlink.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
+
+ # relative symlink
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="dest")
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_symlink.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_symlink.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_symlink.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_symlink.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py
new file mode 100644
index 000000000..14a2520df
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py
@@ -0,0 +1,252 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+import Bcfg2.Client.Tools
+import Bcfg2.Client.Tools.POSIX
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+def get_config(entries):
+ config = lxml.etree.Element("Configuration")
+ bundle = lxml.etree.SubElement(config, "Bundle", name="test")
+ bundle.extend(entries)
+ return config
+
+def get_posix_object(logger=None, setup=None, config=None):
+ if config is None:
+ config = lxml.etree.Element("Configuration")
+ if not logger:
+ def print_msg(msg):
+ print(msg)
+ logger = Mock()
+ logger.error = Mock(side_effect=print_msg)
+ logger.warning = Mock(side_effect=print_msg)
+ logger.info = Mock(side_effect=print_msg)
+ logger.debug = Mock(side_effect=print_msg)
+ if not setup:
+ setup = MagicMock()
+ return Bcfg2.Client.Tools.POSIX.POSIX(logger, setup, config)
+
+
+class TestPOSIX(Bcfg2TestCase):
+ def setUp(self):
+ self.posix = get_posix_object()
+
+ def tearDown(self):
+ # just to guarantee that we start fresh each time
+ self.posix = None
+
+ def test__init(self):
+ entries = [lxml.etree.Element("Path", name="test", type="file")]
+ posix = get_posix_object(config=get_config(entries))
+ self.assertIsInstance(posix, Bcfg2.Client.Tools.Tool)
+ self.assertIsInstance(posix, Bcfg2.Client.Tools.POSIX.POSIX)
+ self.assertIn('Path', posix.__req__)
+ self.assertGreater(len(posix.__req__['Path']), 0)
+ self.assertGreater(len(posix.__handles__), 0)
+ self.assertItemsEqual(posix.handled, entries)
+
+ @patch("Bcfg2.Client.Tools.Tool.canVerify")
+ def test_canVerify(self, mock_canVerify):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+
+ # first, test superclass canVerify failure
+ mock_canVerify.return_value = False
+ self.assertFalse(self.posix.canVerify(entry))
+ mock_canVerify.assert_called_with(self.posix, entry)
+
+ # next, test fully_specified failure
+ self.posix.logger.error.reset_mock()
+ mock_canVerify.reset_mock()
+ mock_canVerify.return_value = True
+ mock_fully_spec = Mock()
+ mock_fully_spec.return_value = False
+ self.posix._handlers[entry.get("type")].fully_specified = \
+ mock_fully_spec
+ self.assertFalse(self.posix.canVerify(entry))
+ mock_canVerify.assert_called_with(self.posix, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertTrue(self.posix.logger.error.called)
+
+ # finally, test success
+ self.posix.logger.error.reset_mock()
+ mock_canVerify.reset_mock()
+ mock_fully_spec.reset_mock()
+ mock_fully_spec.return_value = True
+ self.assertTrue(self.posix.canVerify(entry))
+ mock_canVerify.assert_called_with(self.posix, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertFalse(self.posix.logger.error.called)
+
+ @patch("Bcfg2.Client.Tools.Tool.canInstall")
+ def test_canInstall(self, mock_canInstall):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+
+ # first, test superclass canInstall failure
+ mock_canInstall.return_value = False
+ self.assertFalse(self.posix.canInstall(entry))
+ mock_canInstall.assert_called_with(self.posix, entry)
+
+ # next, test fully_specified failure
+ self.posix.logger.error.reset_mock()
+ mock_canInstall.reset_mock()
+ mock_canInstall.return_value = True
+ mock_fully_spec = Mock()
+ mock_fully_spec.return_value = False
+ self.posix._handlers[entry.get("type")].fully_specified = \
+ mock_fully_spec
+ self.assertFalse(self.posix.canInstall(entry))
+ mock_canInstall.assert_called_with(self.posix, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertTrue(self.posix.logger.error.called)
+
+ # finally, test success
+ self.posix.logger.error.reset_mock()
+ mock_canInstall.reset_mock()
+ mock_fully_spec.reset_mock()
+ mock_fully_spec.return_value = True
+ self.assertTrue(self.posix.canInstall(entry))
+ mock_canInstall.assert_called_with(self.posix, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertFalse(self.posix.logger.error.called)
+
+ def test_InstallPath(self):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+
+ mock_install = Mock()
+ mock_install.return_value = True
+ self.posix._handlers[entry.get("type")].install = mock_install
+ self.assertTrue(self.posix.InstallPath(entry))
+ mock_install.assert_called_with(entry)
+
+ def test_VerifyPath(self):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ modlist = []
+
+ mock_verify = Mock()
+ mock_verify.return_value = True
+ self.posix._handlers[entry.get("type")].verify = mock_verify
+ self.assertTrue(self.posix.VerifyPath(entry, modlist))
+ mock_verify.assert_called_with(entry, modlist)
+
+ mock_verify.reset_mock()
+ mock_verify.return_value = False
+ self.posix.setup.__getitem__.return_value = True
+ self.assertFalse(self.posix.VerifyPath(entry, modlist))
+ self.assertIsNotNone(entry.get('qtext'))
+
+ @patch('os.remove')
+ def test_prune_old_backups(self, mock_remove):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ setup = dict(ppath='/', max_copies=5, paranoid=True)
+ posix = get_posix_object(setup=setup)
+
+ remove = ["_etc_foo_2012-07-20T04:13:22.364989",
+ "_etc_foo_2012-07-31T04:13:23.894958",
+ "_etc_foo_2012-07-17T04:13:22.493316",]
+ keep = ["_etc_foo_bar_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-04T04:13:22.519978",
+ "_etc_Foo_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-06T04:13:22.519978",
+ "_etc_foo_2012-08-03T04:13:22.191895",
+ "_etc_test_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-07T04:13:22.519978",]
+
+ @patch('os.listdir')
+ def inner(mock_listdir):
+ mock_listdir.side_effect = OSError
+ posix._prune_old_backups(entry)
+ self.assertTrue(posix.logger.error.called)
+ self.assertFalse(mock_remove.called)
+ mock_listdir.assert_called_with(setup['ppath'])
+
+ mock_listdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_listdir.side_effect = None
+ mock_listdir.return_value = keep + remove
+
+ posix._prune_old_backups(entry)
+ mock_listdir.assert_called_with(setup['ppath'])
+ self.assertItemsEqual(mock_remove.call_args_list,
+ [call(os.path.join(setup['ppath'], p))
+ for p in remove])
+
+ mock_listdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_remove.side_effect = OSError
+ posix.logger.error.reset_mock()
+ # test to ensure that we call os.remove() for all files that
+ # need to be removed even if we get an error
+ posix._prune_old_backups(entry)
+ mock_listdir.assert_called_with(setup['ppath'])
+ self.assertItemsEqual(mock_remove.call_args_list,
+ [call(os.path.join(setup['ppath'], p))
+ for p in remove])
+ self.assertTrue(posix.logger.error.called)
+
+ inner()
+
+ @patch("shutil.copy")
+ @patch("os.path.isdir")
+ @patch("Bcfg2.Client.Tools.POSIX.POSIX._prune_old_backups")
+ def test_paranoid_backup(self, mock_prune, mock_isdir, mock_copy):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ setup = dict(ppath='/', max_copies=5, paranoid=False)
+ posix = get_posix_object(setup=setup)
+
+ # paranoid false globally
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ # paranoid false on the entry
+ mock_prune.reset_mock()
+ setup['paranoid'] = True
+ posix = get_posix_object(setup=setup)
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ # entry does not exist on filesystem
+ mock_prune.reset_mock()
+ entry.set("paranoid", "true")
+ entry.set("current_exists", "false")
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ # entry is a directory on the filesystem
+ mock_prune.reset_mock()
+ entry.set("current_exists", "true")
+ mock_isdir.return_value = True
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+ mock_isdir.assert_called_with(entry.get("name"))
+
+ # test the actual backup now
+ mock_prune.reset_mock()
+ mock_isdir.return_value = False
+ posix._paranoid_backup(entry)
+ mock_isdir.assert_called_with(entry.get("name"))
+ mock_prune.assert_called_with(entry)
+ # it's basically impossible to test the shutil.copy() call
+ # exactly because the destination includes microseconds, so we
+ # just test it good enough
+ self.assertEqual(mock_copy.call_args[0][0],
+ entry.get("name"))
+ bkupnam = os.path.join(setup['ppath'],
+ entry.get('name').replace('/', '_')) + '_'
+ self.assertEqual(bkupnam, mock_copy.call_args[0][1][:len(bkupnam)])
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
new file mode 100644
index 000000000..b447ab642
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
@@ -0,0 +1,991 @@
+import os
+import sys
+import copy
+import stat
+import lxml.etree
+from mock import Mock, MagicMock, patch
+import Bcfg2.Client.Tools
+from Bcfg2.Client.Tools.POSIX.base import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from Test__init import get_posix_object
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+try:
+ import selinux
+ has_selinux = True
+except ImportError:
+ has_selinux = False
+
+try:
+ import posix1e
+ has_acls = True
+except ImportError:
+ has_acls = False
+
+class TestPOSIXTool(Bcfg2TestCase):
+ test_obj = POSIXTool
+
+ def get_obj(self, posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return self.test_obj(posix.logger, posix.setup, posix.config)
+
+ def setUp(self):
+ self.ptool = self.get_obj()
+
+ def tearDown(self):
+ # just to guarantee that we start fresh each time
+ self.ptool = None
+
+ def test_fully_specified(self):
+ # fully_specified should do no checking on the abstract
+ # POSIXTool object
+ self.assertTrue(self.ptool.fully_specified(Mock()))
+
+ @patch('os.stat')
+ @patch('os.walk')
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_metadata" %
+ test_obj.__name__)
+ def test_verify(self, mock_verify, mock_walk, mock_stat):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+
+ mock_stat.return_value = MagicMock()
+ mock_verify.return_value = False
+ self.assertFalse(self.ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(self.ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ entry.set("recursive", "true")
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+ self.assertTrue(self.ptool.verify(entry, []))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_verifies = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_verifies.extend([call(entry, path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_verify.call_args_list, all_verifies)
+
+ @patch('os.walk')
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__)
+ def test_install(self, mock_set_perms, mock_walk):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+
+ mock_set_perms.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_set_perms.assert_called_with(entry)
+
+ mock_set_perms.reset_mock()
+ entry.set("recursive", "true")
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+
+ mock_set_perms.return_value = True
+ self.assertTrue(self.ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_set_perms = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_set_perms.extend([call(entry, path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ all_set_perms)
+
+ mock_walk.reset_mock()
+ mock_set_perms.reset_mock()
+
+ def set_perms_rv(entry, path=None):
+ if path == '/dir2/file3':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+
+ self.assertFalse(self.ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ self.assertItemsEqual(mock_set_perms.call_args_list, all_set_perms)
+
+ @patch('os.lstat')
+ @patch("os.unlink")
+ @patch("os.path.isdir")
+ @patch("shutil.rmtree")
+ def test_exists(self, mock_rmtree, mock_isdir, mock_unlink, mock_lstat):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+
+ mock_lstat.side_effect = OSError
+ self.assertFalse(self.ptool._exists(entry))
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ rv = MagicMock()
+ mock_lstat.return_value = rv
+ mock_lstat.side_effect = None
+ self.assertEqual(self.ptool._exists(entry), rv)
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_isdir.return_value = False
+ self.assertFalse(self.ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_unlink.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_rmtree.called)
+
+ mock_lstat.reset_mock()
+ mock_isdir.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_isdir.return_value = True
+ self.assertFalse(self.ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_isdir.reset_mock()
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_rmtree.side_effect = OSError
+ self.assertEqual(self.ptool._exists(entry, remove=True), rv)
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ @patch("os.chown")
+ @patch("os.chmod")
+ @patch("os.utime")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_acls" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_secontext" %
+ test_obj.__name__)
+ def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid,
+ mock_norm_uid, mock_utime, mock_chmod, mock_chown):
+ def reset():
+ mock_set_secontext.reset_mock()
+ mock_set_acls.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_chmod.reset_mock()
+ mock_chown.reset_mock()
+ mock_utime.reset_mock()
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar",
+ type="symlink")
+ mock_set_acls.return_value = True
+ mock_set_secontext.return_value = True
+ self.assertTrue(self.ptool._set_perms(entry))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner",
+ group="group", perms="644", type="file")
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ reset()
+ self.assertTrue(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ self.assertFalse(mock_utime.called)
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ mtime = 1344459042
+ entry.set("mtime", str(mtime))
+ self.assertTrue(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ self.assertTrue(self.ptool._set_perms(entry, path='/etc/bar'))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with('/etc/bar', 10, 100)
+ mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path='/etc/bar')
+ mock_set_acls.assert_called_with(entry, path='/etc/bar')
+
+ # test dev_type modification of perms, failure of chown
+ reset()
+ def chown_rv(path, owner, group):
+ if owner == 0 and group == 0:
+ return True
+ else:
+ raise KeyError
+ os.chown.side_effect = chown_rv
+ entry.set("type", "device")
+ entry.set("dev_type", list(device_map.keys())[0])
+ self.assertFalse(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 0, 0)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8) | list(device_map.values())[0])
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test failure of chmod
+ reset()
+ os.chown.side_effect = None
+ os.chmod.side_effect = OSError
+ entry.set("type", "file")
+ del entry.attrib["dev_type"]
+ self.assertFalse(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test that even when everything fails, we try to do it all.
+ # e.g., when chmod fails, we still try to apply acls, set
+ # selinux context, etc.
+ reset()
+ os.chown.side_effect = OSError
+ os.utime.side_effect = OSError
+ mock_set_acls.return_value = False
+ mock_set_secontext.return_value = False
+ self.assertFalse(self.ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patchIf(has_acls, "posix1e.ACL")
+ @patchIf(has_acls, "posix1e.Entry")
+ @patch("os.path.isdir")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" %
+ test_obj.__name__)
+ def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid,
+ mock_isdir, mock_Entry, mock_ACL):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+
+ # disable acls for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertTrue(self.ptool._set_acls(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_acls = True
+
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ acl.name = "keep"
+ file_acls.append(acl)
+ remove_acls = [a for a in file_acls if a.name == "remove"]
+
+ # build a set of ACLs listed on the entry as returned by
+ # _list_entry_acls()
+ entry_acls = {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+ mock_list_entry_acls.return_value = entry_acls
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has three separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ acl_rv = MagicMock()
+ def mock_acl_rv(file=None, filedef=None, acl=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ elif acl:
+ return acl_rv
+
+ # set up the equally unreasonably complex return value for
+ # posix1e.Entry, which returns a new entry and adds it to
+ # an ACL, so we have to track the Mock objects it returns.
+ # why can't they just have an acl.add_entry() method?!?
+ acl_entries = []
+ def mock_entry_rv(acl):
+ rv = MagicMock()
+ rv.acl = acl
+ rv.permset = set()
+ acl_entries.append(rv)
+ return rv
+ mock_Entry.side_effect = mock_entry_rv
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_ACL.reset_mock()
+ mock_Entry.reset_mock()
+ fileacl_rv.reset_mock()
+
+ # test fs mounted noacl
+ mock_ACL.side_effect = IOError(95, "Operation not permitted")
+ self.assertFalse(self.ptool._set_acls(entry))
+
+ # test other error
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertFalse(self.ptool._set_acls(entry))
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ mock_isdir.return_value = True
+ self.assertTrue(self.ptool._set_acls(entry))
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=entry.get("name")),
+ call(filedef=entry.get("name"))])
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ self.assertItemsEqual(filedef_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_uid.assert_called_with("user")
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_ACCESS)
+ filedef_rv.calc_mask.assert_any_call()
+ filedef_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_DEFAULT)
+
+ # build tuples of the Entry objects that were added to acl
+ # and defaacl so they're easier to compare for equality
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(filedef_rv, posix1e.ACL_USER, 10, 7),
+ (fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ reset()
+ # have to reassign these because they're iterators, and
+ # they've already been iterated over once
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ mock_list_entry_acls.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_isdir.return_value = False
+ acl_entries = []
+ self.assertTrue(self.ptool._set_acls(entry, path="/bin/bar"))
+ mock_ACL.assert_called_with(file="/bin/bar")
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with("/bin/bar",
+ posix1e.ACL_TYPE_ACCESS)
+
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ @skipUnless(has_selinux, "SELinux not found, skipping")
+ @patchIf(has_selinux, "selinux.restorecon")
+ @patchIf(has_selinux, "selinux.lsetfilecon")
+ def test_set_secontext(self, mock_lsetfilecon, mock_restorecon):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+
+ # disable selinux for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertTrue(self.ptool._set_secontext(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = True
+
+ # no context given
+ self.assertTrue(self.ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ entry.set("secontext", "__default__")
+ self.assertTrue(self.ptool._set_secontext(entry))
+ mock_restorecon.assert_called_with(entry.get("name"))
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 0
+ entry.set("secontext", "foo_t")
+ self.assertTrue(self.ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 1
+ self.assertFalse(self.ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ @patch("grp.getgrnam")
+ def test_norm_gid(self, mock_getgrnam):
+ self.assertEqual(5, self.ptool._norm_gid("5"))
+ self.assertFalse(mock_getgrnam.called)
+
+ mock_getgrnam.reset_mock()
+ mock_getgrnam.return_value = ("group", "x", 5, [])
+ self.assertEqual(5, self.ptool._norm_gid("group"))
+ mock_getgrnam.assert_called_with("group")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_gid" % test_obj.__name__)
+ def test_norm_entry_gid(self, mock_norm_gid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ mock_norm_gid.return_value = 10
+ self.assertEqual(10, self.ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ mock_norm_gid.reset_mock()
+ mock_norm_gid.side_effect = KeyError
+ self.assertEqual(0, self.ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ @patch("pwd.getpwnam")
+ def test_norm_uid(self, mock_getpwnam):
+ self.assertEqual(5, self.ptool._norm_uid("5"))
+ self.assertFalse(mock_getpwnam.called)
+
+ mock_getpwnam.reset_mock()
+ mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user",
+ "/bin/zsh")
+ self.assertEqual(5, self.ptool._norm_uid("user"))
+ mock_getpwnam.assert_called_with("user")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_uid" % test_obj.__name__)
+ def test_norm_entry_uid(self, mock_norm_uid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ mock_norm_uid.return_value = 10
+ self.assertEqual(10, self.ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ mock_norm_uid.reset_mock()
+ mock_norm_uid.side_effect = KeyError
+ self.assertEqual(0, self.ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ def test_norm_acl_perms(self):
+ # there's basically no reasonably way to test the Permset
+ # object parsing feature without writing our own Mock object
+ # that re-implements Permset.test(). silly pylibacl won't let
+ # us create standalone Entry or Permset objects.
+ self.assertEqual(5, self.ptool._norm_acl_perms("5"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("55"))
+ self.assertEqual(5, self.ptool._norm_acl_perms("rx"))
+ self.assertEqual(5, self.ptool._norm_acl_perms("r-x"))
+ self.assertEqual(6, self.ptool._norm_acl_perms("wr-"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("rwrw"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("-"))
+ self.assertEqual(0, self.ptool._norm_acl_perms("a"))
+ self.assertEqual(6, self.ptool._norm_acl_perms("rwa"))
+ self.assertEqual(4, self.ptool._norm_acl_perms("rr"))
+
+ @patch('os.stat')
+ def test__gather_data(self, mock_stat):
+ path = '/test'
+ mock_stat.side_effect = OSError
+ self.assertFalse(self.ptool._gather_data(path)[0])
+ mock_stat.assert_called_with(path)
+
+ mock_stat.reset_mock()
+ mock_stat.side_effect = None
+ # create a return value
+ stat_rv = MagicMock()
+ def stat_getitem(key):
+ if int(key) == stat.ST_UID:
+ return 0
+ elif int(key) == stat.ST_GID:
+ return 10
+ elif int(key) == stat.ST_MODE:
+ # return extra bits in the mode to emulate a device
+ # and ensure that they're stripped
+ return int('060660', 8)
+ stat_rv.__getitem__ = Mock(side_effect=stat_getitem)
+ mock_stat.return_value = stat_rv
+
+ # disable selinux and acls for this call -- we test them
+ # separately so that we can skip those tests as appropriate
+ states = (Bcfg2.Client.Tools.POSIX.base.has_selinux,
+ Bcfg2.Client.Tools.POSIX.base.has_acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(self.ptool._gather_data(path),
+ (stat_rv, '0', '10', '0660', None, None))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux, \
+ Bcfg2.Client.Tools.POSIX.base.has_acls = states
+ mock_stat.assert_called_with(path)
+
+ @skipUnless(has_selinux, "SELinux not found, skipping")
+ def test__gather_data_selinux(self):
+ context = 'system_u:object_r:root_t:s0'
+ path = '/test'
+
+ @patch('os.stat')
+ @patchIf(has_selinux, "selinux.getfilecon")
+ def inner(mock_getfilecon, mock_stat):
+ mock_getfilecon.return_value = [len(context) + 1, context]
+ mock_stat.return_value = MagicMock()
+ # disable acls for this call and test them separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_acls
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(self.ptool._gather_data(path)[4], 'root_t')
+ Bcfg2.Client.Tools.POSIX.base.has_acls = state
+ mock_getfilecon.assert_called_with(path)
+
+ inner()
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patch('os.stat')
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" %
+ test_obj.__name__)
+ def test__gather_data_acls(self, mock_list_file_acls, mock_stat):
+ acls = {("default", posix1e.ACL_USER, "testuser"): "rwx",
+ ("access", posix1e.ACL_GROUP, "testgroup"): "rx"}
+ mock_list_file_acls.return_value = acls
+ path = '/test'
+ mock_stat.return_value = MagicMock()
+ # disable selinux for this call and test it separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_selinux
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertItemsEqual(self.ptool._gather_data(path)[5], acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = state
+ mock_list_file_acls.assert_called_with(path)
+
+ @patchIf(has_selinux, "selinux.matchpathcon")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._verify_acls" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._gather_data" % test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_uid" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._norm_entry_gid" %
+ test_obj.__name__)
+ def test_verify_metadata(self, mock_norm_gid, mock_norm_uid,
+ mock_gather_data, mock_verify_acls,
+ mock_matchpathcon):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user", perms="664",
+ secontext='etc_t')
+ # _verify_metadata() mutates the entry, so we keep a backup so we
+ # can start fresh every time
+ orig_entry = copy.deepcopy(entry)
+
+ def reset():
+ mock_gather_data.reset_mock()
+ mock_verify_acls.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ return copy.deepcopy(orig_entry)
+
+ # test nonexistent file
+ mock_gather_data.return_value = (False, None, None, None, None, None)
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ self.assertEqual(entry.get("current_exists", "").lower(), "false")
+ mock_gather_data.assert_called_with(entry.get("name"))
+
+ # expected data. tuple of attr, return value index, value
+ expected = [('current_owner', 1, '0'),
+ ('current_group', 2, '10'),
+ ('current_perms', 3, '0664'),
+ ('current_secontext', 4, 'etc_t')]
+ mock_norm_uid.return_value = 0
+ mock_norm_gid.return_value = 10
+ gather_data_rv = [MagicMock(), None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+
+ entry = reset()
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+
+ # test when secontext is None
+ entry = reset()
+ gather_data_rv[4] = None
+ sestate = Bcfg2.Client.Tools.POSIX.base.has_selinux
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ if attr != 'current_secontext':
+ self.assertEqual(entry.get(attr), val)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = sestate
+
+ gather_data_rv = [MagicMock(), None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+
+ mtime = 1344430414
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ stat_rv = MagicMock()
+ stat_rv.__getitem__.return_value = mtime
+ gather_data_rv[0] = stat_rv
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure modes for each checked datum. tuple of changed attr,
+ # return value index, new (failing) value
+ failures = [('current_owner', 1, '10'),
+ ('current_group', 2, '100'),
+ ('current_perms', 3, '0660')]
+ if has_selinux:
+ failures.append(('current_secontext', 4, 'root_t'))
+
+ for fail_attr, fail_idx, fail_val in failures:
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ gather_data_rv[fail_idx] = fail_val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ self.assertEqual(entry.get(fail_attr), fail_val)
+ for attr, idx, val in expected:
+ if attr != fail_attr:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure mode for mtime
+ fail_mtime = 1344431162
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ fail_stat_rv = MagicMock()
+ fail_stat_rv.__getitem__.return_value = fail_mtime
+ gather_data_rv = [fail_stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(fail_mtime))
+
+ if has_selinux:
+ # test success and failure for __default__ secontext
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+
+ context1 = "system_u:object_r:etc_t:s0"
+ context2 = "system_u:object_r:root_t:s0"
+ mock_matchpathcon.return_value = [1 + len(context1),
+ context1]
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ entry = reset()
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+ mock_matchpathcon.return_value = [1 + len(context2),
+ context2]
+ self.assertFalse(self.ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ def test_list_entry_acls(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ lxml.etree.SubElement(entry, "ACL", scope="user", type="default",
+ user="user", perms="rwx")
+ lxml.etree.SubElement(entry, "ACL", scope="group", type="access",
+ group="group", perms="5")
+ self.assertItemsEqual(self.ptool._list_entry_acls(entry),
+ {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5})
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("pwd.getpwuid")
+ @patch("grp.getgrgid")
+ @patch("os.path.isdir")
+ def test_list_file_acls(self, mock_isdir, mock_getgrgid, mock_getpwuid,
+ mock_ACL):
+ path = '/test'
+
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.qualifier = 10
+ # yes, this is a bogus permset. thanks to _norm_acl_perms
+ # it works and is easier than many of the alternatives.
+ acl.permset = 'rwx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.qualifier = 100
+ acl.permset = 'rx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ file_acls.append(acl)
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has two separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ def mock_acl_rv(file=None, filedef=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ # other return values
+ mock_isdir.return_value = False
+ mock_getgrgid.return_value = ("group", "x", 5, [])
+ mock_getpwuid.return_value = ("user", "x", 5, 5, "User",
+ "/home/user", "/bin/zsh")
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_getgrgid.reset_mock()
+ mock_getpwuid.reset_mock()
+ mock_ACL.reset_mock()
+
+ mock_ACL.side_effect = IOError(95, "Operation not supported")
+ self.assertItemsEqual(self.ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertItemsEqual(self.ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ self.assertItemsEqual(self.ptool._list_file_acls(path), acls)
+ mock_isdir.assert_called_with(path)
+ mock_getgrgid.assert_called_with(100)
+ mock_getpwuid.assert_called_with(10)
+ mock_ACL.assert_called_with(file=path)
+
+ reset()
+ mock_isdir.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+
+ defacls = acls
+ for akey, perms in acls.items():
+ defacls[('default', akey[1], akey[2])] = perms
+ self.assertItemsEqual(self.ptool._list_file_acls(path), defacls)
+ mock_isdir.assert_called_with(path)
+ self.assertItemsEqual(mock_getgrgid.call_args_list,
+ [call(100), call(100)])
+ self.assertItemsEqual(mock_getpwuid.call_args_list,
+ [call(10), call(10)])
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=path), call(filedef=path)])
+
+ if has_acls:
+ # python 2.6 applies decorators at compile-time, not at
+ # run-time, so we can't do these as decorators because
+ # pylibacl might not be installed. (If it's not, this test
+ # will be skipped, so as long as this is done at run-time
+ # we're safe.)
+ test_list_file_acls = patch("posix1e.ACL")(test_list_file_acls)
+
+ @skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_file_acls" %
+ test_obj.__name__)
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._list_entry_acls" %
+ test_obj.__name__)
+ def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ # we can't test to make sure that errors get properly sorted
+ # into (missing, extra, wrong) without refactoring the
+ # _verify_acls code, and I don't feel like doing that, so eff
+ # it. let's just test to make sure that failures are
+ # identified at all for now.
+
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("default", posix1e.ACL_GROUP, "group"): 5}
+ extra_acls = copy.deepcopy(acls)
+ extra_acls[("access", posix1e.ACL_USER, "user2")] = 4
+
+ mock_list_entry_acls.return_value = acls
+ mock_list_file_acls.return_value = acls
+ self.assertTrue(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test missing
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_file_acls.return_value = extra_acls
+ self.assertFalse(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test extra
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = acls
+ self.assertFalse(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test wrong
+ wrong_acls = copy.deepcopy(extra_acls)
+ wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = wrong_acls
+ self.assertFalse(self.ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ @patch("os.makedirs")
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.%s._set_perms" % test_obj.__name__)
+ def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs):
+ entry = lxml.etree.Element("Path", name="/test/foo/bar",
+ type="directory")
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+
+ mock_set_perms.return_value = True
+ def path_exists_rv(path):
+ if path == "/test":
+ return True
+ else:
+ return False
+ mock_exists.side_effect = path_exists_rv
+ self.assertTrue(self.ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_makedirs.side_effect = OSError
+ self.assertFalse(self.ptool._makedirs(entry))
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+
+ reset()
+ mock_makedirs.side_effect = None
+ def set_perms_rv(entry, path=None):
+ if path == '/test/foo':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+ self.assertFalse(self.ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/__init__.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/__init__.py
diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/__init__.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/__init__.py
diff --git a/testsuite/Testsrc/Testlib/TestClient/__init__.py b/testsuite/Testsrc/Testlib/TestClient/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestClient/__init__.py
diff --git a/testsuite/Testsrc/Testlib/TestOptions.py b/testsuite/Testsrc/Testlib/TestOptions.py
new file mode 100644
index 000000000..e20a320b1
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestOptions.py
@@ -0,0 +1,238 @@
+import os
+import sys
+from mock import Mock, MagicMock, patch
+from Bcfg2.Options import *
+from Bcfg2.Compat import ConfigParser
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != '/':
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+class TestDefaultConfigParser(Bcfg2TestCase):
+ @patch("%s.ConfigParser.get" % ConfigParser.__name__)
+ def test_get(self, mock_get):
+ dcp = DefaultConfigParser()
+ mock_get.return_value = "foo"
+ self.assertEqual(dcp.get("section", "option"), "foo")
+ mock_get.assert_called_with(dcp, "section", "option")
+
+ mock_get.reset_mock()
+ self.assertEqual(dcp.get("section", "option",
+ default="bar", other="test"), "foo")
+ mock_get.assert_called_with(dcp, "section", "option", other="test")
+
+ for etype, err in [(ConfigParser.NoOptionError,
+ ConfigParser.NoOptionError(None, None)),
+ (ConfigParser.NoSectionError,
+ ConfigParser.NoSectionError(None))]:
+ mock_get.side_effect = err
+ mock_get.reset_mock()
+ self.assertEqual(dcp.get("section", "option", default="bar"), "bar")
+ mock_get.assert_called_with(dcp, "section", "option")
+
+ mock_get.reset_mock()
+ self.assertRaises(etype, dcp.get, "section", "option")
+ mock_get.assert_called_with(dcp, "section", "option")
+
+ @patch("%s.ConfigParser.getboolean" % ConfigParser.__name__)
+ def test_getboolean(self, mock_getboolean):
+ dcp = DefaultConfigParser()
+ mock_getboolean.return_value = True
+ self.assertEqual(dcp.getboolean("section", "option"), True)
+ mock_getboolean.assert_called_with(dcp, "section", "option")
+
+ mock_getboolean.reset_mock()
+ self.assertEqual(dcp.getboolean("section", "option",
+ default=False, other="test"), True)
+ mock_getboolean.assert_called_with(dcp, "section", "option",
+ other="test")
+
+ for etype, err in [(ConfigParser.NoOptionError,
+ ConfigParser.NoOptionError(None, None)),
+ (ConfigParser.NoSectionError,
+ ConfigParser.NoSectionError(None))]:
+ mock_getboolean.side_effect = err
+ mock_getboolean.reset_mock()
+ self.assertEqual(dcp.getboolean("section", "option", default=False),
+ False)
+ mock_getboolean.assert_called_with(dcp, "section", "option")
+
+ mock_getboolean.reset_mock()
+ self.assertRaises(etype, dcp.getboolean, "section", "option")
+ mock_getboolean.assert_called_with(dcp, "section", "option")
+
+
+class TestOption(Bcfg2TestCase):
+ def test__init(self):
+ self.assertRaises(OptionFailure,
+ Option,
+ 'foo', False, cmd='f')
+ self.assertRaises(OptionFailure,
+ Option,
+ 'foo', False, cmd='--f')
+ self.assertRaises(OptionFailure,
+ Option,
+ 'foo', False, cmd='-foo')
+ self.assertRaises(OptionFailure,
+ Option,
+ 'foo', False, cmd='-foo', long_arg=True)
+ opt = Option('foo', False)
+ self.assertTrue(opt.boolean)
+ opt = Option('foo', False, odesc='<val>')
+ self.assertFalse(opt.boolean)
+ opt = Option('foo', False, cook=get_bool)
+ self.assertFalse(opt.boolean)
+ opt = Option('foo', "foo")
+ self.assertFalse(opt.boolean)
+
+ def test_get_cooked_value(self):
+ opt = Option('foo', False)
+ opt.boolean = True
+ self.assertTrue(opt.get_cooked_value("anything"))
+
+ opt = Option('foo', 'foo')
+ opt.boolean = False
+ opt.cook = False
+ self.assertEqual("foo", opt.get_cooked_value("foo"))
+
+ opt = Option('foo', 'foo')
+ opt.boolean = False
+ opt.cook = Mock()
+ self.assertEqual(opt.cook.return_value, opt.get_cooked_value("foo"))
+ opt.cook.assert_called_with("foo")
+
+ def test_buildHelpMessage(self):
+ opt = Option('foo', False)
+ self.assertEqual(opt.buildHelpMessage(), '')
+
+ opt = Option('foo', False, '-f')
+ self.assertEqual(opt.buildHelpMessage().split(),
+ ["-f", "foo"])
+
+ opt = Option('foo', False, cmd="--foo", long_arg=True)
+ self.assertEqual(opt.buildHelpMessage().split(),
+ ["--foo", "foo"])
+
+ opt = Option('foo', False, cmd="-f", odesc='<val>')
+ self.assertEqual(opt.buildHelpMessage().split(),
+ ["-f", "<val>", "foo"])
+
+ opt = Option('foo', False, cmd="--foo", long_arg=True, odesc='<val>')
+ self.assertEqual(opt.buildHelpMessage().split(),
+ ["--foo=<val>", "foo"])
+
+ def test_buildGetopt(self):
+ opt = Option('foo', False)
+ self.assertEqual(opt.buildGetopt(), '')
+
+ opt = Option('foo', False, '-f')
+ self.assertEqual(opt.buildGetopt(), "f")
+
+ opt = Option('foo', False, cmd="--foo", long_arg=True)
+ self.assertEqual(opt.buildGetopt(), '')
+
+ opt = Option('foo', False, cmd="-f", odesc='<val>')
+ self.assertEqual(opt.buildGetopt(), 'f:')
+
+ opt = Option('foo', False, cmd="--foo", long_arg=True, odesc='<val>')
+ self.assertEqual(opt.buildGetopt(), '')
+
+ def test_buildLongGetopt(self):
+ opt = Option('foo', False, cmd="--foo", long_arg=True)
+ self.assertEqual(opt.buildLongGetopt(), 'foo')
+
+ opt = Option('foo', False, cmd="--foo", long_arg=True, odesc='<val>')
+ self.assertEqual(opt.buildLongGetopt(), 'foo=')
+
+ def test_parse(self):
+ cf = ('communication', 'password')
+ o = Option('foo', default='test4', cmd='-F', env='TEST2',
+ odesc='bar', cf=cf)
+ o.parse([], ['-F', 'test'])
+ self.assertEqual(o.value, 'test')
+ o.parse([('-F', 'test2')], [])
+ self.assertEqual(o.value, 'test2')
+
+ os.environ['TEST2'] = 'test3'
+ o.parse([], [])
+ self.assertEqual(o.value, 'test3')
+ del os.environ['TEST2']
+
+ cfp = DefaultConfigParser()
+ cfp.get = Mock()
+ cfp.get.return_value = 'test5'
+ o.parse([], [], configparser=cfp)
+ cfp.get.assert_any_call(*cf)
+ self.assertEqual(o.value, 'test5')
+
+ o.cf = False
+ o.parse([], [])
+ assert o.value == 'test4'
+
+
+class TestOptionSet(Bcfg2TestCase):
+ def test_buildGetopt(self):
+ opts = [('foo', Option('foo', 'test1', cmd='-G')),
+ ('bar', Option('foo', 'test2')),
+ ('baz', Option('foo', 'test1', cmd='-H',
+ odesc='1'))]
+ oset = OptionSet(opts)
+ res = oset.buildGetopt()
+ self.assertIn('H:', res)
+ self.assertIn('G', res)
+ self.assertEqual(len(res), 3)
+
+ def test_buildLongGetopt(self):
+ opts = [('foo', Option('foo', 'test1', cmd='-G')),
+ ('bar', Option('foo', 'test2')),
+ ('baz', Option('foo', 'test1', cmd='--H',
+ odesc='1', long_arg=True))]
+ oset = OptionSet(opts)
+ res = oset.buildLongGetopt()
+ self.assertIn('H=', res)
+ self.assertEqual(len(res), 1)
+
+ def test_parse(self):
+ opts = [('foo', Option('foo', 'test1', cmd='-G')),
+ ('bar', Option('foo', 'test2')),
+ ('baz', Option('foo', 'test1', cmd='-H',
+ odesc='1'))]
+ oset = OptionSet(opts)
+ self.assertRaises(SystemExit,
+ oset.parse,
+ ['-G', '-H'])
+ oset2 = OptionSet(opts)
+ self.assertRaises(SystemExit,
+ oset2.parse,
+ ['-h'])
+ oset3 = OptionSet(opts)
+ oset3.parse(['-G'])
+ self.assertTrue(oset3['foo'])
+
+
+class TestOptionParser(Bcfg2TestCase):
+ def test__init(self):
+ opts = [('foo', Option('foo', 'test1', cmd='-h')),
+ ('bar', Option('foo', 'test2')),
+ ('baz', Option('foo', 'test1', cmd='-H',
+ odesc='1'))]
+ oset1 = OptionParser(opts)
+ self.assertEqual(oset1.cfile,
+ DEFAULT_CONFIG_LOCATION)
+ sys.argv = ['foo', '-C', '/usr/local/etc/bcfg2.conf']
+ oset2 = OptionParser(opts)
+ self.assertEqual(oset2.cfile,
+ '/usr/local/etc/bcfg2.conf')
+ sys.argv = []
+ oset3 = OptionParser(opts)
+ self.assertEqual(oset3.cfile,
+ DEFAULT_CONFIG_LOCATION)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin.py
new file mode 100644
index 000000000..7ea5b9b42
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin.py
@@ -0,0 +1,2296 @@
+import os
+import re
+import sys
+import copy
+import logging
+import lxml.etree
+import Bcfg2.Server
+from Bcfg2.Compat import reduce
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugin import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != '/':
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+
+
+try:
+ re_type = re._pattern_type
+except AttributeError:
+ re_type = type(re.compile(""))
+
+def tostring(el):
+ return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8')
+
+
+class FakeElementTree(lxml.etree._ElementTree):
+ xinclude = Mock()
+
+
+class TestFunctions(Bcfg2TestCase):
+ def test_bind_info(self):
+ entry = lxml.etree.Element("Path", name="/test")
+ metadata = Mock()
+ default = dict(test1="test1", test2="test2")
+ # test without infoxml
+ bind_info(entry, metadata, default=default)
+ self.assertItemsEqual(entry.attrib,
+ dict(test1="test1",
+ test2="test2",
+ name="/test"))
+
+ # test with bogus infoxml
+ entry = lxml.etree.Element("Path", name="/test")
+ infoxml = Mock()
+ self.assertRaises(PluginExecutionError,
+ bind_info,
+ entry, metadata, infoxml=infoxml)
+ infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry)
+
+ # test with valid infoxml
+ entry = lxml.etree.Element("Path", name="/test")
+ infoxml.reset_mock()
+ infodata = {None: {"test3": "test3", "test4": "test4"}}
+ def infoxml_rv(metadata, rv, entry=None):
+ rv['Info'] = infodata
+ infoxml.pnode.Match.side_effect = infoxml_rv
+ bind_info(entry, metadata, infoxml=infoxml, default=default)
+ # mock objects don't properly track the called-with value of
+ # arguments whose value is changed by the function, so it
+ # thinks Match() was called with the final value of the mdata
+ # arg, not the initial value. makes this test a little less
+ # worthwhile, TBH.
+ infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata),
+ entry=entry)
+ self.assertItemsEqual(entry.attrib,
+ dict(test1="test1",
+ test2="test2",
+ test3="test3",
+ test4="test4",
+ name="/test"))
+
+
+class TestPluginInitError(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestPluginExecutionError(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestDebuggable(Bcfg2TestCase):
+ test_obj = Debuggable
+
+ def get_obj(self):
+ return self.test_obj()
+
+ def test__init(self):
+ d = self.get_obj()
+ self.assertIsInstance(d.logger, logging.Logger)
+ self.assertFalse(d.debug_flag)
+
+ @patch("Bcfg2.Server.Plugin.%s.debug_log" % test_obj.__name__)
+ def test_toggle_debug(self, mock_debug):
+ d = self.get_obj()
+ orig = d.debug_flag
+ d.toggle_debug()
+ self.assertNotEqual(orig, d.debug_flag)
+ self.assertTrue(mock_debug.called)
+
+ mock_debug.reset_mock()
+
+ changed = d.debug_flag
+ d.toggle_debug()
+ self.assertNotEqual(changed, d.debug_flag)
+ self.assertEqual(orig, d.debug_flag)
+ self.assertTrue(mock_debug.called)
+
+ def test_debug_log(self):
+ d = self.get_obj()
+ d.logger = Mock()
+ d.debug_flag = False
+ d.debug_log("test")
+ self.assertFalse(d.logger.error.called)
+
+ d.logger.reset_mock()
+ d.debug_log("test", flag=True)
+ self.assertTrue(d.logger.error.called)
+
+ d.logger.reset_mock()
+ d.debug_flag = True
+ d.debug_log("test")
+ self.assertTrue(d.logger.error.called)
+
+
+class TestPlugin(TestDebuggable):
+ test_obj = Plugin
+
+ def get_obj(self, core=None):
+ if core is None:
+ core = Mock()
+ return self.test_obj(core, datastore)
+
+ def test__init(self):
+ core = Mock()
+ p = self.get_obj(core=core)
+ self.assertEqual(p.data, os.path.join(datastore, p.name))
+ self.assertEqual(p.core, core)
+ self.assertIsInstance(p, Debuggable)
+
+ @patch("os.makedirs")
+ def test_init_repo(self, mock_makedirs):
+ self.test_obj.init_repo(datastore)
+ mock_makedirs.assert_called_with(os.path.join(datastore,
+ self.test_obj.name))
+
+
+class TestDatabaseBacked(TestPlugin):
+ test_obj = DatabaseBacked
+
+ @skipUnless(has_django, "Django not found")
+ def test__use_db(self):
+ core = Mock()
+ core.setup.cfp.getboolean.return_value = True
+ db = self.get_obj(core)
+ self.assertTrue(db._use_db)
+
+ core = Mock()
+ core.setup.cfp.getboolean.return_value = False
+ db = self.get_obj(core)
+ self.assertFalse(db._use_db)
+
+ Bcfg2.Server.Plugin.has_django = False
+ core = Mock()
+ db = self.get_obj(core)
+ self.assertFalse(db._use_db)
+
+ core = Mock()
+ core.setup.cfp.getboolean.return_value = True
+ db = self.get_obj(core)
+ self.assertFalse(db._use_db)
+ Bcfg2.Server.Plugin.has_django = True
+
+
+class TestPluginDatabaseModel(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestGenerator(Bcfg2TestCase):
+ test_obj = Generator
+
+ def test_HandlesEntry(self):
+ pass
+
+ def test_HandleEntry(self):
+ pass
+
+
+class TestStructure(Bcfg2TestCase):
+ test_obj = Structure
+
+ def get_obj(self):
+ return self.test_obj()
+
+ def test_BuildStructures(self):
+ s = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ s.BuildStructures, None)
+
+
+class TestMetadata(Bcfg2TestCase):
+ test_obj = Metadata
+
+ def get_obj(self):
+ return self.test_obj()
+
+ def test_get_initial_metadata(self):
+ m = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ m.get_initial_metadata, None)
+
+ def test_merge_additional_data(self):
+ m = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ m.merge_additional_data, None, None, None)
+
+ def test_merge_additional_groups(self):
+ m = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ m.merge_additional_groups, None, None)
+
+
+class TestConnector(Bcfg2TestCase):
+ """ placeholder """
+ def test_get_additional_groups(self):
+ pass
+
+ def test_get_additional_data(self):
+ pass
+
+
+class TestProbing(Bcfg2TestCase):
+ """ placeholder """
+ def test_GetProbes(self):
+ pass
+
+ def test_ReceiveData(self):
+ pass
+
+
+class TestStatistics(TestPlugin):
+ test_obj = Statistics
+
+ def test_process_statistics(self):
+ pass
+
+
+class TestThreadedStatistics(TestStatistics):
+ test_obj = ThreadedStatistics
+ data = [("foo.example.com", "<foo/>"),
+ ("bar.example.com", "<bar/>")]
+
+ @patch("threading.Thread.start")
+ def test__init(self, mock_start):
+ core = Mock()
+ ts = self.get_obj(core)
+ mock_start.assert_any_call()
+
+ @patch("%s.open" % builtins)
+ @patch("%s.dump" % cPickle.__name__)
+ @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
+ def test_save(self, mock_dump, mock_open):
+ core = Mock()
+ ts = self.get_obj(core)
+ queue = Mock()
+ queue.empty = Mock(side_effect=Empty)
+ ts.work_queue = queue
+
+ mock_open.side_effect = OSError
+ # test that save does _not_ raise an exception even when
+ # everything goes pear-shaped
+ ts.save()
+ queue.empty.assert_any_call()
+ mock_open.assert_called_with(ts.pending_file, 'w')
+
+ queue.reset_mock()
+ mock_open.reset_mock()
+
+ queue.data = []
+ for hostname, xml in self.data:
+ md = Mock()
+ md.hostname = hostname
+ queue.data.append((md, lxml.etree.XML(xml)))
+ queue.empty.side_effect = lambda: len(queue.data) == 0
+ queue.get_nowait = Mock(side_effect=lambda: queue.data.pop())
+ mock_open.side_effect = None
+
+ ts.save()
+ queue.empty.assert_any_call()
+ queue.get_nowait.assert_any_call()
+ mock_open.assert_called_with(ts.pending_file, 'w')
+ mock_open.return_value.close.assert_any_call()
+ # the order of the queue data gets changed, so we have to
+ # verify this call in an ugly way
+ self.assertItemsEqual(mock_dump.call_args[0][0], self.data)
+ self.assertEqual(mock_dump.call_args[0][1], mock_open.return_value)
+
+ @patch("os.unlink")
+ @patch("os.path.exists")
+ @patch("%s.open" % builtins)
+ @patch("lxml.etree.XML")
+ @patch("%s.load" % cPickle.__name__)
+ @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
+ def test_load(self, mock_load, mock_XML, mock_open, mock_exists,
+ mock_unlink):
+ core = Mock()
+ core.terminate.isSet.return_value = False
+ ts = self.get_obj(core)
+
+ ts.work_queue = Mock()
+ ts.work_queue.data = []
+ def reset():
+ core.reset_mock()
+ mock_open.reset_mock()
+ mock_exists.reset_mock()
+ mock_unlink.reset_mock()
+ mock_load.reset_mock()
+ mock_XML.reset_mock()
+ ts.work_queue.reset_mock()
+ ts.work_queue.data = []
+
+ mock_exists.return_value = False
+ self.assertTrue(ts.load())
+ mock_exists.assert_called_with(ts.pending_file)
+
+ reset()
+ mock_exists.return_value = True
+ mock_open.side_effect = OSError
+ self.assertFalse(ts.load())
+ mock_exists.assert_called_with(ts.pending_file)
+ mock_open.assert_called_with(ts.pending_file, 'r')
+
+ reset()
+ mock_open.side_effect = None
+ mock_load.return_value = self.data
+ ts.work_queue.put_nowait.side_effect = Full
+ self.assertTrue(ts.load())
+ mock_exists.assert_called_with(ts.pending_file)
+ mock_open.assert_called_with(ts.pending_file, 'r')
+ mock_open.return_value.close.assert_any_call()
+ mock_load.assert_called_with(mock_open.return_value)
+
+ reset()
+ core.build_metadata.side_effect = lambda x: x
+ mock_XML.side_effect = lambda x, parser=None: x
+ ts.work_queue.put_nowait.side_effect = None
+ self.assertTrue(ts.load())
+ mock_exists.assert_called_with(ts.pending_file)
+ mock_open.assert_called_with(ts.pending_file, 'r')
+ mock_open.return_value.close.assert_any_call()
+ mock_load.assert_called_with(mock_open.return_value)
+ self.assertItemsEqual(mock_XML.call_args_list,
+ [call(x, parser=Bcfg2.Server.XMLParser)
+ for h, x in self.data])
+ self.assertItemsEqual(ts.work_queue.put_nowait.call_args_list,
+ [call((h, x)) for h, x in self.data])
+ mock_unlink.assert_called_with(ts.pending_file)
+
+ @patch("threading.Thread.start", Mock())
+ @patch("Bcfg2.Server.Plugin.ThreadedStatistics.load")
+ @patch("Bcfg2.Server.Plugin.ThreadedStatistics.save")
+ @patch("Bcfg2.Server.Plugin.ThreadedStatistics.handle_statistic")
+ def test_run(self, mock_handle, mock_save, mock_load):
+ core = Mock()
+ ts = self.get_obj(core)
+ mock_load.return_value = True
+ ts.work_queue = Mock()
+
+ def reset():
+ mock_handle.reset_mock()
+ mock_save.reset_mock()
+ mock_load.reset_mock()
+ core.reset_mock()
+ ts.work_queue.reset_mock()
+ ts.work_queue.data = self.data[:]
+ ts.work_queue.get_calls = 0
+
+ reset()
+
+ def get_rv(**kwargs):
+ ts.work_queue.get_calls += 1
+ try:
+ return ts.work_queue.data.pop()
+ except:
+ raise Empty
+ ts.work_queue.get.side_effect = get_rv
+ def terminate_isset():
+ # this lets the loop go on a few iterations with an empty
+ # queue to test that it doesn't error out
+ return ts.work_queue.get_calls > 3
+ core.terminate.isSet.side_effect = terminate_isset
+
+ ts.work_queue.empty.return_value = False
+ ts.run()
+ mock_load.assert_any_call()
+ self.assertGreaterEqual(ts.work_queue.get.call_count, len(self.data))
+ self.assertItemsEqual(mock_handle.call_args_list,
+ [call(h, x) for h, x in self.data])
+ mock_save.assert_any_call()
+
+ @patch("copy.copy", Mock(side_effect=lambda x: x))
+ @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
+ def test_process_statistics(self):
+ TestStatistics.test_process_statistics(self)
+
+ core = Mock()
+ ts = self.get_obj(core)
+ ts.work_queue = Mock()
+ ts.process_statistics(*self.data[0])
+ ts.work_queue.put_nowait.assert_called_with(self.data[0])
+
+ ts.work_queue.reset_mock()
+ ts.work_queue.put_nowait.side_effect = Full
+ # test that no exception is thrown
+ ts.process_statistics(*self.data[0])
+
+
+class TestPullSource(Bcfg2TestCase):
+ def test_GetCurrentEntry(self):
+ ps = PullSource()
+ self.assertRaises(NotImplementedError,
+ ps.GetCurrentEntry, None, None, None)
+
+
+class TestPullTarget(Bcfg2TestCase):
+ def test_AcceptChoices(self):
+ pt = PullTarget()
+ self.assertRaises(NotImplementedError,
+ pt.AcceptChoices, None, None)
+
+ def test_AcceptPullData(self):
+ pt = PullTarget()
+ self.assertRaises(NotImplementedError,
+ pt.AcceptPullData, None, None, None)
+
+
+class TestDecision(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestValidationError(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestStructureValidator(Bcfg2TestCase):
+ def test_validate_structures(self):
+ sv = StructureValidator()
+ self.assertRaises(NotImplementedError,
+ sv.validate_structures, None, None)
+
+
+class TestGoalValidator(Bcfg2TestCase):
+ def test_validate_goals(self):
+ gv = GoalValidator()
+ self.assertRaises(NotImplementedError,
+ gv.validate_goals, None, None)
+
+
+class TestVersion(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestClientRunHooks(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestFileBacked(Bcfg2TestCase):
+ test_obj = FileBacked
+ path = os.path.join(datastore, "test")
+
+ def get_obj(self, path=None, fam=None):
+ if path is None:
+ path = self.path
+ return self.test_obj(path, fam=fam)
+
+ @patch("%s.open" % builtins)
+ def test_HandleEvent(self, mock_open):
+ fb = self.get_obj()
+ fb.Index = Mock()
+
+ def reset():
+ fb.Index.reset_mock()
+ mock_open.reset_mock()
+
+ for evt in ["exists", "changed", "created"]:
+ reset()
+ event = Mock()
+ event.code2str.return_value = evt
+ fb.HandleEvent(event)
+ mock_open.assert_called_with(self.path)
+ mock_open.return_value.read.assert_any_call()
+ fb.Index.assert_any_call()
+
+ reset()
+ event = Mock()
+ event.code2str.return_value = "endExist"
+ fb.HandleEvent(event)
+ self.assertFalse(mock_open.called)
+ self.assertFalse(fb.Index.called)
+
+
+class TestDirectoryBacked(Bcfg2TestCase):
+ test_obj = DirectoryBacked
+ testpaths = {1: '',
+ 2: '/foo',
+ 3: '/foo/bar',
+ 4: '/foo/bar/baz',
+ 5: 'quux',
+ 6: 'xyzzy/',
+ 7: 'xyzzy/plugh/'}
+ testfiles = ['foo', 'bar/baz.txt', 'plugh.py']
+ ignore = [] # ignore no events
+ badevents = [] # DirectoryBacked handles all files, so there's no
+ # such thing as a bad event
+
+ def test_child_interface(self):
+ # ensure that the child object has the correct interface
+ self.assertTrue(hasattr(self.test_obj.__child__, "HandleEvent"))
+
+ @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__,
+ Mock())
+ def get_obj(self, fam=None):
+ if fam is None:
+ fam = Mock()
+ return self.test_obj(os.path.join(datastore, self.test_obj.__name__),
+ fam)
+
+ @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__)
+ def test__init(self, mock_add_monitor):
+ db = self.test_obj(datastore, Mock())
+ mock_add_monitor.assert_called_with('')
+
+ def test__getitem(self):
+ db = self.get_obj()
+ db.entries.update(dict(a=1, b=2, c=3))
+ self.assertEqual(db['a'], 1)
+ self.assertEqual(db['b'], 2)
+ expected = KeyError
+ try:
+ db['d']
+ except expected:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
+
+ def test__iter(self):
+ db = self.get_obj()
+ db.entries.update(dict(a=1, b=2, c=3))
+ self.assertEqual([i for i in db],
+ [i for i in db.entries.items()])
+
+ @patch("os.path.isdir")
+ def test_add_directory_monitor(self, mock_isdir):
+ db = self.get_obj()
+ db.fam = Mock()
+ db.fam.rv = 0
+
+ def reset():
+ db.fam.rv += 1
+ db.fam.AddMonitor.return_value = db.fam.rv
+ db.fam.reset_mock()
+ mock_isdir.reset_mock()
+
+ mock_isdir.return_value = True
+ for path in self.testpaths.values():
+ reset()
+ db.add_directory_monitor(path)
+ db.fam.AddMonitor.assert_called_with(os.path.join(db.data, path),
+ db)
+ self.assertIn(db.fam.rv, db.handles)
+ self.assertEqual(db.handles[db.fam.rv], path)
+
+ reset()
+ # test duplicate adds
+ for path in self.testpaths.values():
+ reset()
+ db.add_directory_monitor(path)
+ self.assertFalse(db.fam.AddMonitor.called)
+
+ reset()
+ mock_isdir.return_value = False
+ db.add_directory_monitor('bogus')
+ self.assertFalse(db.fam.AddMonitor.called)
+ self.assertNotIn(db.fam.rv, db.handles)
+
+ def test_add_entry(self):
+ db = self.get_obj()
+ db.fam = Mock()
+ class MockChild(Mock):
+ def __init__(self, path, fam, **kwargs):
+ Mock.__init__(self, **kwargs)
+ self.path = path
+ self.fam = fam
+ self.HandleEvent = Mock()
+ db.__child__ = MockChild
+
+ for path in self.testpaths.values():
+ event = Mock()
+ db.add_entry(path, event)
+ self.assertIn(path, db.entries)
+ self.assertEqual(db.entries[path].path,
+ os.path.join(db.data, path))
+ self.assertEqual(db.entries[path].fam, db.fam)
+ db.entries[path].HandleEvent.assert_called_with(event)
+
+ @patch("os.path.isdir")
+ @patch("Bcfg2.Server.Plugin.%s.add_entry" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__)
+ def test_HandleEvent(self, mock_add_monitor, mock_add_entry, mock_isdir):
+ db = self.get_obj()
+ # a path with a leading / should never get into
+ # DirectoryBacked.handles, so strip that test case
+ for rid, path in self.testpaths.items():
+ path = path.lstrip('/')
+ db.handles[rid] = path
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_add_entry.reset_mock()
+ mock_add_monitor.reset_mock()
+
+ def get_event(filename, action, requestID):
+ event = Mock()
+ event.code2str.return_value = action
+ event.filename = filename
+ event.requestID = requestID
+ return event
+
+ # test events on the data directory itself
+ reset()
+ mock_isdir.return_value = True
+ event = get_event(db.data, "exists", 1)
+ db.HandleEvent(event)
+ mock_add_monitor.assert_called_with("")
+
+ # test events on paths that aren't handled
+ reset()
+ mock_isdir.return_value = False
+ event = get_event('/' + self.testfiles[0], 'created',
+ max(self.testpaths.keys()) + 1)
+ db.HandleEvent(event)
+ self.assertFalse(mock_add_monitor.called)
+ self.assertFalse(mock_add_entry.called)
+
+ for req_id, path in self.testpaths.items():
+ # a path with a leading / should never get into
+ # DirectoryBacked.handles, so strip that test case
+ path = path.lstrip('/')
+ basepath = os.path.join(datastore, path)
+ for fname in self.testfiles:
+ relpath = os.path.join(path, fname)
+ abspath = os.path.join(basepath, fname)
+
+ # test endExist does nothing
+ reset()
+ event = get_event(fname, 'endExist', req_id)
+ db.HandleEvent(event)
+ self.assertFalse(mock_add_monitor.called)
+ self.assertFalse(mock_add_entry.called)
+
+ mock_isdir.return_value = True
+ for evt in ["created", "exists", "changed"]:
+ # test that creating or changing a directory works
+ reset()
+ event = get_event(fname, evt, req_id)
+ db.HandleEvent(event)
+ mock_add_monitor.assert_called_with(relpath)
+ self.assertFalse(mock_add_entry.called)
+
+ mock_isdir.return_value = False
+ for evt in ["created", "exists"]:
+ # test that creating a file works
+ reset()
+ event = get_event(fname, evt, req_id)
+ db.HandleEvent(event)
+ mock_add_entry.assert_called_with(relpath, event)
+ self.assertFalse(mock_add_monitor.called)
+ db.entries[relpath] = MagicMock()
+
+ # test that changing a file that already exists works
+ reset()
+ event = get_event(fname, "changed", req_id)
+ db.HandleEvent(event)
+ db.entries[relpath].HandleEvent.assert_called_with(event)
+ self.assertFalse(mock_add_monitor.called)
+ self.assertFalse(mock_add_entry.called)
+
+ # test that deleting an entry works
+ reset()
+ event = get_event(fname, "deleted", req_id)
+ db.HandleEvent(event)
+ self.assertNotIn(relpath, db.entries)
+
+ # test that changing a file that doesn't exist works
+ reset()
+ event = get_event(fname, "changed", req_id)
+ db.HandleEvent(event)
+ mock_add_entry.assert_called_with(relpath, event)
+ self.assertFalse(mock_add_monitor.called)
+ db.entries[relpath] = MagicMock()
+
+ # test that deleting a directory works. this is a little
+ # strange because the _parent_ directory has to handle the
+ # deletion
+ reset()
+ event = get_event('quux', "deleted", 1)
+ db.HandleEvent(event)
+ for key in db.entries.keys():
+ self.assertFalse(key.startswith('quux'))
+
+ # test bad events
+ for fname in self.badevents:
+ reset()
+ event = get_event(fname, "created", 1)
+ db.HandleEvent(event)
+ self.assertFalse(mock_add_entry.called)
+ self.assertFalse(mock_add_monitor.called)
+
+ # test ignored events
+ for fname in self.ignore:
+ reset()
+ event = get_event(fname, "created", 1)
+ db.HandleEvent(event)
+ self.assertFalse(mock_isdir.called,
+ msg="Failed to ignore %s" % fname)
+ self.assertFalse(mock_add_entry.called,
+ msg="Failed to ignore %s" % fname)
+ self.assertFalse(mock_add_monitor.called,
+ msg="Failed to ignore %s" % fname)
+
+
+class TestXMLFileBacked(TestFileBacked):
+ test_obj = XMLFileBacked
+ path = os.path.join(datastore, "test", "test1.xml")
+
+ def get_obj(self, path=None, fam=None, should_monitor=False):
+ if path is None:
+ path = self.path
+ return self.test_obj(path, fam=fam, should_monitor=should_monitor)
+
+ def test__init(self):
+ fam = Mock()
+ xfb = self.get_obj()
+ self.assertIsNone(xfb.fam)
+
+ xfb = self.get_obj(fam=fam)
+ self.assertFalse(fam.AddMonitor.called)
+
+ fam.reset_mock()
+ xfb = self.get_obj(fam=fam, should_monitor=True)
+ fam.AddMonitor.assert_called_with(self.path, xfb)
+
+ @patch("os.path.exists")
+ @patch("lxml.etree.parse")
+ def test_follow_xincludes(self, mock_parse, mock_exists):
+ xfb = self.get_obj()
+ xfb.add_monitor = Mock()
+
+ def reset():
+ xfb.add_monitor.reset_mock()
+ mock_parse.reset_mock()
+ mock_exists.reset_mock()
+ xfb.extras = []
+
+ mock_exists.return_value = True
+ xdata = dict()
+ mock_parse.side_effect = lambda p: xdata[p]
+
+ # basic functionality
+ xdata['/test/test2.xml'] = lxml.etree.Element("Test").getroottree()
+ xfb._follow_xincludes(xdata=xdata['/test/test2.xml'])
+ self.assertFalse(xfb.add_monitor.called)
+
+ if (not hasattr(self.test_obj, "xdata") or
+ not isinstance(self.test_obj.xdata, property)):
+ # if xdata is settable, test that method of getting data
+ # to _follow_xincludes
+ reset()
+ xfb.xdata = xdata['/test/test2.xml'].getroot()
+ xfb._follow_xincludes()
+ self.assertFalse(xfb.add_monitor.called)
+ xfb.xdata = None
+
+ reset()
+ xfb._follow_xincludes(fname="/test/test2.xml")
+ self.assertFalse(xfb.add_monitor.called)
+
+ # test one level of xinclude
+ xdata[self.path] = lxml.etree.Element("Test").getroottree()
+ lxml.etree.SubElement(xdata[self.path].getroot(),
+ Bcfg2.Server.XI_NAMESPACE + "include",
+ href="/test/test2.xml")
+ reset()
+ xfb._follow_xincludes(fname=self.path)
+ xfb.add_monitor.assert_called_with("/test/test2.xml")
+ self.assertItemsEqual(mock_parse.call_args_list,
+ [call(f) for f in xdata.keys()])
+ mock_exists.assert_called_with("/test/test2.xml")
+
+ reset()
+ xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path])
+ xfb.add_monitor.assert_called_with("/test/test2.xml")
+ self.assertItemsEqual(mock_parse.call_args_list,
+ [call(f) for f in xdata.keys()
+ if f != self.path])
+ mock_exists.assert_called_with("/test/test2.xml")
+
+ # test two-deep level of xinclude, with some files in another
+ # directory
+ xdata["/test/test3.xml"] = \
+ lxml.etree.Element("Test").getroottree()
+ lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
+ Bcfg2.Server.XI_NAMESPACE + "include",
+ href="/test/test_dir/test4.xml")
+ xdata["/test/test_dir/test4.xml"] = \
+ lxml.etree.Element("Test").getroottree()
+ lxml.etree.SubElement(xdata["/test/test_dir/test4.xml"].getroot(),
+ Bcfg2.Server.XI_NAMESPACE + "include",
+ href="/test/test_dir/test5.xml")
+ xdata['/test/test_dir/test5.xml'] = \
+ lxml.etree.Element("Test").getroottree()
+ xdata['/test/test_dir/test6.xml'] = \
+ lxml.etree.Element("Test").getroottree()
+ # relative includes
+ lxml.etree.SubElement(xdata[self.path].getroot(),
+ Bcfg2.Server.XI_NAMESPACE + "include",
+ href="test3.xml")
+ lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
+ Bcfg2.Server.XI_NAMESPACE + "include",
+ href="test_dir/test6.xml")
+
+ reset()
+ xfb._follow_xincludes(fname=self.path)
+ self.assertItemsEqual(xfb.add_monitor.call_args_list,
+ [call(f) for f in xdata.keys() if f != self.path])
+ self.assertItemsEqual(mock_parse.call_args_list,
+ [call(f) for f in xdata.keys()])
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call(f) for f in xdata.keys() if f != self.path])
+
+ reset()
+ xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path])
+ self.assertItemsEqual(xfb.add_monitor.call_args_list,
+ [call(f) for f in xdata.keys() if f != self.path])
+ self.assertItemsEqual(mock_parse.call_args_list,
+ [call(f) for f in xdata.keys() if f != self.path])
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call(f) for f in xdata.keys() if f != self.path])
+
+ @patch("lxml.etree._ElementTree", FakeElementTree)
+ @patch("Bcfg2.Server.Plugin.%s._follow_xincludes" % test_obj.__name__)
+ def test_Index(self, mock_follow):
+ xfb = self.get_obj()
+
+ def reset():
+ mock_follow.reset_mock()
+ FakeElementTree.xinclude.reset_mock()
+ xfb.extras = []
+ xfb.xdata = None
+
+ # syntax error
+ xfb.data = "<"
+ self.assertRaises(PluginInitError, xfb.Index)
+
+ # no xinclude
+ reset()
+ xdata = lxml.etree.Element("Test", name="test")
+ children = [lxml.etree.SubElement(xdata, "Foo"),
+ lxml.etree.SubElement(xdata, "Bar", name="bar")]
+ xfb.data = tostring(xdata)
+ xfb.Index()
+ mock_follow.assert_any_call()
+ try:
+ self.assertEqual(xfb.xdata.base, self.path)
+ except AttributeError:
+ # python 2.4 and/or lxml 2.0 don't store the base_url in
+ # .base -- no idea where it's stored.
+ pass
+ self.assertItemsEqual([tostring(e) for e in xfb.entries],
+ [tostring(e) for e in children])
+
+ # with xincludes
+ reset()
+ mock_follow.side_effect = \
+ lambda: xfb.extras.extend(["/test/test2.xml",
+ "/test/test_dir/test3.xml"])
+ children.extend([
+ lxml.etree.SubElement(xdata,
+ Bcfg2.Server.XI_NAMESPACE + "include",
+ href="/test/test2.xml"),
+ lxml.etree.SubElement(xdata,
+ Bcfg2.Server.XI_NAMESPACE + "include",
+ href="/test/test_dir/test3.xml")])
+ test2 = lxml.etree.Element("Test", name="test2")
+ lxml.etree.SubElement(test2, "Baz")
+ test3 = lxml.etree.Element("Test", name="test3")
+ replacements = {"/test/test2.xml": test2,
+ "/test/test_dir/test3.xml": test3}
+ def xinclude():
+ for el in xfb.xdata.findall('//%sinclude' %
+ Bcfg2.Server.XI_NAMESPACE):
+ xfb.xdata.replace(el, replacements[el.get("href")])
+ FakeElementTree.xinclude.side_effect = xinclude
+
+ xfb.data = tostring(xdata)
+ xfb.Index()
+ mock_follow.assert_any_call()
+ FakeElementTree.xinclude.assert_any_call
+ try:
+ self.assertEqual(xfb.xdata.base, self.path)
+ except AttributeError:
+ pass
+ self.assertItemsEqual([tostring(e) for e in xfb.entries],
+ [tostring(e) for e in children])
+
+ def test_add_monitor(self):
+ xfb = self.get_obj()
+ xfb.add_monitor("/test/test2.xml")
+ self.assertIn("/test/test2.xml", xfb.extras)
+
+ fam = Mock()
+ xfb = self.get_obj(fam=fam)
+ fam.reset_mock()
+ xfb.add_monitor("/test/test3.xml")
+ self.assertFalse(fam.AddMonitor.called)
+ self.assertIn("/test/test3.xml", xfb.extras)
+
+ fam.reset_mock()
+ xfb = self.get_obj(fam=fam, should_monitor=True)
+ xfb.add_monitor("/test/test4.xml")
+ fam.AddMonitor.assert_called_with("/test/test4.xml", xfb)
+ self.assertIn("/test/test4.xml", xfb.extras)
+
+
+class TestStructFile(TestXMLFileBacked):
+ test_obj = StructFile
+
+ def _get_test_data(self):
+ """ build a very complex set of test data """
+ # top-level group and client elements
+ groups = dict()
+ # group and client elements that are descendents of other group or
+ # client elements
+ subgroups = dict()
+ # children of elements in `groups' that should be included in
+ # match results
+ children = dict()
+ # children of elements in `subgroups' that should be included in
+ # match results
+ subchildren = dict()
+ # top-level tags that are not group elements
+ standalone = []
+ xdata = lxml.etree.Element("Test", name="test")
+ groups[0] = lxml.etree.SubElement(xdata, "Group", name="group1",
+ include="true")
+ children[0] = [lxml.etree.SubElement(groups[0], "Child", name="c1"),
+ lxml.etree.SubElement(groups[0], "Child", name="c2")]
+ subgroups[0] = [lxml.etree.SubElement(groups[0], "Group",
+ name="subgroup1", include="true"),
+ lxml.etree.SubElement(groups[0],
+ "Client", name="client1",
+ include="false")]
+ subchildren[0] = \
+ [lxml.etree.SubElement(subgroups[0][0], "Child", name="sc1"),
+ lxml.etree.SubElement(subgroups[0][0], "Child", name="sc2",
+ attr="some attr"),
+ lxml.etree.SubElement(subgroups[0][0], "Child", name="sc3")]
+ lxml.etree.SubElement(subchildren[0][-1], "SubChild", name="subchild")
+ lxml.etree.SubElement(subgroups[0][1], "Child", name="sc4")
+
+ groups[1] = lxml.etree.SubElement(xdata, "Group", name="group2",
+ include="false")
+ children[1] = []
+ subgroups[1] = []
+ subchildren[1] = []
+ lxml.etree.SubElement(groups[1], "Child", name="c3")
+ lxml.etree.SubElement(groups[1], "Child", name="c4")
+
+ standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s1"))
+
+ groups[2] = lxml.etree.SubElement(xdata, "Client", name="client2",
+ include="false")
+ children[2] = []
+ subgroups[2] = []
+ subchildren[2] = []
+ lxml.etree.SubElement(groups[2], "Child", name="c5")
+ lxml.etree.SubElement(groups[2], "Child", name="c6")
+
+ standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s2",
+ attr="some attr"))
+
+ groups[3] = lxml.etree.SubElement(xdata, "Client", name="client3",
+ include="true")
+ children[3] = [lxml.etree.SubElement(groups[3], "Child", name="c7",
+ attr="some_attr"),
+ lxml.etree.SubElement(groups[3], "Child", name="c8")]
+ subgroups[3] = []
+ subchildren[3] = []
+ lxml.etree.SubElement(children[3][-1], "SubChild", name="subchild")
+
+ standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s3"))
+ lxml.etree.SubElement(standalone[-1], "SubStandalone", name="sub1")
+
+ children[4] = standalone
+ return (xdata, groups, subgroups, children, subchildren, standalone)
+
+ def test_include_element(self):
+ sf = self.get_obj()
+ metadata = Mock()
+ metadata.groups = ["group1", "group2"]
+ metadata.hostname = "foo.example.com"
+
+ inc = lambda tag, **attrs: \
+ sf._include_element(lxml.etree.Element(tag, **attrs), metadata)
+
+ self.assertFalse(sf._include_element(lxml.etree.Comment("test"),
+ metadata))
+
+ self.assertFalse(inc("Group", name="group3"))
+ self.assertFalse(inc("Group", name="group2", negate="true"))
+ self.assertFalse(inc("Group", name="group2", negate="tRuE"))
+ self.assertTrue(inc("Group", name="group2"))
+ self.assertTrue(inc("Group", name="group2", negate="false"))
+ self.assertTrue(inc("Group", name="group2", negate="faLSe"))
+ self.assertTrue(inc("Group", name="group3", negate="true"))
+ self.assertTrue(inc("Group", name="group3", negate="tRUe"))
+
+ self.assertFalse(inc("Client", name="bogus.example.com"))
+ self.assertFalse(inc("Client", name="foo.example.com", negate="true"))
+ self.assertFalse(inc("Client", name="foo.example.com", negate="tRuE"))
+ self.assertTrue(inc("Client", name="foo.example.com"))
+ self.assertTrue(inc("Client", name="foo.example.com", negate="false"))
+ self.assertTrue(inc("Client", name="foo.example.com", negate="faLSe"))
+ self.assertTrue(inc("Client", name="bogus.example.com", negate="true"))
+ self.assertTrue(inc("Client", name="bogus.example.com", negate="tRUe"))
+
+ self.assertTrue(inc("Other"))
+
+ @patch("Bcfg2.Server.Plugin.%s._include_element" % test_obj.__name__)
+ def test__match(self, mock_include):
+ sf = self.get_obj()
+ metadata = Mock()
+
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ self._get_test_data()
+
+ mock_include.side_effect = \
+ lambda x, _: (x.tag not in ['Client', 'Group'] or
+ x.get("include") == "true")
+
+ for i, group in groups.items():
+ actual = sf._match(group, metadata)
+ expected = children[i] + subchildren[i]
+ self.assertEqual(len(actual), len(expected))
+ # easiest way to compare the values is actually to make
+ # them into an XML document and let assertXMLEqual compare
+ # them
+ xactual = lxml.etree.Element("Container")
+ xactual.extend(actual)
+ xexpected = lxml.etree.Element("Container")
+ xexpected.extend(expected)
+ self.assertXMLEqual(xactual, xexpected)
+
+ for el in standalone:
+ self.assertXMLEqual(el, sf._match(el, metadata)[0])
+
+ @patch("Bcfg2.Server.Plugin.%s._match" % test_obj.__name__)
+ def test_Match(self, mock_match):
+ sf = self.get_obj()
+ metadata = Mock()
+
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ self._get_test_data()
+ sf.entries.extend(copy.deepcopy(xdata).getchildren())
+
+ def match_rv(el, _):
+ if el.tag not in ['Client', 'Group']:
+ return [el]
+ elif x.get("include") == "true":
+ return el.getchildren()
+ else:
+ return []
+ mock_match.side_effect = match_rv
+ actual = sf.Match(metadata)
+ expected = reduce(lambda x, y: x + y,
+ list(children.values()) + list(subgroups.values()))
+ self.assertEqual(len(actual), len(expected))
+ # easiest way to compare the values is actually to make
+ # them into an XML document and let assertXMLEqual compare
+ # them
+ xactual = lxml.etree.Element("Container")
+ xactual.extend(actual)
+ xexpected = lxml.etree.Element("Container")
+ xexpected.extend(expected)
+ self.assertXMLEqual(xactual, xexpected)
+
+ @patch("Bcfg2.Server.Plugin.%s._include_element" % test_obj.__name__)
+ def test__xml_match(self, mock_include):
+ sf = self.get_obj()
+ metadata = Mock()
+
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ self._get_test_data()
+
+ mock_include.side_effect = \
+ lambda x, _: (x.tag not in ['Client', 'Group'] or
+ x.get("include") == "true")
+
+ actual = copy.deepcopy(xdata)
+ for el in actual.getchildren():
+ sf._xml_match(el, metadata)
+ expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib))
+ expected.text = xdata.text
+ expected.extend(reduce(lambda x, y: x + y,
+ list(children.values()) + list(subchildren.values())))
+ expected.extend(standalone)
+ self.assertXMLEqual(actual, expected)
+
+ @patch("Bcfg2.Server.Plugin.%s._xml_match" % test_obj.__name__)
+ def test_Match(self, mock_xml_match):
+ sf = self.get_obj()
+ metadata = Mock()
+
+ (sf.xdata, groups, subgroups, children, subchildren, standalone) = \
+ self._get_test_data()
+
+ sf.XMLMatch(metadata)
+ actual = []
+ for call in mock_xml_match.call_args_list:
+ actual.append(call[0][0])
+ self.assertEqual(call[0][1], metadata)
+ expected = list(groups.values()) + standalone
+ # easiest way to compare the values is actually to make
+ # them into an XML document and let assertXMLEqual compare
+ # them
+ xactual = lxml.etree.Element("Container")
+ xactual.extend(actual)
+ xexpected = lxml.etree.Element("Container")
+ xexpected.extend(expected)
+ self.assertXMLEqual(xactual, xexpected)
+
+
+class TestINode(Bcfg2TestCase):
+ test_obj = INode
+
+ # INode.__init__ and INode._load_children() call each other
+ # recursively, which makes this class kind of a nightmare to test.
+ # we have to first patch INode._load_children so that we can
+ # create an INode object with no children loaded, then we unpatch
+ # INode._load_children and patch INode.__init__ so that child
+ # objects aren't actually created. but in order to test things
+ # atomically, we do this umpteen times in order to test with
+ # different data. this convenience method makes this a little
+ # easier. fun fun fun.
+ @patch("Bcfg2.Server.Plugin.%s._load_children" % test_obj.__name__, Mock())
+ def _get_inode(self, data, idict):
+ return self.test_obj(data, idict)
+
+ def test_raw_predicates(self):
+ metadata = Mock()
+ metadata.groups = ["group1", "group2"]
+ metadata.hostname = "foo.example.com"
+ entry = None
+
+ parent_predicate = lambda m, e: True
+ pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(self.test_obj.raw['Client'] % dict(name="bar.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(self.test_obj.raw['Group'] % dict(name="group3"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ pred = eval(self.test_obj.nraw['Client'] % dict(name="foo.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+
+ pred = eval(self.test_obj.nraw['Group'] % dict(name="group1"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+
+ parent_predicate = lambda m, e: False
+ pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ self.assertItemsEqual(self.test_obj.containers,
+ self.test_obj.raw.keys())
+ self.assertItemsEqual(self.test_obj.containers,
+ self.test_obj.nraw.keys())
+
+ @patch("Bcfg2.Server.Plugin.INode._load_children")
+ def test__init(self, mock_load_children):
+ data = lxml.etree.Element("Bogus")
+ # called with no parent, should not raise an exception; it's a
+ # top-level tag in an XML file and so is not expected to be a
+ # proper predicate
+ INode(data, dict())
+ self.assertRaises(PluginExecutionError,
+ INode, data, dict(), Mock())
+
+ data = lxml.etree.Element("Client", name="foo.example.com")
+ idict = dict()
+ inode = INode(data, idict)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertTrue(inode.predicate(Mock(), Mock()))
+
+ parent = Mock()
+ parent.predicate = lambda m, e: True
+ metadata = Mock()
+ metadata.groups = ["group1", "group2"]
+ metadata.hostname = "foo.example.com"
+ entry = None
+
+ # test setting predicate with parent object
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertTrue(inode.predicate(metadata, entry))
+
+ # test negation
+ data = lxml.etree.Element("Client", name="foo.example.com",
+ negate="true")
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertFalse(inode.predicate(metadata, entry))
+
+ # test failure of a matching predicate (client names do not match)
+ data = lxml.etree.Element("Client", name="foo.example.com")
+ metadata.hostname = "bar.example.com"
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertFalse(inode.predicate(metadata, entry))
+
+ # test that parent predicate is AND'ed in correctly
+ parent.predicate = lambda m, e: False
+ metadata.hostname = "foo.example.com"
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertFalse(inode.predicate(metadata, entry))
+
+ def test_load_children(self):
+ data = lxml.etree.Element("Parent")
+ child1 = lxml.etree.SubElement(data, "Client", name="foo.example.com")
+ child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
+ idict = dict()
+
+ inode = self._get_inode(data, idict)
+
+ @patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
+ def inner(mock_init):
+ mock_init.return_value = None
+ inode._load_children(data, idict)
+ self.assertItemsEqual(mock_init.call_args_list,
+ [call(child1, idict, inode),
+ call(child2, idict, inode)])
+ self.assertEqual(idict, dict())
+ self.assertItemsEqual(inode.contents, dict())
+
+ inner()
+
+ data = lxml.etree.Element("Parent")
+ child1 = lxml.etree.SubElement(data, "Data", name="child1",
+ attr="some attr")
+ child1.text = "text"
+ subchild1 = lxml.etree.SubElement(child1, "SubChild", name="subchild")
+ child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
+ idict = dict()
+
+ inode = self._get_inode(data, idict)
+
+ @patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
+ def inner2(mock_init):
+ mock_init.return_value = None
+ inode._load_children(data, idict)
+ mock_init.assert_called_with(child2, idict, inode)
+ tag = child1.tag
+ name = child1.get("name")
+ self.assertEqual(idict, dict(Data=[name]))
+ self.assertIn(tag, inode.contents)
+ self.assertIn(name, inode.contents[tag])
+ self.assertItemsEqual(inode.contents[tag][name],
+ dict(name=name,
+ attr=child1.get('attr'),
+ __text__=child1.text,
+ __children__=[subchild1]))
+
+ inner2()
+
+ # test ignore. no ignore is set on INode by default, so we
+ # have to set one
+ old_ignore = copy.copy(self.test_obj.ignore)
+ self.test_obj.ignore.append("Data")
+ idict = dict()
+
+ inode = self._get_inode(data, idict)
+
+ @patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
+ def inner3(mock_init):
+ mock_init.return_value = None
+ inode._load_children(data, idict)
+ mock_init.assert_called_with(child2, idict, inode)
+ self.assertEqual(idict, dict())
+ self.assertItemsEqual(inode.contents, dict())
+
+ inner3()
+ self.test_obj.ignore = old_ignore
+
+ def test_Match(self):
+ idata = lxml.etree.Element("Parent")
+ contents = lxml.etree.SubElement(idata, "Data", name="contents",
+ attr="some attr")
+ child = lxml.etree.SubElement(idata, "Group", name="bar", negate="true")
+
+ inode = INode(idata, dict())
+ inode.predicate = Mock()
+ inode.predicate.return_value = False
+
+ metadata = Mock()
+ metadata.groups = ['foo']
+ data = dict()
+ entry = child
+
+ inode.Match(metadata, data, entry=child)
+ self.assertEqual(data, dict())
+ inode.predicate.assert_called_with(metadata, child)
+
+ inode.predicate.reset_mock()
+ inode.Match(metadata, data)
+ self.assertEqual(data, dict())
+ # can't easily compare XML args without the original
+ # object, and we're testing that Match() works without an
+ # XML object passed in, so...
+ self.assertEqual(inode.predicate.call_args[0][0],
+ metadata)
+ self.assertXMLEqual(inode.predicate.call_args[0][1],
+ lxml.etree.Element("None"))
+
+ inode.predicate.reset_mock()
+ inode.predicate.return_value = True
+ inode.Match(metadata, data, entry=child)
+ self.assertEqual(data, inode.contents)
+ inode.predicate.assert_called_with(metadata, child)
+
+
+class TestInfoNode(TestINode):
+ __test__ = True
+ test_obj = InfoNode
+
+ def test_raw_predicates(self):
+ TestINode.test_raw_predicates(self)
+ metadata = Mock()
+ entry = lxml.etree.Element("Path", name="/tmp/foo",
+ realname="/tmp/bar")
+
+ parent_predicate = lambda m, d: True
+ pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+
+ parent_predicate = lambda m, d: False
+ pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+
+class TestXMLSrc(TestXMLFileBacked):
+ test_obj = XMLSrc
+
+ def test_node_interface(self):
+ # ensure that the node object has the necessary interface
+ self.assertTrue(hasattr(self.test_obj.__node__, "Match"))
+
+ @patch("%s.open" % builtins)
+ def test_HandleEvent(self, mock_open):
+ xdata = lxml.etree.Element("Test")
+ lxml.etree.SubElement(xdata, "Path", name="path", attr="whatever")
+
+ xsrc = self.get_obj("/test/foo.xml")
+ xsrc.__node__ = Mock()
+ mock_open.return_value.read.return_value = tostring(xdata)
+
+ if xsrc.__priority_required__:
+ # test with no priority at all
+ self.assertRaises(PluginExecutionError,
+ xsrc.HandleEvent, Mock())
+
+ # test with bogus priority
+ xdata.set("priority", "cow")
+ mock_open.return_value.read.return_value = tostring(xdata)
+ self.assertRaises(PluginExecutionError,
+ xsrc.HandleEvent, Mock())
+
+ # assign a priority to use in future tests
+ xdata.set("priority", "10")
+ mock_open.return_value.read.return_value = tostring(xdata)
+
+ mock_open.reset_mock()
+ xsrc = self.get_obj("/test/foo.xml")
+ xsrc.__node__ = Mock()
+ xsrc.HandleEvent(Mock())
+ mock_open.assert_called_with("/test/foo.xml")
+ mock_open.return_value.read.assert_any_call()
+ self.assertXMLEqual(xsrc.__node__.call_args[0][0], xdata)
+ self.assertEqual(xsrc.__node__.call_args[0][1], dict())
+ self.assertEqual(xsrc.pnode, xsrc.__node__.return_value)
+ self.assertEqual(xsrc.cache, None)
+
+ @patch("Bcfg2.Server.Plugin.XMLSrc.HandleEvent")
+ def test_Cache(self, mock_HandleEvent):
+ xsrc = self.get_obj("/test/foo.xml")
+ metadata = Mock()
+ xsrc.Cache(metadata)
+ mock_HandleEvent.assert_any_call()
+
+ xsrc.pnode = Mock()
+ xsrc.Cache(metadata)
+ xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
+ self.assertEqual(xsrc.cache[0], metadata)
+
+ xsrc.pnode.reset_mock()
+ xsrc.Cache(metadata)
+ self.assertFalse(xsrc.pnode.Mock.called)
+ self.assertEqual(xsrc.cache[0], metadata)
+
+ xsrc.cache = ("bogus")
+ xsrc.Cache(metadata)
+ xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
+ self.assertEqual(xsrc.cache[0], metadata)
+
+
+class TestInfoXML(TestXMLSrc):
+ test_obj = InfoXML
+
+
+class TestXMLDirectoryBacked(TestDirectoryBacked):
+ test_obj = XMLDirectoryBacked
+ testfiles = ['foo.xml', 'bar/baz.xml', 'plugh.plugh.xml']
+ badpaths = ["foo", "foo.txt", "foo.xsd", "xml"]
+
+
+class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
+ test_obj = PrioDir
+
+ @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__,
+ Mock())
+ def get_obj(self, core=None):
+ if core is None:
+ core = Mock()
+ return self.test_obj(core, datastore)
+
+ def test_HandleEvent(self):
+ TestXMLDirectoryBacked.test_HandleEvent(self)
+
+ @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent", Mock())
+ def inner():
+ pd = self.get_obj()
+ test1 = Mock()
+ test1.items = dict(Path=["/etc/foo.conf", "/etc/bar.conf"])
+ test2 = Mock()
+ test2.items = dict(Path=["/etc/baz.conf"],
+ Package=["quux", "xyzzy"])
+ pd.entries = {"/test1.xml": test1,
+ "/test2.xml": test2}
+ pd.HandleEvent(Mock())
+ self.assertItemsEqual(pd.Entries,
+ dict(Path={"/etc/foo.conf": pd.BindEntry,
+ "/etc/bar.conf": pd.BindEntry,
+ "/etc/baz.conf": pd.BindEntry},
+ Package={"quux": pd.BindEntry,
+ "xyzzy": pd.BindEntry}))
+
+ inner()
+
+ def test__matches(self):
+ pd = self.get_obj()
+ self.assertTrue(pd._matches(lxml.etree.Element("Test",
+ name="/etc/foo.conf"),
+ Mock(),
+ {"/etc/foo.conf": pd.BindEntry,
+ "/etc/bar.conf": pd.BindEntry}))
+ self.assertFalse(pd._matches(lxml.etree.Element("Test",
+ name="/etc/baz.conf"),
+ Mock(),
+ {"/etc/foo.conf": pd.BindEntry,
+ "/etc/bar.conf": pd.BindEntry}))
+
+ def test_BindEntry(self):
+ pd = self.get_obj()
+ pd.get_attrs = Mock(return_value=dict(test1="test1", test2="test2"))
+ entry = lxml.etree.Element("Path", name="/etc/foo.conf", test1="bogus")
+ metadata = Mock()
+ pd.BindEntry(entry, metadata)
+ pd.get_attrs.assert_called_with(entry, metadata)
+ self.assertItemsEqual(entry.attrib,
+ dict(name="/etc/foo.conf",
+ test1="test1", test2="test2"))
+
+ def test_get_attrs(self):
+ pd = self.get_obj()
+ entry = lxml.etree.Element("Path", name="/etc/foo.conf")
+ children = [lxml.etree.Element("Child")]
+ metadata = Mock()
+ pd.entries = dict()
+
+ def reset():
+ metadata.reset_mock()
+ for src in pd.entries.values():
+ src.reset_mock()
+ src.cache = None
+
+ # test with no matches
+ self.assertRaises(PluginExecutionError,
+ pd.get_attrs, entry, metadata)
+
+ def add_entry(name, data, prio=10):
+ path = os.path.join(pd.data, name)
+ pd.entries[path] = Mock()
+ pd.entries[path].priority = prio
+ def do_Cache(metadata):
+ pd.entries[path].cache = (metadata, data)
+ pd.entries[path].Cache.side_effect = do_Cache
+
+ add_entry('test1.xml',
+ dict(Path={'/etc/foo.conf': dict(attr="attr1",
+ __children__=children),
+ '/etc/bar.conf': dict()}))
+ add_entry('test2.xml',
+ dict(Path={'/etc/bar.conf': dict(__text__="text",
+ attr="attr1")},
+ Package={'quux': dict(),
+ 'xyzzy': dict()}),
+ prio=20)
+ add_entry('test3.xml',
+ dict(Path={'/etc/baz.conf': dict()},
+ Package={'xyzzy': dict()}),
+ prio=20)
+
+ # test with exactly one match, __children__
+ reset()
+ self.assertItemsEqual(pd.get_attrs(entry, metadata),
+ dict(attr="attr1"))
+ for src in pd.entries.values():
+ src.Cache.assert_called_with(metadata)
+ self.assertEqual(len(entry.getchildren()), 1)
+ self.assertXMLEqual(entry.getchildren()[0], children[0])
+
+ # test with multiple matches with different priorities, __text__
+ reset()
+ entry = lxml.etree.Element("Path", name="/etc/bar.conf")
+ self.assertItemsEqual(pd.get_attrs(entry, metadata),
+ dict(attr="attr1"))
+ for src in pd.entries.values():
+ src.Cache.assert_called_with(metadata)
+ self.assertEqual(entry.text, "text")
+
+ # test with multiple matches with identical priorities
+ reset()
+ entry = lxml.etree.Element("Package", name="xyzzy")
+ self.assertRaises(PluginExecutionError,
+ pd.get_attrs, entry, metadata)
+
+
+class TestSpecificityError(Bcfg2TestCase):
+ """ placeholder for future tests """
+ pass
+
+
+class TestSpecificity(Bcfg2TestCase):
+ test_obj = Specificity
+
+ def get_obj(self, **kwargs):
+ return self.test_obj(**kwargs)
+
+ def test_matches(self):
+ metadata = Mock()
+ metadata.hostname = "foo.example.com"
+ metadata.groups = ["group1", "group2"]
+ self.assertTrue(self.get_obj(all=True).matches(metadata))
+ self.assertTrue(self.get_obj(group="group1").matches(metadata))
+ self.assertTrue(self.get_obj(hostname="foo.example.com").matches(metadata))
+ self.assertFalse(self.get_obj().matches(metadata))
+ self.assertFalse(self.get_obj(group="group3").matches(metadata))
+ self.assertFalse(self.get_obj(hostname="bar.example.com").matches(metadata))
+
+ def test__cmp(self):
+ specs = [self.get_obj(all=True),
+ self.get_obj(group="group1", prio=10),
+ self.get_obj(group="group1", prio=20),
+ self.get_obj(hostname="foo.example.com")]
+
+ for i in range(len(specs)):
+ for j in range(len(specs)):
+ if i == j:
+ self.assertEqual(0, specs[i].__cmp__(specs[j]))
+ self.assertEqual(0, specs[j].__cmp__(specs[i]))
+ elif i > j:
+ self.assertEqual(-1, specs[i].__cmp__(specs[j]))
+ self.assertEqual(1, specs[j].__cmp__(specs[i]))
+ elif i < j:
+ self.assertEqual(1, specs[i].__cmp__(specs[j]))
+ self.assertEqual(-1, specs[j].__cmp__(specs[i]))
+
+ def test_cmp(self):
+ """ test __lt__/__gt__/__eq__ """
+ specs = [self.get_obj(all=True),
+ self.get_obj(group="group1", prio=10),
+ self.get_obj(group="group1", prio=20),
+ self.get_obj(hostname="foo.example.com")]
+
+ for i in range(len(specs)):
+ for j in range(len(specs)):
+ if i < j:
+ self.assertGreater(specs[i], specs[j])
+ self.assertLess(specs[j], specs[i])
+ self.assertGreaterEqual(specs[i], specs[j])
+ self.assertLessEqual(specs[j], specs[i])
+ elif i == j:
+ self.assertEqual(specs[i], specs[j])
+ self.assertEqual(specs[j], specs[i])
+ self.assertLessEqual(specs[i], specs[j])
+ self.assertGreaterEqual(specs[j], specs[i])
+ elif i > j:
+ self.assertLess(specs[i], specs[j])
+ self.assertGreater(specs[j], specs[i])
+ self.assertLessEqual(specs[i], specs[j])
+ self.assertGreaterEqual(specs[j], specs[i])
+
+
+class TestSpecificData(Bcfg2TestCase):
+ test_obj = SpecificData
+ path = os.path.join(datastore, "test.txt")
+
+ def get_obj(self, name=None, specific=None, encoding=None):
+ if name is None:
+ name = self.path
+ if specific is None:
+ specific = Mock()
+ return self.test_obj(name, specific, encoding)
+
+ @patch("%s.open" % builtins)
+ def test_handle_event(self, mock_open):
+ event = Mock()
+ event.code2str.return_value = 'deleted'
+ sd = self.get_obj()
+ sd.handle_event(event)
+ self.assertFalse(mock_open.called)
+ if hasattr(sd, 'data'):
+ self.assertIsNone(sd.data)
+ else:
+ self.assertFalse(hasattr(sd, 'data'))
+
+ event = Mock()
+ mock_open.return_value.read.return_value = "test"
+ sd.handle_event(event)
+ mock_open.assert_called_with(self.path)
+ mock_open.return_value.read.assert_any_call()
+ self.assertEqual(sd.data, "test")
+
+
+class TestEntrySet(TestDebuggable):
+ test_obj = EntrySet
+ # filenames that should be matched successfully by the EntrySet
+ # 'specific' regex. these are filenames alone -- a specificity
+ # will be added to these
+ basenames = ["test", "test.py", "test with spaces.txt",
+ "test.multiple.dots.py", "test_underscores.and.dots",
+ "really_misleading.G10_test",
+ "name$with*regex(special){chars}",
+ "misleading.H_hostname.test.com"]
+ # filenames that do not match any of the basenames (or the
+ # basename regex, if applicable)
+ bogus_names = ["bogus"]
+ # filenames that should be ignored
+ ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx",
+ "test.txt.genshi_include", "test.G_foo.genshi_include"]
+
+
+ def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(),
+ encoding=None):
+ return self.test_obj(basename, path, entry_type, encoding)
+
+ def test__init(self):
+ for basename in self.basenames:
+ eset = self.get_obj(basename=basename)
+ self.assertIsInstance(eset.specific, re_type)
+ self.assertTrue(eset.specific.match(os.path.join(datastore,
+ basename)))
+ ppath = os.path.join(datastore, "Plugin", basename)
+ self.assertTrue(eset.specific.match(ppath))
+ self.assertTrue(eset.specific.match(ppath + ".G20_foo"))
+ self.assertTrue(eset.specific.match(ppath + ".G1_foo"))
+ self.assertTrue(eset.specific.match(ppath + ".G32768_foo"))
+ # a group named '_'
+ self.assertTrue(eset.specific.match(ppath + ".G10__"))
+ self.assertTrue(eset.specific.match(ppath + ".H_hostname"))
+ self.assertTrue(eset.specific.match(ppath + ".H_fqdn.subdomain.example.com"))
+ self.assertTrue(eset.specific.match(ppath + ".G20_group_with_underscores"))
+
+ self.assertFalse(eset.specific.match(ppath + ".G20_group with spaces"))
+ self.assertFalse(eset.specific.match(ppath + ".G_foo"))
+ self.assertFalse(eset.specific.match(ppath + ".G_"))
+ self.assertFalse(eset.specific.match(ppath + ".G20_"))
+ self.assertFalse(eset.specific.match(ppath + ".H_"))
+
+ for bogus in self.bogus_names:
+ self.assertFalse(eset.specific.match(os.path.join(datastore,
+ "Plugin",
+ bogus)))
+
+ for ignore in self.ignore:
+ self.assertTrue(eset.ignore.match(ignore))
+
+ self.assertFalse(eset.ignore.match(basename))
+ self.assertFalse(eset.ignore.match(basename + ".G20_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G1_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G32768_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G10__"))
+ self.assertFalse(eset.ignore.match(basename + ".H_hostname"))
+ self.assertFalse(eset.ignore.match(basename + ".H_fqdn.subdomain.example.com"))
+ self.assertFalse(eset.ignore.match(basename + ".G20_group_with_underscores"))
+
+ def test_get_matching(self):
+ items = {0: Mock(), 1: Mock(), 2: Mock(), 3: Mock(), 4: Mock(),
+ 5: Mock()}
+ items[0].specific.matches.return_value = False
+ items[1].specific.matches.return_value = True
+ items[2].specific.matches.return_value = False
+ items[3].specific.matches.return_value = False
+ items[4].specific.matches.return_value = True
+ items[5].specific.matches.return_value = True
+ metadata = Mock()
+ eset = self.get_obj()
+ eset.entries = items
+ self.assertItemsEqual(eset.get_matching(metadata),
+ [items[1], items[4], items[5]])
+ for i in items.values():
+ i.specific.matches.assert_called_with(metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.get_matching" % test_obj.__name__)
+ def test_best_matching(self, mock_get_matching):
+ eset = self.get_obj()
+ metadata = Mock()
+ matching = []
+
+ def reset():
+ mock_get_matching.reset_mock()
+ metadata.reset_mock()
+ for m in matching:
+ m.reset_mock()
+
+ def specific(all=False, group=False, prio=None, hostname=False):
+ spec = Mock()
+ spec.specific = Specificity(all=all, group=group, prio=prio,
+ hostname=hostname)
+ return spec
+
+ self.assertRaises(PluginExecutionError,
+ eset.best_matching, metadata, matching=[])
+
+ reset()
+ mock_get_matching.return_value = matching
+ self.assertRaises(PluginExecutionError,
+ eset.best_matching, metadata)
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with a single file for all
+ reset()
+ expected = specific(all=True)
+ matching.append(expected)
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata), expected)
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with a single group-specific file
+ reset()
+ expected = specific(group=True, prio=10)
+ matching.append(expected)
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata), expected)
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with multiple group-specific files
+ reset()
+ expected = specific(group=True, prio=20)
+ matching.append(expected)
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata), expected)
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with host-specific file
+ reset()
+ expected = specific(hostname=True)
+ matching.append(expected)
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata), expected)
+ mock_get_matching.assert_called_with(metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.entry_init" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.reset_metadata" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.update_metadata" % test_obj.__name__)
+ def test_handle_event(self, mock_update_md, mock_reset_md, mock_init):
+ def reset():
+ mock_update_md.reset_mock()
+ mock_reset_md.reset_mock()
+ mock_init.reset_mock()
+
+ eset = self.get_obj()
+ for fname in ["info", "info.xml", ":info"]:
+ for evt in ["exists", "created", "changed"]:
+ reset()
+ event = Mock()
+ event.code2str.return_value = evt
+ event.filename = fname
+ eset.handle_event(event)
+ mock_update_md.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_reset_md.called)
+
+ reset()
+ event = Mock()
+ event.code2str.return_value = "deleted"
+ event.filename = fname
+ eset.handle_event(event)
+ mock_reset_md.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_update_md.called)
+
+ for evt in ["exists", "created", "changed"]:
+ reset()
+ event = Mock()
+ event.code2str.return_value = evt
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ mock_init.assert_called_with(event)
+ self.assertFalse(mock_reset_md.called)
+ self.assertFalse(mock_update_md.called)
+
+ reset()
+ entry = Mock()
+ eset.entries["test.txt"] = entry
+ event = Mock()
+ event.code2str.return_value = "changed"
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ entry.handle_event.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_reset_md.called)
+ self.assertFalse(mock_update_md.called)
+
+ reset()
+ entry = Mock()
+ eset.entries["test.txt"] = entry
+ event = Mock()
+ event.code2str.return_value = "deleted"
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ self.assertNotIn("test.txt", eset.entries)
+
+ @patch("Bcfg2.Server.Plugin.%s.specificity_from_filename" %
+ test_obj.__name__)
+ def test_entry_init(self, mock_spec):
+ eset = self.get_obj()
+
+ def reset():
+ eset.entry_type.reset_mock()
+ mock_spec.reset_mock()
+
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test.txt"
+ eset.entry_init(event)
+ mock_spec.assert_called_with("test.txt", specific=None)
+ eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"),
+ mock_spec.return_value, None)
+ eset.entry_type.return_value.handle_event.assert_called_with(event)
+ self.assertIn("test.txt", eset.entries)
+
+ # test duplicate add
+ reset()
+ eset.entry_init(event)
+ self.assertFalse(mock_spec.called)
+ self.assertFalse(eset.entry_type.called)
+ eset.entries["test.txt"].handle_event.assert_called_with(event)
+
+ # test keyword args
+ etype = Mock()
+ specific = Mock()
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test2.txt"
+ eset.entry_init(event, entry_type=etype, specific=specific)
+ mock_spec.assert_called_with("test2.txt", specific=specific)
+ etype.assert_called_with(os.path.join(eset.path, "test2.txt"),
+ mock_spec.return_value, None)
+ etype.return_value.handle_event.assert_called_with(event)
+ self.assertIn("test2.txt", eset.entries)
+
+ # test specificity error
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test3.txt"
+ mock_spec.side_effect = SpecificityError
+ eset.entry_init(event)
+ mock_spec.assert_called_with("test3.txt", specific=None)
+ self.assertFalse(eset.entry_type.called)
+
+ @patch("Bcfg2.Server.Plugin.Specificity")
+ def test_specificity_from_filename(self, mock_spec):
+ def test(eset, fname, **kwargs):
+ mock_spec.reset_mock()
+ if "specific" in kwargs:
+ specific = kwargs['specific']
+ del kwargs['specific']
+ else:
+ specific = None
+ self.assertEqual(eset.specificity_from_filename(fname,
+ specific=specific),
+ mock_spec.return_value)
+ mock_spec.assert_called_with(**kwargs)
+
+ def fails(eset, fname, specific=None):
+ mock_spec.reset_mock()
+ self.assertRaises(SpecificityError,
+ eset.specificity_from_filename, fname,
+ specific=specific)
+
+ for basename in self.basenames:
+ eset = self.get_obj(basename=basename)
+ ppath = os.path.join(datastore, "Plugin", basename)
+ test(eset, ppath, all=True)
+ test(eset, ppath + ".G20_foo", group="foo", prio=20)
+ test(eset, ppath + ".G1_foo", group="foo", prio=1)
+ test(eset, ppath + ".G32768_foo", group="foo", prio=32768)
+ test(eset, ppath + ".G10__", group="_", prio=10)
+ test(eset, ppath + ".H_hostname", hostname="hostname")
+ test(eset, ppath + ".H_fqdn.subdomain.example.com",
+ hostname="fqdn.subdomain.example.com")
+ test(eset, ppath + ".G20_group_with_underscores",
+ group="group_with_underscores", prio=20)
+
+ for bogus in self.bogus_names:
+ fails(eset, bogus)
+ fails(eset, ppath + ".G_group with spaces")
+ fails(eset, ppath + ".G_foo")
+ fails(eset, ppath + ".G_")
+ fails(eset, ppath + ".G20_")
+ fails(eset, ppath + ".H_")
+
+ @patch("%s.open" % builtins)
+ @patch("Bcfg2.Server.Plugin.InfoXML")
+ def test_update_metadata(self, mock_InfoXML, mock_open):
+ eset = self.get_obj()
+
+ # add info.xml
+ event = Mock()
+ event.filename = "info.xml"
+ eset.update_metadata(event)
+ mock_InfoXML.assert_called_with(os.path.join(eset.path, "info.xml"))
+ mock_InfoXML.return_value.HandleEvent.assert_called_with(event)
+ self.assertEqual(eset.infoxml, mock_InfoXML.return_value)
+
+ # modify info.xml
+ mock_InfoXML.reset_mock()
+ eset.update_metadata(event)
+ self.assertFalse(mock_InfoXML.called)
+ eset.infoxml.HandleEvent.assert_called_with(event)
+
+ for fname in [':info', 'info']:
+ event = Mock()
+ event.filename = fname
+
+ idata = ["owner:owner",
+ "group: GROUP",
+ "perms: 775",
+ "important: true",
+ "bogus: line"]
+ mock_open.return_value.readlines.return_value = idata
+ eset.update_metadata(event)
+ expected = default_file_metadata.copy()
+ expected['owner'] = 'owner'
+ expected['group'] = 'GROUP'
+ expected['perms'] = '0775'
+ expected['important'] = 'true'
+ self.assertItemsEqual(eset.metadata,
+ expected)
+
+ def test_reset_metadata(self):
+ eset = self.get_obj()
+
+ # test info.xml
+ event = Mock()
+ event.filename = "info.xml"
+ eset.infoxml = Mock()
+ eset.reset_metadata(event)
+ self.assertIsNone(eset.infoxml)
+
+ for fname in [':info', 'info']:
+ event = Mock()
+ event.filename = fname
+ eset.metadata = Mock()
+ eset.reset_metadata(event)
+ self.assertItemsEqual(eset.metadata, default_file_metadata)
+
+ @patch("Bcfg2.Server.Plugin.bind_info")
+ def test_bind_info_to_entry(self, mock_bind_info):
+ eset = self.get_obj()
+ entry = Mock()
+ metadata = Mock()
+ eset.bind_info_to_entry(entry, metadata)
+ mock_bind_info.assert_called_with(entry, metadata,
+ infoxml=eset.infoxml,
+ default=eset.metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.best_matching" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.bind_info_to_entry" % test_obj.__name__)
+ def test_bind_entry(self, mock_bind_info, mock_best_matching):
+ eset = self.get_obj()
+ entry = Mock()
+ metadata = Mock()
+ eset.bind_entry(entry, metadata)
+ mock_bind_info.assert_called_with(entry, metadata)
+ mock_best_matching.assert_called_with(metadata)
+ mock_best_matching.return_value.bind_entry.assert_called_with(entry,
+ metadata)
+
+
+class TestGroupSpool(TestPlugin, TestGenerator):
+ test_obj = GroupSpool
+
+ @patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
+ def get_obj(self, core=None):
+ return TestPlugin.get_obj(self, core=core)
+
+ @patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
+ def test__init(self, mock_Add):
+ core = Mock()
+ gs = self.test_obj(core, datastore)
+ mock_Add.assert_called_with('')
+ self.assertItemsEqual(gs.Entries, {gs.entry_type: {}})
+
+ @patch("os.path.isdir")
+ @patch("os.path.isfile")
+ @patch("Bcfg2.Server.Plugin.%s.event_id" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.event_path" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
+ def test_add_entry(self, mock_Add, mock_event_path, mock_event_id,
+ mock_isfile, mock_isdir):
+ gs = self.get_obj()
+ gs.es_cls = Mock()
+ gs.es_child_cls = Mock()
+
+ def reset():
+ gs.es_cls.reset_mock()
+ gs.es_child_cls.reset_mock()
+ mock_Add.reset_mock()
+ mock_event_path.reset_mock()
+ mock_event_id.reset_mock()
+ mock_isfile.reset_mock()
+ mock_isdir.reset_mock()
+
+ # directory
+ event = Mock()
+ event.filename = "foo"
+ basedir = "test"
+ epath = os.path.join(gs.data, basedir, event.filename)
+ ident = os.path.join(basedir, event.filename)
+ mock_event_path.return_value = epath
+ mock_event_id.return_value = ident
+ mock_isdir.return_value = True
+ mock_isfile.return_value = False
+ gs.add_entry(event)
+ mock_Add.assert_called_with(os.path.join("/" + basedir, event.filename))
+ self.assertNotIn(ident, gs.entries)
+ mock_isdir.assert_called_with(epath)
+
+ # file that is not in self.entries
+ reset()
+ event = Mock()
+ event.filename = "foo"
+ basedir = "test/foo/"
+ epath = os.path.join(gs.data, basedir, event.filename)
+ ident = basedir[:-1]
+ mock_event_path.return_value = epath
+ mock_event_id.return_value = ident
+ mock_isdir.return_value = False
+ mock_isfile.return_value = True
+ gs.add_entry(event)
+ self.assertFalse(mock_Add.called)
+ gs.es_cls.assert_called_with(gs.filename_pattern,
+ gs.data + ident,
+ gs.es_child_cls,
+ gs.encoding)
+ self.assertIn(ident, gs.entries)
+ self.assertEqual(gs.entries[ident], gs.es_cls.return_value)
+ self.assertIn(ident, gs.Entries[gs.entry_type])
+ self.assertEqual(gs.Entries[gs.entry_type][ident],
+ gs.es_cls.return_value.bind_entry)
+ gs.entries[ident].handle_event.assert_called_with(event)
+ mock_isfile.assert_called_with(epath)
+
+ # file that is in self.entries
+ reset()
+ gs.add_entry(event)
+ self.assertFalse(mock_Add.called)
+ self.assertFalse(gs.es_cls.called)
+ gs.entries[ident].handle_event.assert_called_with(event)
+
+ def test_event_path(self):
+ gs = self.get_obj()
+ gs.handles[1] = "/var/lib/foo/"
+ gs.handles[2] = "/etc/foo/"
+ gs.handles[3] = "/usr/share/foo/"
+ event = Mock()
+ event.filename = "foo"
+ for i in range(1, 4):
+ event.requestID = i
+ self.assertEqual(gs.event_path(event),
+ os.path.join(datastore, gs.name,
+ gs.handles[event.requestID].lstrip('/'),
+ event.filename))
+
+ @patch("os.path.isdir")
+ @patch("Bcfg2.Server.Plugin.%s.event_path" % test_obj.__name__)
+ def test_event_id(self, mock_event_path, mock_isdir):
+ gs = self.get_obj()
+
+ def reset():
+ mock_event_path.reset_mock()
+ mock_isdir.reset_mock()
+
+ gs.handles[1] = "/var/lib/foo/"
+ gs.handles[2] = "/etc/foo/"
+ gs.handles[3] = "/usr/share/foo/"
+ event = Mock()
+ event.filename = "foo"
+ for i in range(1, 4):
+ event.requestID = i
+ reset()
+ mock_isdir.return_value = True
+ self.assertEqual(gs.event_id(event),
+ os.path.join(gs.handles[event.requestID].lstrip('/'),
+ event.filename))
+ mock_isdir.assert_called_with(mock_event_path.return_value)
+
+ reset()
+ mock_isdir.return_value = False
+ self.assertEqual(gs.event_id(event),
+ gs.handles[event.requestID].rstrip('/'))
+ mock_isdir.assert_called_with(mock_event_path.return_value)
+
+ def test_toggle_debug(self):
+ gs = self.get_obj()
+ gs.entries = {"/foo": Mock(),
+ "/bar": Mock(),
+ "/baz/quux": Mock()}
+
+ @patch("Bcfg2.Server.Plugin.Plugin.toggle_debug")
+ def inner(mock_debug):
+ gs.toggle_debug()
+ mock_debug.assert_called_with(gs)
+ for entry in gs.entries.values():
+ entry.toggle_debug.assert_any_call()
+
+ inner()
+
+ TestPlugin.test_toggle_debug(self)
+
+ def test_HandleEvent(self):
+ gs = self.get_obj()
+ gs.entries = {"/foo": Mock(),
+ "/bar": Mock(),
+ "/baz": Mock(),
+ "/baz/quux": Mock()}
+ for path in gs.entries.keys():
+ gs.Entries[gs.entry_type] = {path: Mock()}
+ gs.handles = {1: "/foo/",
+ 2: "/bar/",
+ 3: "/baz/",
+ 4: "/baz/quux"}
+
+ gs.add_entry = Mock()
+ gs.event_id = Mock()
+
+ def reset():
+ gs.add_entry.reset_mock()
+ gs.event_id.reset_mock()
+ for entry in gs.entries.values():
+ entry.reset_mock()
+
+ # test event creation, changing entry that doesn't exist
+ for evt in ["exists", "created", "changed"]:
+ reset()
+ event = Mock()
+ event.filename = "foo"
+ event.requestID = 1
+ event.code2str.return_value = evt
+ gs.HandleEvent(event)
+ gs.event_id.assert_called_with(event)
+ gs.add_entry.assert_called_with(event)
+
+ # test deleting entry, changing entry that does exist
+ for evt in ["changed", "deleted"]:
+ reset()
+ event = Mock()
+ event.filename = "quux"
+ event.requestID = 4
+ event.code2str.return_value = evt
+ gs.event_id.return_value = "/baz/quux"
+ gs.HandleEvent(event)
+ gs.event_id.assert_called_with(event)
+ self.assertIn(gs.event_id.return_value, gs.entries)
+ gs.entries[gs.event_id.return_value].handle_event.assert_called_with(event)
+ self.assertFalse(gs.add_entry.called)
+
+ # test deleting directory
+ reset()
+ event = Mock()
+ event.filename = "quux"
+ event.requestID = 3
+ event.code2str.return_value = "deleted"
+ gs.event_id.return_value = "/baz/quux"
+ gs.HandleEvent(event)
+ gs.event_id.assert_called_with(event)
+ self.assertNotIn("/baz/quux", gs.entries)
+ self.assertNotIn("/baz/quux", gs.Entries[gs.entry_type])
+
+
+
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
new file mode 100644
index 000000000..2ff0af78e
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
@@ -0,0 +1,1470 @@
+import os
+import sys
+import copy
+import time
+import socket
+import lxml.etree
+import Bcfg2.Server
+import Bcfg2.Server.Plugin
+from Bcfg2.Server.Plugins.Metadata import *
+from mock import Mock, patch
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \
+ TestStatistics, TestDatabaseBacked
+
+def get_clients_test_tree():
+ return lxml.etree.XML('''
+<Clients>
+ <Client name="client1" address="1.2.3.1" auth="cert"
+ location="floating" password="password2" profile="group1"/>
+ <Client name="client2" address="1.2.3.2" secure="true" profile="group2"/>
+ <Client name="client3" address="1.2.3.3" uuid="uuid1" profile="group1"
+ password="password2">
+ <Alias name="alias1"/>
+ </Client>
+ <Client name="client4" profile="group1">
+ <Alias name="alias2" address="1.2.3.2"/>
+ <Alias name="alias3"/>
+ </Client>
+ <Client name="client5" profile="group1"/>
+ <Client name="client6" profile="group1" auth="bootstrap"/>
+ <Client name="client7" profile="group1" auth="cert" address="1.2.3.4"/>
+ <Client name="client8" profile="group1" auth="cert+password"
+ address="1.2.3.5"/>
+ <Client name="client9" profile="group2" secure="true" password="password3"/>
+ <Client name="client10" profile="group1" floating="true"/>
+</Clients>''').getroottree()
+
+def get_groups_test_tree():
+ return lxml.etree.XML('''
+<Groups xmlns:xi="http://www.w3.org/2001/XInclude">
+ <Client name="client8">
+ <Group name="group8"/>
+ </Client>
+ <Client name="client9">
+ <Group name="group8"/>
+ </Client>
+
+ <Group name="group1" default="true" profile="true" public="true"
+ category="category1"/>
+ <Group name="group2" profile="true" public="true" category="category1">
+ <Bundle name="bundle1"/>
+ <Bundle name="bundle2"/>
+ <Group name="group1"/>
+ <Group name="group4"/>
+ </Group>
+ <Group name="group3" category="category2" public="false"/>
+ <Group name="group4" category="category1">
+ <Group name="group1"/>
+ <Group name="group6"/>
+ </Group>
+ <Group name="group5"/>
+ <Group name="group7">
+ <Bundle name="bundle3"/>
+ </Group>
+ <Group name="group8">
+ <Group name="group9"/>
+ <Client name="client9">
+ <Group name="group11"/>
+ <Group name="group9" negate="true"/>
+ </Client>
+ <Group name="group1">
+ <Group name="group10"/>
+ </Group>
+ </Group>
+</Groups>''').getroottree()
+
+
+def get_metadata_object(core=None, watch_clients=False, use_db=False):
+ if core is None:
+ core = Mock()
+ core.setup.cfp.getboolean = Mock(return_value=use_db)
+ return Metadata(core, datastore, watch_clients=watch_clients)
+
+
+class TestMetadataDB(DBModelTestCase):
+ if has_django:
+ models = [MetadataClientModel]
+
+
+if has_django or can_skip:
+ class TestClientVersions(Bcfg2TestCase):
+ test_clients = dict(client1="1.2.0",
+ client2="1.2.2",
+ client3="1.3.0pre1",
+ client4="1.1.0",
+ client5=None,
+ client6=None)
+
+ @skipUnless(has_django, "Django not found")
+ def setUp(self):
+ syncdb(TestMetadataDB)
+ for client, version in self.test_clients.items():
+ MetadataClientModel(hostname=client, version=version).save()
+
+ def test__contains(self):
+ v = ClientVersions()
+ self.assertIn("client1", v)
+ self.assertIn("client5", v)
+ self.assertNotIn("client__contains", v)
+
+ def test_keys(self):
+ v = ClientVersions()
+ self.assertItemsEqual(self.test_clients.keys(), v.keys())
+
+ def test__setitem(self):
+ v = ClientVersions()
+
+ # test setting version of existing client
+ v["client1"] = "1.2.3"
+ self.assertIn("client1", v)
+ self.assertEqual(v['client1'], "1.2.3")
+ client = MetadataClientModel.objects.get(hostname="client1")
+ self.assertEqual(client.version, "1.2.3")
+
+ # test adding new client
+ new = "client__setitem"
+ v[new] = "1.3.0"
+ self.assertIn(new, v)
+ self.assertEqual(v[new], "1.3.0")
+ client = MetadataClientModel.objects.get(hostname=new)
+ self.assertEqual(client.version, "1.3.0")
+
+ # test adding new client with no version
+ new2 = "client__setitem_2"
+ v[new2] = None
+ self.assertIn(new2, v)
+ self.assertEqual(v[new2], None)
+ client = MetadataClientModel.objects.get(hostname=new2)
+ self.assertEqual(client.version, None)
+
+ def test__getitem(self):
+ v = ClientVersions()
+
+ # test getting existing client
+ self.assertEqual(v['client2'], "1.2.2")
+ self.assertIsNone(v['client5'])
+
+ # test exception on nonexistent client
+ expected = KeyError
+ try:
+ v['clients__getitem']
+ except expected:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
+
+ def test__len(self):
+ v = ClientVersions()
+ self.assertEqual(len(v), MetadataClientModel.objects.count())
+
+ def test__iter(self):
+ v = ClientVersions()
+ self.assertItemsEqual([h for h in iter(v)], v.keys())
+
+ def test__delitem(self):
+ v = ClientVersions()
+
+ # test adding new client
+ new = "client__delitem"
+ v[new] = "1.3.0"
+
+ del v[new]
+ self.assertIn(new, v)
+ self.assertIsNone(v[new])
+
+
+class TestXMLMetadataConfig(TestXMLFileBacked):
+ test_obj = XMLMetadataConfig
+
+ def get_obj(self, basefile="clients.xml", core=None, watch_clients=False):
+ self.metadata = get_metadata_object(core=core,
+ watch_clients=watch_clients)
+ return XMLMetadataConfig(self.metadata, watch_clients, basefile)
+
+ def test__init(self):
+ xmc = self.get_obj()
+ self.assertEqual(self.metadata.core.fam, xmc.fam)
+ self.assertFalse(xmc.fam.AddMonitor.called)
+
+ def test_xdata(self):
+ config = self.get_obj()
+ expected = Bcfg2.Server.Plugin.MetadataRuntimeError
+ try:
+ config.xdata
+ except expected:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
+ pass
+
+ config.data = "<test/>"
+ self.assertEqual(config.xdata, "<test/>")
+
+ def test_base_xdata(self):
+ config = self.get_obj()
+ # we can't use assertRaises here because base_xdata is a property
+ expected = Bcfg2.Server.Plugin.MetadataRuntimeError
+ try:
+ config.base_xdata
+ except expected:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
+ pass
+
+ config.basedata = "<test/>"
+ self.assertEqual(config.base_xdata, "<test/>")
+
+ def test_add_monitor(self):
+ core = Mock()
+ config = self.get_obj(core=core)
+
+ fname = "test.xml"
+ fpath = os.path.join(self.metadata.data, fname)
+
+ config.extras = []
+ config.add_monitor(fpath)
+ self.assertFalse(core.fam.AddMonitor.called)
+ self.assertEqual(config.extras, [fpath])
+
+ config = self.get_obj(core=core, watch_clients=True)
+ config.add_monitor(fpath)
+ core.fam.AddMonitor.assert_called_with(fpath, config.metadata)
+ self.assertItemsEqual(config.extras, [fpath])
+
+ def test_Index(self):
+ # Index() isn't used on XMLMetadataConfig objects
+ pass
+
+ @patch("lxml.etree.parse")
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig._follow_xincludes")
+ def test_load_xml(self, mock_follow, mock_parse):
+ config = self.get_obj("clients.xml")
+
+ def reset():
+ mock_parse.reset_mock()
+ mock_follow.reset_mock()
+ config.data = None
+ config.basedata = None
+
+ reset()
+ config.load_xml()
+ mock_follow.assert_called_with(xdata=mock_parse.return_value)
+ mock_parse.assert_called_with(os.path.join(config.basedir,
+ "clients.xml"),
+ parser=Bcfg2.Server.XMLParser)
+ self.assertFalse(mock_parse.return_value.xinclude.called)
+ self.assertEqual(config.data, mock_parse.return_value)
+ self.assertIsNotNone(config.basedata)
+
+ reset()
+ mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None,
+ None)
+ config.load_xml()
+ mock_parse.assert_called_with(os.path.join(config.basedir,
+ "clients.xml"),
+ parser=Bcfg2.Server.XMLParser)
+ self.assertIsNone(config.data)
+ self.assertIsNone(config.basedata)
+
+ reset()
+ mock_parse.side_effect = None
+ def follow_xincludes(xdata=None):
+ config.extras = [Mock(), Mock()]
+ mock_follow.side_effect = follow_xincludes
+ config.load_xml()
+ mock_follow.assert_called_with(xdata=mock_parse.return_value)
+ mock_parse.assert_called_with(os.path.join(config.basedir,
+ "clients.xml"),
+ parser=Bcfg2.Server.XMLParser)
+ mock_parse.return_value.xinclude.assert_any_call()
+ self.assertEqual(config.data, mock_parse.return_value)
+ self.assertIsNotNone(config.basedata)
+
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml")
+ def test_write(self, mock_write_xml):
+ config = self.get_obj("clients.xml")
+ config.basedata = "<test/>"
+ config.write()
+ mock_write_xml.assert_called_with(os.path.join(self.metadata.data,
+ "clients.xml"),
+ "<test/>")
+
+ @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False))
+ @patch('fcntl.lockf', Mock())
+ @patch('%s.open' % builtins)
+ @patch('os.unlink')
+ @patch('os.rename')
+ @patch('os.path.islink')
+ @patch('os.readlink')
+ def test_write_xml(self, mock_readlink, mock_islink, mock_rename,
+ mock_unlink, mock_open):
+ fname = "clients.xml"
+ config = self.get_obj(fname)
+ fpath = os.path.join(self.metadata.data, fname)
+ tmpfile = "%s.new" % fpath
+ linkdest = os.path.join(self.metadata.data, "client-link.xml")
+
+ mock_islink.return_value = False
+
+ config.write_xml(fpath, get_clients_test_tree())
+ mock_open.assert_called_with(tmpfile, "w")
+ self.assertTrue(mock_open.return_value.write.called)
+ mock_islink.assert_called_with(fpath)
+ mock_rename.assert_called_with(tmpfile, fpath)
+
+ mock_islink.return_value = True
+ mock_readlink.return_value = linkdest
+ config.write_xml(fpath, get_clients_test_tree())
+ mock_rename.assert_called_with(tmpfile, linkdest)
+
+ mock_rename.side_effect = OSError
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ config.write_xml, fpath, get_clients_test_tree())
+
+ mock_open.return_value.write.side_effect = IOError
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ config.write_xml, fpath, get_clients_test_tree())
+ mock_unlink.assert_called_with(tmpfile)
+
+ mock_open.side_effect = IOError
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ config.write_xml, fpath, get_clients_test_tree())
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch('lxml.etree.parse')
+ def test_find_xml_for_xpath(self, mock_parse):
+ config = self.get_obj("groups.xml")
+ config.basedata = get_groups_test_tree()
+ xpath = "//Group[@name='group1']"
+ self.assertItemsEqual(config.find_xml_for_xpath(xpath),
+ dict(filename=os.path.join(self.metadata.data,
+ "groups.xml"),
+ xmltree=get_groups_test_tree(),
+ xquery=get_groups_test_tree().xpath(xpath)))
+
+ self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict())
+
+ config.extras = [os.path.join(self.metadata.data, p)
+ for p in ["foo.xml", "bar.xml", "clients.xml"]]
+
+ def parse_side_effect(fname, parser=Bcfg2.Server.XMLParser):
+ if fname == os.path.join(self.metadata.data, "clients.xml"):
+ return get_clients_test_tree()
+ else:
+ return lxml.etree.XML("<null/>").getroottree()
+
+ mock_parse.side_effect = parse_side_effect
+ xpath = "//Client[@secure='true']"
+ self.assertItemsEqual(config.find_xml_for_xpath(xpath),
+ dict(filename=os.path.join(self.metadata.data,
+ "clients.xml"),
+ xmltree=get_clients_test_tree(),
+ xquery=get_clients_test_tree().xpath(xpath)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
+ def test_HandleEvent(self, mock_load_xml):
+ config = self.get_obj("groups.xml")
+ evt = Mock()
+ evt.filename = os.path.join(self.metadata.data, "groups.xml")
+ evt.code2str = Mock(return_value="changed")
+ self.assertTrue(config.HandleEvent(evt))
+ mock_load_xml.assert_called_with()
+
+
+class TestClientMetadata(Bcfg2TestCase):
+ def test_inGroup(self):
+ cm = ClientMetadata("client1", "group1", ["group1", "group2"],
+ ["bundle1"], [], [], [], None, None, None, None)
+ self.assertTrue(cm.inGroup("group1"))
+ self.assertFalse(cm.inGroup("group3"))
+
+
+class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked):
+ test_obj = Metadata
+ use_db = False
+
+ def get_obj(self, core=None, watch_clients=False):
+ return get_metadata_object(core=core, watch_clients=watch_clients,
+ use_db=self.use_db)
+
+ @skipUnless(has_django, "Django not found")
+ def test__use_db(self):
+ # with the way we've set up our metadata tests, it's unweildy
+ # to test _use_db. however, given the way get_obj works, if
+ # there was a bug in _use_db it'd be almost certain to shake
+ # out in the rest of the testing.
+ pass
+
+ def get_nonexistent_client(self, metadata, prefix="newclient"):
+ if metadata is None:
+ metadata = self.load_clients_data()
+ i = 0
+ client_name = "%s%s" % (prefix, i)
+ while client_name in metadata.clients:
+ i += 1
+ client_name = "%s%s" % (prefix, i)
+ return client_name
+
+ def test__init(self):
+ # test with watch_clients=False
+ core = Mock()
+ metadata = self.get_obj(core=core)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics)
+ self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
+ self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
+ self.assertIsInstance(metadata.query, MetadataQuery)
+ self.assertEqual(metadata.states, dict())
+
+ # test with watch_clients=True
+ core.fam = Mock()
+ metadata = self.get_obj(core=core, watch_clients=True)
+ self.assertEqual(len(metadata.states), 2)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "clients.xml"),
+ metadata)
+
+ core.fam.reset_mock()
+ core.fam.AddMonitor = Mock(side_effect=IOError)
+ self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
+ self.get_obj, core=core, watch_clients=True)
+
+ @patch('os.makedirs', Mock())
+ @patch('%s.open' % builtins)
+ def test_init_repo(self, mock_open):
+ Metadata.init_repo(datastore,
+ groups_xml="groups", clients_xml="clients")
+ mock_open.assert_any_call(os.path.join(datastore, "Metadata",
+ "groups.xml"), "w")
+ mock_open.assert_any_call(os.path.join(datastore, "Metadata",
+ "clients.xml"), "w")
+
+ def test_search_xdata(self):
+ # test finding a node with the proper name
+ metadata = self.get_obj()
+ tree = get_groups_test_tree()
+ res = metadata._search_xdata("Group", "group1", tree)
+ self.assertIsInstance(res, lxml.etree._Element)
+ self.assertEqual(res.get("name"), "group1")
+
+ # test finding a node with the wrong name but correct alias
+ metadata = self.get_obj()
+ tree = get_clients_test_tree()
+ res = metadata._search_xdata("Client", "alias3", tree, alias=True)
+ self.assertIsInstance(res, lxml.etree._Element)
+ self.assertNotEqual(res.get("name"), "alias3")
+
+ # test failure finding a node
+ metadata = self.get_obj()
+ tree = get_clients_test_tree()
+ res = metadata._search_xdata("Client",
+ self.get_nonexistent_client(metadata),
+ tree, alias=True)
+ self.assertIsNone(res)
+
+ def search_xdata(self, tag, name, tree, alias=False):
+ metadata = self.get_obj()
+ res = metadata._search_xdata(tag, name, tree, alias=alias)
+ self.assertIsInstance(res, lxml.etree._Element)
+ if not alias:
+ self.assertEqual(res.get("name"), name)
+
+ def test_search_group(self):
+ # test finding a group with the proper name
+ tree = get_groups_test_tree()
+ self.search_xdata("Group", "group1", tree)
+
+ def test_search_bundle(self):
+ # test finding a bundle with the proper name
+ tree = get_groups_test_tree()
+ self.search_xdata("Bundle", "bundle1", tree)
+
+ def test_search_client(self):
+ # test finding a client with the proper name
+ tree = get_clients_test_tree()
+ self.search_xdata("Client", "client1", tree, alias=True)
+ self.search_xdata("Client", "alias1", tree, alias=True)
+
+ def test_add_group(self):
+ metadata = self.get_obj()
+ metadata.groups_xml.write = Mock()
+ metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.add_group("test1", dict())
+ metadata.groups_xml.write.assert_any_call()
+ grp = metadata.search_group("test1", metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name='test1'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.add_group("test2", dict(foo='bar'))
+ metadata.groups_xml.write.assert_any_call()
+ grp = metadata.search_group("test2", metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name='test2', foo='bar'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.groups_xml.write.reset_mock()
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.add_group,
+ "test1", dict())
+ self.assertFalse(metadata.groups_xml.write.called)
+
+ def test_update_group(self):
+ metadata = self.get_obj()
+ metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.update_group("group1", dict(foo="bar"))
+ grp = metadata.search_group("group1", metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertIn("foo", grp.attrib)
+ self.assertEqual(grp.get("foo"), "bar")
+ self.assertTrue(metadata.groups_xml.write_xml.called)
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.update_group,
+ "bogus_group", dict())
+
+ def test_remove_group(self):
+ metadata = self.get_obj()
+ metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.remove_group("group5")
+ grp = metadata.search_group("group5", metadata.groups_xml.base_xdata)
+ self.assertIsNone(grp)
+ self.assertTrue(metadata.groups_xml.write_xml.called)
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.remove_group,
+ "bogus_group")
+
+ def test_add_bundle(self):
+ metadata = self.get_obj()
+ metadata.groups_xml.write = Mock()
+ metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.add_bundle("bundle1")
+ metadata.groups_xml.write.assert_any_call()
+ bundle = metadata.search_bundle("bundle1",
+ metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(bundle)
+ self.assertEqual(bundle.attrib, dict(name='bundle1'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.groups_xml.write.reset_mock()
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.add_bundle,
+ "bundle1")
+ self.assertFalse(metadata.groups_xml.write.called)
+
+ def test_remove_bundle(self):
+ metadata = self.get_obj()
+ metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.remove_bundle("bundle1")
+ grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata)
+ self.assertIsNone(grp)
+ self.assertTrue(metadata.groups_xml.write_xml.called)
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.remove_bundle,
+ "bogus_bundle")
+
+ def test_add_client(self):
+ metadata = self.get_obj()
+ metadata.clients_xml.write = Mock()
+ metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree()
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+
+ new1 = self.get_nonexistent_client(metadata)
+ metadata.add_client(new1, dict())
+ metadata.clients_xml.write.assert_any_call()
+ grp = metadata.search_client(new1, metadata.clients_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name=new1))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+ metadata._handle_clients_xml_event(Mock())
+
+ new2 = self.get_nonexistent_client(metadata)
+ metadata.add_client(new2, dict(foo='bar'))
+ metadata.clients_xml.write.assert_any_call()
+ grp = metadata.search_client(new2, metadata.clients_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name=new2, foo='bar'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+
+ metadata.clients_xml.write.reset_mock()
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.add_client,
+ new1, dict())
+ self.assertFalse(metadata.clients_xml.write.called)
+
+ def test_update_client(self):
+ metadata = self.get_obj()
+ metadata.clients_xml.write_xml = Mock()
+ metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree())
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+
+ metadata.update_client("client1", dict(foo="bar"))
+ grp = metadata.search_client("client1", metadata.clients_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertIn("foo", grp.attrib)
+ self.assertEqual(grp.get("foo"), "bar")
+ self.assertTrue(metadata.clients_xml.write_xml.called)
+
+ new = self.get_nonexistent_client(metadata)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.update_client,
+ new, dict())
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_obj()
+ metadata.clients_xml.data = \
+ xdata or copy.deepcopy(get_clients_test_tree())
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "Metadata", "clients.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ return metadata
+
+ def test_handle_clients_xml_event(self):
+ metadata = self.get_obj()
+ metadata.profiles = ["group1", "group2"]
+
+ metadata.clients_xml = Mock()
+ metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree())
+ metadata._handle_clients_xml_event(Mock())
+
+ if not self.use_db:
+ self.assertItemsEqual(metadata.clients,
+ dict([(c.get("name"), c.get("profile"))
+ for c in get_clients_test_tree().findall("//Client")]))
+ aliases = dict([(a.get("name"), a.getparent().get("name"))
+ for a in get_clients_test_tree().findall("//Alias")])
+ self.assertItemsEqual(metadata.aliases, aliases)
+
+ raliases = dict([(c.get("name"), set())
+ for c in get_clients_test_tree().findall("//Client")])
+ for alias in get_clients_test_tree().findall("//Alias"):
+ raliases[alias.getparent().get("name")].add(alias.get("name"))
+ self.assertItemsEqual(metadata.raliases, raliases)
+
+ self.assertEqual(metadata.secure,
+ [c.get("name")
+ for c in get_clients_test_tree().findall("//Client[@secure='true']")])
+ self.assertEqual(metadata.floating, ["client1", "client10"])
+
+ addresses = dict([(c.get("address"), [])
+ for c in get_clients_test_tree().findall("//*[@address]")])
+ raddresses = dict()
+ for client in get_clients_test_tree().findall("//Client[@address]"):
+ addresses[client.get("address")].append(client.get("name"))
+ try:
+ raddresses[client.get("name")].append(client.get("address"))
+ except KeyError:
+ raddresses[client.get("name")] = [client.get("address")]
+ for alias in get_clients_test_tree().findall("//Alias[@address]"):
+ addresses[alias.get("address")].append(alias.getparent().get("name"))
+ try:
+ raddresses[alias.getparent().get("name")].append(alias.get("address"))
+ except KeyError:
+ raddresses[alias.getparent().get("name")] = alias.get("address")
+
+ self.assertItemsEqual(metadata.addresses, addresses)
+ self.assertItemsEqual(metadata.raddresses, raddresses)
+ self.assertTrue(metadata.states['clients.xml'])
+
+ def load_groups_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_obj()
+ metadata.groups_xml.data = \
+ xdata or copy.deepcopy(get_groups_test_tree())
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ return metadata
+
+ def test_handle_groups_xml_event(self):
+ metadata = self.get_obj()
+ metadata.groups_xml = Mock()
+ metadata.groups_xml.xdata = get_groups_test_tree()
+ metadata._handle_groups_xml_event(Mock())
+
+ self.assertTrue(metadata.states['groups.xml'])
+ self.assertTrue(metadata.groups['group1'].is_public)
+ self.assertTrue(metadata.groups['group2'].is_public)
+ self.assertFalse(metadata.groups['group3'].is_public)
+ self.assertFalse(metadata.groups['group1'].is_private)
+ self.assertFalse(metadata.groups['group2'].is_private)
+ self.assertTrue(metadata.groups['group3'].is_private)
+ self.assertTrue(metadata.groups['group1'].is_profile)
+ self.assertTrue(metadata.groups['group2'].is_profile)
+ self.assertFalse(metadata.groups['group3'].is_profile)
+ self.assertItemsEqual(metadata.groups.keys(),
+ set(g.get("name")
+ for g in get_groups_test_tree().findall("//Group")))
+ self.assertEqual(metadata.groups['group1'].category, 'category1')
+ self.assertEqual(metadata.groups['group2'].category, 'category1')
+ self.assertEqual(metadata.groups['group3'].category, 'category2')
+ self.assertEqual(metadata.groups['group4'].category, 'category1')
+ self.assertEqual(metadata.default, "group1")
+
+ all_groups = []
+ negated_groups = []
+ for group in get_groups_test_tree().xpath("//Groups/Client//*") + \
+ get_groups_test_tree().xpath("//Groups/Group//*"):
+ if group.tag == 'Group' and not group.getchildren():
+ if group.get("negate", "false").lower() == 'true':
+ negated_groups.append(group.get("name"))
+ else:
+ all_groups.append(group.get("name"))
+ self.assertItemsEqual([g.name
+ for g in metadata.group_membership.values()],
+ all_groups)
+ self.assertItemsEqual([g.name
+ for g in metadata.negated_groups.values()],
+ negated_groups)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_set_profile(self):
+ metadata = self.get_obj()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ metadata.set_profile,
+ None, None, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.set_profile,
+ "client1", "group5", None)
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.set_profile,
+ "client1", "group3", None)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_set_profile_db(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ if metadata._use_db:
+ profile = "group1"
+ client_name = self.get_nonexistent_client(metadata)
+ metadata.set_profile(client_name, profile, None)
+ self.assertIn(client_name, metadata.clients)
+ self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ metadata.set_profile,
+ client_name, profile, None)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
+ def test_set_profile_xml(self, mock_update_client, mock_add_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ if not metadata._use_db:
+ metadata.clients_xml.write = Mock()
+ metadata.set_profile("client1", "group2", None)
+ mock_update_client.assert_called_with("client1",
+ dict(profile="group2"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups["client1"], ["group2"])
+
+ metadata.clients_xml.write.reset_mock()
+ new1 = self.get_nonexistent_client(metadata)
+ metadata.set_profile(new1, "group1", None)
+ mock_add_client.assert_called_with(new1, dict(profile="group1"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups[new1], ["group1"])
+
+ metadata.clients_xml.write.reset_mock()
+ new2 = self.get_nonexistent_client(metadata)
+ metadata.session_cache[('1.2.3.6', None)] = (None, new2)
+ metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None))
+ mock_add_client.assert_called_with(new2,
+ dict(uuid='uuid_new',
+ profile="group1",
+ address='1.2.3.6'))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("socket.gethostbyaddr")
+ def test_resolve_client(self, mock_gethostbyaddr):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.2', None))
+ self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1')
+
+ metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
+ 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
+ cleanup_cache=True), 'client3')
+ self.assertEqual(metadata.session_cache, dict())
+
+ mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
+ mock_gethostbyaddr.assert_called_with('1.2.3.6')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4')
+ mock_gethostbyaddr.assert_called_with('1.2.3.7')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = None
+ mock_gethostbyaddr.side_effect = socket.herror
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.8', None))
+ mock_gethostbyaddr.assert_called_with('1.2.3.8')
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
+ def test_get_initial_metadata(self, mock_clientmetadata):
+ metadata = self.get_obj()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ metadata.get_initial_metadata, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ # test address, password
+ metadata.get_initial_metadata("client1")
+ mock_clientmetadata.assert_called_with("client1", "group1",
+ set(["group1"]), set(), set(),
+ set(["1.2.3.1"]),
+ dict(category1='group1'), None,
+ 'password2', None,
+ metadata.query)
+
+ # test address, bundles, category suppression
+ metadata.get_initial_metadata("client2")
+ mock_clientmetadata.assert_called_with("client2", "group2",
+ set(["group2"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(["1.2.3.2"]),
+ dict(category1="group2"),
+ None, None, None,
+ metadata.query)
+
+ # test aliases, address, uuid, password
+ imd = metadata.get_initial_metadata("alias1")
+ mock_clientmetadata.assert_called_with("client3", "group1",
+ set(["group1"]), set(),
+ set(['alias1']),
+ set(["1.2.3.3"]),
+ dict(category1="group1"),
+ 'uuid1', 'password2', None,
+ metadata.query)
+
+ # test new client creation
+ new1 = self.get_nonexistent_client(metadata)
+ imd = metadata.get_initial_metadata(new1)
+ mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]),
+ set(), set(), set(),
+ dict(category1="group1"), None,
+ None, None, metadata.query)
+
+ # test nested groups, address, per-client groups
+ imd = metadata.get_initial_metadata("client8")
+ mock_clientmetadata.assert_called_with("client8", "group1",
+ set(["group1", "group8",
+ "group9", "group10"]),
+ set(),
+ set(), set(["1.2.3.5"]),
+ dict(category1="group1"),
+ None, None, None, metadata.query)
+
+ # test setting per-client groups, group negation, nested groups
+ imd = metadata.get_initial_metadata("client9")
+ mock_clientmetadata.assert_called_with("client9", "group2",
+ set(["group2", "group8",
+ "group11"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(),
+ dict(category1="group2"), None,
+ "password3", None,
+ metadata.query)
+
+ # test new client with no default profile
+ metadata.default = None
+ new2 = self.get_nonexistent_client(metadata)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.get_initial_metadata, new2)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_merge_groups(self):
+ metadata = self.get_obj()
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ self.assertEqual(metadata._merge_groups("client1", set(["group1"]),
+ categories=dict(group1="category1")),
+ (set(["group1"]), dict(group1="category1")))
+
+ self.assertEqual(metadata._merge_groups("client8",
+ set(["group1", "group8", "group9"]),
+ categories=dict(group1="category1")),
+ (set(["group1", "group8", "group9", "group10"]),
+ dict(group1="category1")))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_all_group_names(self):
+ metadata = self.load_groups_data()
+ self.assertItemsEqual(metadata.get_all_group_names(),
+ set([g.get("name")
+ for g in get_groups_test_tree().findall("//Group")]))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_all_groups_in_category(self):
+ metadata = self.load_groups_data()
+ self.assertItemsEqual(metadata.get_all_groups_in_category("category1"),
+ set([g.get("name")
+ for g in get_groups_test_tree().findall("//Group[@category='category1']")]))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_client_names_by_profiles(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ self.assertItemsEqual(metadata.get_client_names_by_profiles(["group2"]),
+ [c.get("name")
+ for c in get_clients_test_tree().findall("//Client[@profile='group2']")])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_client_names_by_groups(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ # this is not the best test in the world, since we mock
+ # core.build_metadata to just build _initial_ metadata, which
+ # is not at all the same thing. it turns out that mocking
+ # this out without starting a Bcfg2 server is pretty
+ # non-trivial, so this works-ish
+ metadata.core.build_metadata = Mock()
+ metadata.core.build_metadata.side_effect = \
+ lambda c: metadata.get_initial_metadata(c)
+ self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]),
+ [c.get("name")
+ for c in get_clients_test_tree().findall("//Client[@profile='group2']")])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_merge_additional_groups(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ imd = metadata.get_initial_metadata("client2")
+
+ # test adding a group excluded by categories
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group4"])
+ self.assertEqual(imd.groups, oldgroups)
+
+ # test adding a private group
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group3"])
+ self.assertEqual(imd.groups, oldgroups)
+
+ # test adding groups with bundles
+ oldgroups = imd.groups
+ oldbundles = imd.bundles
+ metadata.merge_additional_groups(imd, ["group7"])
+ self.assertEqual(imd.groups, oldgroups.union(["group7"]))
+ self.assertEqual(imd.bundles, oldbundles.union(["bundle3"]))
+
+ # test adding multiple groups
+ imd = metadata.get_initial_metadata("client2")
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group6", "group8"])
+ self.assertItemsEqual(imd.groups,
+ oldgroups.union(["group6", "group8", "group9"]))
+
+ # test adding a group that is not defined in groups.xml
+ imd = metadata.get_initial_metadata("client2")
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group6", "newgroup"])
+ self.assertItemsEqual(imd.groups,
+ oldgroups.union(["group6", "newgroup"]))
+
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_merge_additional_data(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ imd = metadata.get_initial_metadata("client1")
+
+ # we need to use a unique attribute name for this test. this
+ # is probably overkill, but it works
+ pattern = "connector%d"
+ for i in range(0, 100):
+ connector = pattern % i
+ if not hasattr(imd, connector):
+ break
+ self.assertFalse(hasattr(imd, connector),
+ "Could not find unique connector name to test "
+ "merge_additional_data()")
+
+ metadata.merge_additional_data(imd, connector, "test data")
+ self.assertEqual(getattr(imd, connector), "test data")
+ self.assertIn(connector, imd.connectors)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_validate_client_address(self, mock_resolve_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ self.assertTrue(metadata.validate_client_address("client1",
+ (None, None)))
+ self.assertTrue(metadata.validate_client_address("client2",
+ ("1.2.3.2", None)))
+ self.assertFalse(metadata.validate_client_address("client2",
+ ("1.2.3.8", None)))
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.2", None)))
+ # this is upper case to ensure that case is folded properly in
+ # validate_client_address()
+ mock_resolve_client.return_value = "CLIENT4"
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.7", None)))
+ mock_resolve_client.assert_called_with(("1.2.3.7", None))
+
+ mock_resolve_client.reset_mock()
+ self.assertFalse(metadata.validate_client_address("client5",
+ ("1.2.3.5", None)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_AuthenticateConnection(self, mock_resolve_client,
+ mock_validate_client_address):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.password = "password1"
+
+ cert = dict(subject=[[("commonName", "client1")]])
+ mock_validate_client_address.return_value = False
+ self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ mock_validate_client_address.return_value = True
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ # floating cert-auth clients add themselves to the cache
+ self.assertIn("1.2.3.1", metadata.session_cache)
+ self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1")
+
+ cert = dict(subject=[[("commonName", "client7")]])
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.4"))
+ # non-floating cert-auth clients do not add themselves to the cache
+ self.assertNotIn("1.2.3.4", metadata.session_cache)
+
+ cert = dict(subject=[[("commonName", "client8")]])
+
+ mock_resolve_client.return_value = "client5"
+ self.assertTrue(metadata.AuthenticateConnection(None, "root",
+ "password1", "1.2.3.8"))
+
+ mock_resolve_client.side_effect = \
+ Bcfg2.Server.Plugin.MetadataConsistencyError
+ self.assertFalse(metadata.AuthenticateConnection(None, "root",
+ "password1",
+ "1.2.3.8"))
+
+ # secure mode, no password
+ self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None,
+ "1.2.3.2"))
+
+ self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1',
+ "password1", "1.2.3.3"))
+ # non-root, non-cert clients populate session cache
+ self.assertIn("1.2.3.3", metadata.session_cache)
+ self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3")
+
+ # use alternate password
+ self.assertTrue(metadata.AuthenticateConnection(None, 'client3',
+ "password2", "1.2.3.3"))
+
+ # test secure mode
+ self.assertFalse(metadata.AuthenticateConnection(None, 'client9',
+ "password1",
+ "1.2.3.9"))
+ self.assertTrue(metadata.AuthenticateConnection(None, 'client9',
+ "password3", "1.2.3.9"))
+
+ self.assertFalse(metadata.AuthenticateConnection(None, "client5",
+ "password2",
+ "1.2.3.7"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
+ def test_process_statistics(self, mock_update_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ md = Mock()
+ md.hostname = "client6"
+ metadata.process_statistics(md, None)
+ mock_update_client.assert_called_with(md.hostname,
+ dict(auth='cert'))
+
+ mock_update_client.reset_mock()
+ md.hostname = "client5"
+ metadata.process_statistics(md, None)
+ self.assertFalse(mock_update_client.called)
+
+ def test_viz(self):
+ pass
+
+
+class TestMetadataBase(TestMetadata):
+ """ base test object for testing Metadata with database enabled """
+ __test__ = False
+ use_db = True
+
+ @skipUnless(has_django, "Django not found")
+ def setUp(self):
+ syncdb(TestMetadataDB)
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = get_obj()
+ for client in get_clients_test_tree().findall("Client"):
+ metadata.add_client(client.get("name"))
+ return metadata
+
+ def get_nonexistent_client(self, _, prefix="newclient"):
+ clients = [o.hostname for o in MetadataClientModel.objects.all()]
+ i = 0
+ client_name = "%s%s" % (prefix, i)
+ while client_name in clients:
+ i += 1
+ client_name = "%s%s" % (prefix, i)
+ return client_name
+
+ @patch('os.path.exists')
+ def test__init(self, mock_exists):
+ core = Mock()
+ core.fam = Mock()
+ mock_exists.return_value = False
+ metadata = self.get_obj(core=core, watch_clients=True)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
+ core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+
+ mock_exists.return_value = True
+ core.fam.reset_mock()
+ metadata = self.get_obj(core=core, watch_clients=True)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "clients.xml"),
+ metadata)
+
+ def test_add_group(self):
+ pass
+
+ def test_add_bundle(self):
+ pass
+
+ def test_add_client(self):
+ metadata = self.get_obj()
+ hostname = self.get_nonexistent_client(metadata)
+ client = metadata.add_client(hostname)
+ self.assertIsInstance(client, MetadataClientModel)
+ self.assertEqual(client.hostname, hostname)
+ self.assertIn(hostname, metadata.clients)
+ self.assertIn(hostname, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_update_group(self):
+ pass
+
+ def test_update_bundle(self):
+ pass
+
+ def test_update_client(self):
+ pass
+
+ def test_list_clients(self):
+ metadata = self.get_obj()
+ self.assertItemsEqual(metadata.list_clients(),
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_remove_group(self):
+ pass
+
+ def test_remove_bundle(self):
+ pass
+
+ def test_remove_client(self):
+ metadata = self.get_obj()
+ client_name = self.get_nonexistent_client(metadata)
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.remove_client,
+ client_name)
+
+ metadata.add_client(client_name)
+ metadata.remove_client(client_name)
+ self.assertNotIn(client_name, metadata.clients)
+ self.assertNotIn(client_name, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_process_statistics(self):
+ pass
+
+
+class TestMetadata_NoClientsXML(TestMetadataBase):
+ """ test Metadata without a clients.xml. we have to disable or
+ override tests that rely on client options """
+ # only run these tests if it's possible to skip tests or if we
+ # have django. otherwise they'll all get run because our fake
+ # skipping decorators for python < 2.7 won't work when they
+ # decorate setUp()
+ if can_skip or has_django:
+ __test__ = True
+
+ def load_groups_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_obj()
+ if not xdata:
+ xdata = copy.deepcopy(get_groups_test_tree())
+ for client in get_clients_test_tree().findall("Client"):
+ newclient = \
+ lxml.etree.SubElement(xdata.getroot(),
+ "Client", name=client.get("name"))
+ lxml.etree.SubElement(newclient, "Group",
+ name=client.get("profile"))
+ metadata.groups_xml.data = xdata
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ return metadata
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
+ def test_get_initial_metadata(self, mock_clientmetadata):
+ metadata = self.get_obj()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ metadata.get_initial_metadata, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ # test basic client metadata
+ metadata.get_initial_metadata("client1")
+ mock_clientmetadata.assert_called_with("client1", "group1",
+ set(["group1"]), set(), set(),
+ set(), dict(category1='group1'),
+ None, None, None, metadata.query)
+
+ # test bundles, category suppression
+ metadata.get_initial_metadata("client2")
+ mock_clientmetadata.assert_called_with("client2", "group2",
+ set(["group2"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(),
+ dict(category1="group2"), None,
+ None, None, metadata.query)
+
+ # test new client creation
+ new1 = self.get_nonexistent_client(metadata)
+ imd = metadata.get_initial_metadata(new1)
+ mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]),
+ set(), set(), set(),
+ dict(category1="group1"), None,
+ None, None, metadata.query)
+
+ # test nested groups, per-client groups
+ imd = metadata.get_initial_metadata("client8")
+ mock_clientmetadata.assert_called_with("client8", "group1",
+ set(["group1", "group8",
+ "group9", "group10"]),
+ set(), set(), set(),
+ dict(category1="group1"), None,
+ None, None, metadata.query)
+
+ # test per-client groups, group negation, nested groups
+ imd = metadata.get_initial_metadata("client9")
+ mock_clientmetadata.assert_called_with("client9", "group2",
+ set(["group2", "group8",
+ "group11"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(),
+ dict(category1="group2"), None,
+ None, None, metadata.query)
+
+ # test exception on new client with no default profile
+ metadata.default = None
+ new2 = self.get_nonexistent_client(metadata)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.get_initial_metadata,
+ new2)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_validate_client_address(self, mock_resolve_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ # this is upper case to ensure that case is folded properly in
+ # validate_client_address()
+ mock_resolve_client.return_value = "CLIENT4"
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.7", None)))
+ mock_resolve_client.assert_called_with(("1.2.3.7", None))
+
+ mock_resolve_client.reset_mock()
+ self.assertFalse(metadata.validate_client_address("client5",
+ ("1.2.3.5", None)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_AuthenticateConnection(self, mock_resolve_client,
+ mock_validate_client_address):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.password = "password1"
+
+ cert = dict(subject=[[("commonName", "client1")]])
+ mock_validate_client_address.return_value = False
+ self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ mock_validate_client_address.return_value = True
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root",
+ metadata.password,
+ "1.2.3.1"))
+
+ cert = dict(subject=[[("commonName", "client8")]])
+
+ mock_resolve_client.return_value = "client5"
+ self.assertTrue(metadata.AuthenticateConnection(None, "root",
+ "password1", "1.2.3.8"))
+
+ mock_resolve_client.side_effect = \
+ Bcfg2.Server.Plugin.MetadataConsistencyError
+ self.assertFalse(metadata.AuthenticateConnection(None, "root",
+ "password1",
+ "1.2.3.8"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("socket.gethostbyaddr")
+ def test_resolve_client(self, mock_gethostbyaddr):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+
+ metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
+ 'client3')
+ mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
+ cleanup_cache=True), 'client3')
+ self.assertEqual(metadata.session_cache, dict())
+
+ mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
+ mock_gethostbyaddr.assert_called_with('1.2.3.6')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = None
+ mock_gethostbyaddr.side_effect = socket.herror
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.8', None))
+ mock_gethostbyaddr.assert_called_with('1.2.3.8')
+
+ def test_handle_clients_xml_event(self):
+ pass
+
+
+class TestMetadata_ClientsXML(TestMetadataBase):
+ """ test Metadata with a clients.xml. """
+ # only run these tests if it's possible to skip tests or if we
+ # have django. otherwise they'll all get run because our fake
+ # skipping decorators for python < 2.7 won't work when they
+ # decorate setUp()
+ if can_skip or has_django:
+ __test__ = True
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_obj()
+ metadata.core.fam = Mock()
+ metadata._handle_file("clients.xml")
+ metadata = TestMetadata.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+ return TestMetadataBase.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
new file mode 100644
index 000000000..0a971c245
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -0,0 +1,549 @@
+import os
+import sys
+import time
+import lxml.etree
+import Bcfg2.Server
+import Bcfg2.Server.Plugin
+from mock import Mock, MagicMock, patch
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+from Bcfg2.Server.Plugins.Probes import *
+from TestPlugin import TestEntrySet, TestProbing, TestConnector, \
+ TestDatabaseBacked
+
+# test data for JSON and YAML tests
+test_data = dict(a=1, b=[1, 2, 3], c="test")
+
+class FakeList(list):
+ pass
+
+
+class TestProbesDB(DBModelTestCase):
+ if has_django:
+ models = [ProbesGroupsModel, ProbesDataModel]
+
+
+class TestClientProbeDataSet(Bcfg2TestCase):
+ def test__init(self):
+ ds = ClientProbeDataSet()
+ self.assertLessEqual(ds.timestamp, time.time())
+ self.assertIsInstance(ds, dict)
+ self.assertNotIn("timestamp", ds)
+
+ ds = ClientProbeDataSet(timestamp=123)
+ self.assertEqual(ds.timestamp, 123)
+ self.assertNotIn("timestamp", ds)
+
+class TestProbeData(Bcfg2TestCase):
+ def test_str(self):
+ # a value that is not valid XML, JSON, or YAML
+ val = "'test"
+
+ # test string behavior
+ data = ProbeData(val)
+ self.assertIsInstance(data, str)
+ self.assertEqual(data, val)
+ # test 1.2.0-1.2.2 broken behavior
+ self.assertEqual(data.data, val)
+ # test that formatted data accessors return None
+ self.assertIsNone(data.xdata)
+ self.assertIsNone(data.yaml)
+ self.assertIsNone(data.json)
+
+ def test_xdata(self):
+ xdata = lxml.etree.Element("test")
+ lxml.etree.SubElement(xdata, "test2")
+ data = ProbeData(lxml.etree.tostring(xdata,
+ xml_declaration=False).decode('UTF-8'))
+ self.assertIsNotNone(data.xdata)
+ self.assertIsNotNone(data.xdata.find("test2"))
+
+ @skipUnless(has_json, "JSON libraries not found, skipping JSON tests")
+ def test_json(self):
+ jdata = json.dumps(test_data)
+ data = ProbeData(jdata)
+ self.assertIsNotNone(data.json)
+ self.assertItemsEqual(test_data, data.json)
+
+ @skipUnless(has_yaml, "YAML libraries not found, skipping YAML tests")
+ def test_yaml(self):
+ jdata = yaml.dump(test_data)
+ data = ProbeData(jdata)
+ self.assertIsNotNone(data.yaml)
+ self.assertItemsEqual(test_data, data.yaml)
+
+
+class TestProbeSet(TestEntrySet):
+ test_obj = ProbeSet
+ basenames = ["test", "_test", "test-test"]
+ ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"]
+ bogus_names = ["test.py"]
+
+ def get_obj(self, path=datastore, fam=None, encoding=None,
+ plugin_name="Probes", basename=None):
+ # get_obj() accepts the basename argument, accepted by the
+ # parent get_obj() method, and just throws it away, since
+ # ProbeSet uses a regex for the "basename"
+ if fam is None:
+ fam = Mock()
+ rv = self.test_obj(path, fam, encoding, plugin_name)
+ rv.entry_type = MagicMock()
+ return rv
+
+ def test__init(self):
+ fam = Mock()
+ ps = self.get_obj(fam=fam)
+ self.assertEqual(ps.plugin_name, "Probes")
+ fam.AddMonitor.assert_called_with(datastore, ps)
+ TestEntrySet.test__init(self)
+
+ def test_HandleEvent(self):
+ ps = self.get_obj()
+ ps.handle_event = Mock()
+
+ # test that events on the data store itself are skipped
+ evt = Mock()
+ evt.filename = datastore
+ ps.HandleEvent(evt)
+ self.assertFalse(ps.handle_event.called)
+
+ # test that events on probed.xml are skipped
+ evt.reset_mock()
+ evt.filename = "probed.xml"
+ ps.HandleEvent(evt)
+ self.assertFalse(ps.handle_event.called)
+
+ # test that other events are processed appropriately
+ evt.reset_mock()
+ evt.filename = "fooprobe"
+ ps.HandleEvent(evt)
+ ps.handle_event.assert_called_with(evt)
+
+ @patch("%s.list" % builtins, FakeList)
+ def test_get_probe_data(self):
+ ps = self.get_obj()
+
+ # build some fairly complex test data for this. in the end,
+ # we want the probe data to include only the most specific
+ # version of a given probe, and by basename only, not full
+ # (specific) name. We don't fully test the specificity stuff,
+ # we just check to make sure sort() is called and trust that
+ # sort() does the right thing on Specificity objects. (I.e.,
+ # trust that Specificity is well-tested. Hah!) We also test
+ # to make sure the interpreter is determined correctly.
+ ps.get_matching = Mock()
+ matching = FakeList()
+ matching.sort = Mock()
+
+ p1 = Mock()
+ p1.specific = Bcfg2.Server.Plugin.Specificity(group=True, prio=10)
+ p1.name = "fooprobe.G10_foogroup"
+ p1.data = """#!/bin/bash
+group-specific"""
+ matching.append(p1)
+
+ p2 = Mock()
+ p2.specific = Bcfg2.Server.Plugin.Specificity(all=True)
+ p2.name = "fooprobe"
+ p2.data = "#!/bin/bash"
+ matching.append(p2)
+
+ p3 = Mock()
+ p3.specific = Bcfg2.Server.Plugin.Specificity(all=True)
+ p3.name = "barprobe"
+ p3.data = "#! /usr/bin/env python"
+ matching.append(p3)
+
+ p4 = Mock()
+ p4.specific = Bcfg2.Server.Plugin.Specificity(all=True)
+ p4.name = "bazprobe"
+ p4.data = ""
+ matching.append(p4)
+
+ ps.get_matching.return_value = matching
+
+ metadata = Mock()
+ pdata = ps.get_probe_data(metadata)
+ ps.get_matching.assert_called_with(metadata)
+ # we can't create a matching operator.attrgetter object, and I
+ # don't feel the need to mock that out -- this is a good
+ # enough check
+ self.assertTrue(matching.sort.called)
+
+ self.assertEqual(len(pdata), 3,
+ "Found: %s" % [p.get("name") for p in pdata])
+ for probe in pdata:
+ if probe.get("name") == "fooprobe":
+ self.assertIn("group-specific", probe.text)
+ self.assertEqual(probe.get("interpreter"), "/bin/bash")
+ elif probe.get("name") == "barprobe":
+ self.assertEqual(probe.get("interpreter"),
+ "/usr/bin/env python")
+ elif probe.get("name") == "bazprobe":
+ self.assertIsNotNone(probe.get("interpreter"))
+ else:
+ assert False, "Strange probe found in get_probe_data() return"
+
+
+class TestProbes(TestProbing, TestConnector, TestDatabaseBacked):
+ test_obj = Probes
+
+ def get_test_probedata(self):
+ test_xdata = lxml.etree.Element("test")
+ lxml.etree.SubElement(test_xdata, "test", foo="foo")
+ rv = dict()
+ rv["foo.example.com"] = ClientProbeDataSet(timestamp=time.time())
+ rv["foo.example.com"]["xml"] = \
+ ProbeData(lxml.etree.tostring(test_xdata,
+ xml_declaration=False).decode('UTF-8'))
+ rv["foo.example.com"]["text"] = ProbeData("freeform text")
+ rv["foo.example.com"]["multiline"] = ProbeData("""multiple
+lines
+of
+freeform
+text
+""")
+ rv["bar.example.com"] = ClientProbeDataSet(timestamp=time.time())
+ rv["bar.example.com"]["empty"] = ProbeData("")
+ if has_yaml:
+ rv["bar.example.com"]["yaml"] = ProbeData(yaml.dump(test_data))
+ if has_json:
+ rv["bar.example.com"]["json"] = ProbeData(json.dumps(test_data))
+ return rv
+
+ def get_test_cgroups(self):
+ return {"foo.example.com": ["group", "group with spaces",
+ "group-with-dashes"],
+ "bar.example.com": []}
+
+ def get_probes_object(self, use_db=False, load_data=None):
+ core = Mock()
+ core.setup.cfp.getboolean = Mock()
+ core.setup.cfp.getboolean.return_value = use_db
+ if load_data is None:
+ load_data = MagicMock()
+ # we have to patch load_data() in a funny way because
+ # different versions of Mock have different scopes for
+ # patching. in some versions, a patch applied to
+ # get_probes_object() would only apply to that function, while
+ # in others it would also apply to the calling function (e.g.,
+ # test__init(), which relies on being able to check the calls
+ # of load_data(), and thus on load_data() being consistently
+ # mocked)
+ @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", new=load_data)
+ def inner():
+ return Probes(core, datastore)
+
+ return inner()
+
+ def test__init(self):
+ mock_load_data = Mock()
+ probes = self.get_probes_object(load_data=mock_load_data)
+ probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore,
+ probes.name),
+ probes.probes)
+ mock_load_data.assert_any_call()
+ self.assertEqual(probes.probedata, ClientProbeDataSet())
+ self.assertEqual(probes.cgroups, dict())
+
+ @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", Mock())
+ def test__use_db(self):
+ probes = self.get_probes_object()
+ self.assertFalse(probes._use_db)
+ probes.core.setup.cfp.getboolean.assert_called_with("probes",
+ "use_database",
+ default=False)
+
+ @skipUnless(has_django, "Django not found, skipping")
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
+ def test_write_data_xml(self):
+ probes = self.get_probes_object(use_db=False)
+ probes.write_data("test")
+ probes._write_data_xml.assert_called_with("test")
+ self.assertFalse(probes._write_data_db.called)
+
+ @skipUnless(has_django, "Django not found, skipping")
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
+ def test_write_data_db(self):
+ probes = self.get_probes_object(use_db=True)
+ probes.write_data("test")
+ probes._write_data_db.assert_called_with("test")
+ self.assertFalse(probes._write_data_xml.called)
+
+ @patch("%s.open" % builtins)
+ def test__write_data_xml(self, mock_open):
+ probes = self.get_probes_object(use_db=False)
+ probes.probedata = self.get_test_probedata()
+ probes.cgroups = self.get_test_cgroups()
+ probes._write_data_xml(None)
+
+ mock_open.assert_called_with(os.path.join(datastore, probes.name,
+ "probed.xml"), "w")
+ data = lxml.etree.XML(mock_open.return_value.write.call_args[0][0])
+ self.assertEqual(len(data.xpath("//Client")), 2)
+
+ foodata = data.find("Client[@name='foo.example.com']")
+ self.assertIsNotNone(foodata)
+ self.assertIsNotNone(foodata.get("timestamp"))
+ self.assertEqual(len(foodata.findall("Probe")),
+ len(probes.probedata['foo.example.com']))
+ self.assertEqual(len(foodata.findall("Group")),
+ len(probes.cgroups['foo.example.com']))
+ xml = foodata.find("Probe[@name='xml']")
+ self.assertIsNotNone(xml)
+ self.assertIsNotNone(xml.get("value"))
+ xdata = lxml.etree.XML(xml.get("value"))
+ self.assertIsNotNone(xdata)
+ self.assertIsNotNone(xdata.find("test"))
+ self.assertEqual(xdata.find("test").get("foo"), "foo")
+ text = foodata.find("Probe[@name='text']")
+ self.assertIsNotNone(text)
+ self.assertIsNotNone(text.get("value"))
+ multiline = foodata.find("Probe[@name='multiline']")
+ self.assertIsNotNone(multiline)
+ self.assertIsNotNone(multiline.get("value"))
+ self.assertGreater(len(multiline.get("value").splitlines()), 1)
+
+ bardata = data.find("Client[@name='bar.example.com']")
+ self.assertIsNotNone(bardata)
+ self.assertIsNotNone(bardata.get("timestamp"))
+ self.assertEqual(len(bardata.findall("Probe")),
+ len(probes.probedata['bar.example.com']))
+ self.assertEqual(len(bardata.findall("Group")),
+ len(probes.cgroups['bar.example.com']))
+ empty = bardata.find("Probe[@name='empty']")
+ self.assertIsNotNone(empty)
+ self.assertIsNotNone(empty.get("value"))
+ self.assertEqual(empty.get("value"), "")
+ if has_yaml:
+ ydata = bardata.find("Probe[@name='yaml']")
+ self.assertIsNotNone(ydata)
+ self.assertIsNotNone(ydata.get("value"))
+ self.assertItemsEqual(test_data, yaml.load(ydata.get("value")))
+ if has_json:
+ jdata = bardata.find("Probe[@name='json']")
+ self.assertIsNotNone(jdata)
+ self.assertIsNotNone(jdata.get("value"))
+ self.assertItemsEqual(test_data, json.loads(jdata.get("value")))
+
+ @skipUnless(has_django, "Django not found, skipping")
+ def test__write_data_db(self):
+ syncdb(TestProbesDB)
+ probes = self.get_probes_object(use_db=True)
+ probes.probedata = self.get_test_probedata()
+ probes.cgroups = self.get_test_cgroups()
+
+ for cname in ["foo.example.com", "bar.example.com"]:
+ client = Mock()
+ client.hostname = cname
+ probes._write_data_db(client)
+
+ pdata = ProbesDataModel.objects.filter(hostname=cname).all()
+ self.assertEqual(len(pdata), len(probes.probedata[cname]))
+
+ for probe in pdata:
+ self.assertEqual(probe.hostname, client.hostname)
+ self.assertIsNotNone(probe.data)
+ if probe.probe == "xml":
+ xdata = lxml.etree.XML(probe.data)
+ self.assertIsNotNone(xdata)
+ self.assertIsNotNone(xdata.find("test"))
+ self.assertEqual(xdata.find("test").get("foo"), "foo")
+ elif probe.probe == "text":
+ pass
+ elif probe.probe == "multiline":
+ self.assertGreater(len(probe.data.splitlines()), 1)
+ elif probe.probe == "empty":
+ self.assertEqual(probe.data, "")
+ elif probe.probe == "yaml":
+ self.assertItemsEqual(test_data, yaml.load(probe.data))
+ elif probe.probe == "json":
+ self.assertItemsEqual(test_data, json.loads(probe.data))
+ else:
+ assert False, "Strange probe found in _write_data_db data"
+
+ pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
+ self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
+
+ # test that old probe data is removed properly
+ cname = 'foo.example.com'
+ del probes.probedata[cname]['text']
+ probes.cgroups[cname].pop()
+ client = Mock()
+ client.hostname = cname
+ probes._write_data_db(client)
+
+ pdata = ProbesDataModel.objects.filter(hostname=cname).all()
+ self.assertEqual(len(pdata), len(probes.probedata[cname]))
+ pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
+ self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
+
+ @skipUnless(has_django, "Django not found, skipping")
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
+ def test_load_data_xml(self):
+ probes = self.get_probes_object(use_db=False)
+ probes.load_data()
+ probes._load_data_xml.assert_any_call()
+ self.assertFalse(probes._load_data_db.called)
+
+ @skipUnless(has_django, "Django not found, skipping")
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
+ @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
+ def test_load_data_db(self):
+ probes = self.get_probes_object(use_db=True)
+ probes.load_data()
+ probes._load_data_db.assert_any_call()
+ self.assertFalse(probes._load_data_xml.called)
+
+ @patch("%s.open" % builtins)
+ @patch("lxml.etree.parse")
+ def test__load_data_xml(self, mock_parse, mock_open):
+ probes = self.get_probes_object(use_db=False)
+ # to get the value for lxml.etree.parse to parse, we call
+ # _write_data_xml, mock the open() call, and grab the data
+ # that gets "written" to probed.xml
+ probes.probedata = self.get_test_probedata()
+ probes.cgroups = self.get_test_cgroups()
+ probes._write_data_xml(None)
+ xdata = \
+ lxml.etree.XML(str(mock_open.return_value.write.call_args[0][0]))
+ mock_parse.return_value = xdata.getroottree()
+ probes.probedata = dict()
+ probes.cgroups = dict()
+
+ probes._load_data_xml()
+ mock_parse.assert_called_with(os.path.join(datastore, probes.name,
+ 'probed.xml'),
+ parser=Bcfg2.Server.XMLParser)
+ self.assertItemsEqual(probes.probedata, self.get_test_probedata())
+ self.assertItemsEqual(probes.cgroups, self.get_test_cgroups())
+
+ @skipUnless(has_django, "Django not found, skipping")
+ def test__load_data_db(self):
+ syncdb(TestProbesDB)
+ probes = self.get_probes_object(use_db=True)
+ probes.probedata = self.get_test_probedata()
+ probes.cgroups = self.get_test_cgroups()
+ for cname in probes.probedata.keys():
+ client = Mock()
+ client.hostname = cname
+ probes._write_data_db(client)
+
+ probes.probedata = dict()
+ probes.cgroups = dict()
+ probes._load_data_db()
+ self.assertItemsEqual(probes.probedata, self.get_test_probedata())
+ # the db backend does not store groups at all if a client has
+ # no groups set, so we can't just use assertItemsEqual here,
+ # because loading saved data may _not_ result in the original
+ # data if some clients had no groups set.
+ test_cgroups = self.get_test_cgroups()
+ for cname, groups in test_cgroups.items():
+ if cname in probes.cgroups:
+ self.assertEqual(groups, probes.cgroups[cname])
+ else:
+ self.assertEqual(groups, [])
+
+ @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data")
+ def test_GetProbes(self, mock_get_probe_data):
+ TestProbing.test_GetProbes(self)
+
+ probes = self.get_probes_object()
+ metadata = Mock()
+ probes.GetProbes(metadata)
+ mock_get_probe_data.assert_called_with(metadata)
+
+ @patch("Bcfg2.Server.Plugins.Probes.Probes.write_data")
+ @patch("Bcfg2.Server.Plugins.Probes.Probes.ReceiveDataItem")
+ def test_ReceiveData(self, mock_ReceiveDataItem, mock_write_data):
+ TestProbing.test_ReceiveData(self)
+
+ # we use a simple (read: bogus) datalist here to make this
+ # easy to test
+ datalist = ["a", "b", "c"]
+
+ probes = self.get_probes_object()
+ client = Mock()
+ client.hostname = "foo.example.com"
+ probes.ReceiveData(client, datalist)
+
+ self.assertItemsEqual(mock_ReceiveDataItem.call_args_list,
+ [call(client, "a"), call(client, "b"),
+ call(client, "c")])
+ mock_write_data.assert_called_with(client)
+
+ def test_ReceiveDataItem(self):
+ probes = self.get_probes_object()
+ for cname, cdata in self.get_test_probedata().items():
+ client = Mock()
+ client.hostname = cname
+ for pname, pdata in cdata.items():
+ dataitem = lxml.etree.Element("Probe", name=pname)
+ if pname == "text":
+ # add some groups to the plaintext test to test
+ # group parsing
+ data = [pdata]
+ for group in self.get_test_cgroups()[cname]:
+ data.append("group:%s" % group)
+ dataitem.text = "\n".join(data)
+ else:
+ dataitem.text = str(pdata)
+
+ probes.ReceiveDataItem(client, dataitem)
+
+ self.assertIn(client.hostname, probes.probedata)
+ self.assertIn(pname, probes.probedata[cname])
+ self.assertEqual(pdata, probes.probedata[cname][pname])
+ self.assertIn(client.hostname, probes.cgroups)
+ self.assertEqual(probes.cgroups[cname],
+ self.get_test_cgroups()[cname])
+
+ def test_get_additional_groups(self):
+ TestConnector.test_get_additional_groups(self)
+
+ probes = self.get_probes_object()
+ test_cgroups = self.get_test_cgroups()
+ probes.cgroups = self.get_test_cgroups()
+ for cname in test_cgroups.keys():
+ metadata = Mock()
+ metadata.hostname = cname
+ self.assertEqual(test_cgroups[cname],
+ probes.get_additional_groups(metadata))
+ # test a non-existent client
+ metadata = Mock()
+ metadata.hostname = "nonexistent"
+ self.assertEqual(probes.get_additional_groups(metadata),
+ list())
+
+ def test_get_additional_data(self):
+ TestConnector.test_get_additional_data(self)
+
+ probes = self.get_probes_object()
+ test_probedata = self.get_test_probedata()
+ probes.probedata = self.get_test_probedata()
+ for cname in test_probedata.keys():
+ metadata = Mock()
+ metadata.hostname = cname
+ self.assertEqual(test_probedata[cname],
+ probes.get_additional_data(metadata))
+ # test a non-existent client
+ metadata = Mock()
+ metadata.hostname = "nonexistent"
+ self.assertEqual(probes.get_additional_data(metadata),
+ ClientProbeDataSet())
+
+
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py
new file mode 100644
index 000000000..c319ed663
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py
@@ -0,0 +1,109 @@
+import os
+import sys
+import lxml.etree
+import Bcfg2.Server.Plugin
+from Bcfg2.Compat import b64encode
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.SEModules import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+from TestPlugin import TestSpecificData, TestGroupSpool
+
+class TestSEModuleData(TestSpecificData):
+ test_obj = SEModuleData
+ path = os.path.join(datastore, "SEModules", "test.pp", "test.pp")
+
+ def test_bind_entry(self):
+ data = self.get_obj()
+ data.data = "test"
+ entry = lxml.etree.Element("test", name=self.path)
+ data.bind_entry(entry, Mock())
+ self.assertEqual(entry.get("name"), self.path)
+ self.assertEqual(entry.get("encoding"), "base64")
+ self.assertEqual(entry.text, b64encode(data.data))
+
+
+class TestSEModules(TestGroupSpool):
+ test_obj = SEModules
+
+ def test__get_module_name(self):
+ modules = self.get_obj()
+ for mname in ["foo", "foo.pp"]:
+ entry = lxml.etree.Element("SELinux", type="module", name=mname)
+ self.assertEqual(modules._get_module_name(entry), "/foo.pp")
+
+ @patch("Bcfg2.Server.Plugins.SEModules.SEModules._get_module_name")
+ def test_HandlesEntry(self, mock_get_name):
+ modules = self.get_obj()
+ modules.Entries['SELinux']['/foo.pp'] = Mock()
+ modules.Entries['SELinux']['/bar.pp'] = Mock()
+ for el in [lxml.etree.Element("Path", name="/foo.pp"),
+ lxml.etree.Element("SELinux", type="fcontext",
+ name="/foo.pp"),
+ lxml.etree.Element("SELinux", type="module",
+ name="/baz.pp")]:
+ mock_get_name.return_value = el.get("name")
+ self.assertFalse(modules.HandlesEntry(el, Mock()))
+ mock_get_name.assert_called_with(el)
+
+ for el in [lxml.etree.Element("SELinux", type="module",
+ name="/foo.pp"),
+ lxml.etree.Element("SELinux", type="module",
+ name="/bar.pp")]:
+ mock_get_name.return_value = el.get("name")
+ self.assertTrue(modules.HandlesEntry(el, Mock()),
+ msg="SEModules fails to handle %s" % el.get("name"))
+ mock_get_name.assert_called_with(el)
+
+ TestGroupSpool.test_HandlesEntry(self)
+
+ @patch("Bcfg2.Server.Plugins.SEModules.SEModules._get_module_name")
+ def test_HandlesEntry(self, mock_get_name):
+ modules = self.get_obj()
+ handler = Mock()
+ modules.Entries['SELinux']['/foo.pp'] = handler
+ mock_get_name.return_value = "/foo.pp"
+
+ entry = lxml.etree.Element("SELinux", type="module", name="foo")
+ metadata = Mock()
+ self.assertEqual(modules.HandleEntry(entry, metadata),
+ handler.return_value)
+ mock_get_name.assert_called_with(entry)
+ self.assertEqual(entry.get("name"), mock_get_name.return_value)
+ handler.assert_called_with(entry, metadata)
+
+ TestGroupSpool.test_HandlesEntry(self)
+
+ def test_add_entry(self):
+ @patch("%s.%s.event_path" %
+ (self.test_obj.__module__, self.test_obj.__name__))
+ @patch("%s.%s.add_entry" % (self.test_obj.__base__.__module__,
+ self.test_obj.__base__.__name__))
+ def inner(mock_add_entry, mock_event_path):
+ modules = self.get_obj()
+
+ evt = Mock()
+ evt.filename = "test.pp.G10_foo"
+
+ mock_event_path.return_value = os.path.join(datastore,
+ self.test_obj.__name__,
+ "test.pp",
+ "test.pp.G10_foo")
+ modules.add_entry(evt)
+ self.assertEqual(modules.filename_pattern, "test.pp")
+ mock_add_entry.assert_called_with(modules, evt)
+ mock_event_path.assert_called_with(evt)
+
+ inner()
+ TestGroupSpool.test_add_entry(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
new file mode 100644
index 000000000..556487288
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
@@ -0,0 +1,120 @@
+import os
+import sys
+import Bcfg2.Server.Plugin
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.TemplateHelper import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+from TestPlugin import TestDirectoryBacked, TestConnector, TestPlugin, \
+ TestFileBacked
+
+
+class TestHelperModule(TestFileBacked):
+ test_obj = HelperModule
+ path = os.path.join(datastore, "test.py")
+
+ def test__init(self):
+ hm = self.get_obj()
+ self.assertEqual(hm._module_name, "test")
+ self.assertEqual(hm._attrs, [])
+
+ @patch("imp.load_source")
+ def test_Index(self, mock_load_source):
+ hm = self.get_obj()
+
+ mock_load_source.side_effect = ImportError
+ attrs = dir(hm)
+ hm.Index()
+ mock_load_source.assert_called_with(hm._module_name, hm.name)
+ self.assertEqual(attrs, dir(hm))
+ self.assertEqual(hm._attrs, [])
+
+ mock_load_source.reset()
+ mock_load_source.side_effect = None
+ # a regular Mock (not a MagicMock) won't automatically create
+ # __export__, so this triggers a failure condition in Index
+ mock_load_source.return_value = Mock()
+ attrs = dir(hm)
+ hm.Index()
+ mock_load_source.assert_called_with(hm._module_name, hm.name)
+ self.assertEqual(attrs, dir(hm))
+ self.assertEqual(hm._attrs, [])
+
+ # test reserved attributes
+ module = Mock()
+ module.__export__ = ["_attrs", "Index", "__init__"]
+ mock_load_source.reset()
+ mock_load_source.return_value = module
+ attrs = dir(hm)
+ hm.Index()
+ mock_load_source.assert_called_with(hm._module_name, hm.name)
+ self.assertEqual(attrs, dir(hm))
+ self.assertEqual(hm._attrs, [])
+
+ # test adding attributes
+ module = Mock()
+ module.__export__ = ["foo", "bar", "baz", "Index"]
+ mock_load_source.reset()
+ mock_load_source.return_value = module
+ hm.Index()
+ mock_load_source.assert_called_with(hm._module_name, hm.name)
+ self.assertTrue(hasattr(hm, "foo"))
+ self.assertTrue(hasattr(hm, "bar"))
+ self.assertTrue(hasattr(hm, "baz"))
+ self.assertEqual(hm._attrs, ["foo", "bar", "baz"])
+
+ # test removing attributes
+ module = Mock()
+ module.__export__ = ["foo", "bar", "quux", "Index"]
+ mock_load_source.reset()
+ mock_load_source.return_value = module
+ hm.Index()
+ mock_load_source.assert_called_with(hm._module_name, hm.name)
+ self.assertTrue(hasattr(hm, "foo"))
+ self.assertTrue(hasattr(hm, "bar"))
+ self.assertTrue(hasattr(hm, "quux"))
+ self.assertFalse(hasattr(hm, "baz"))
+ self.assertEqual(hm._attrs, ["foo", "bar", "quux"])
+
+
+
+class TestHelperSet(TestDirectoryBacked):
+ test_obj = HelperSet
+ testfiles = ['foo.py', 'foo_bar.py', 'foo.bar.py']
+ ignore = ['fooo.py~', 'fooo.pyc', 'fooo.pyo']
+ badevents = ['foo']
+
+
+class TestTemplateHelper(TestPlugin, TestConnector):
+ test_obj = TemplateHelper
+
+ def test__init(self):
+ TestPlugin.test__init(self)
+
+ th = self.get_obj()
+ self.assertIsInstance(th.helpers, HelperSet)
+
+ def test_get_additional_data(self):
+ TestConnector.test_get_additional_data(self)
+
+ th = self.get_obj()
+ modules = ['foo', 'bar']
+ rv = dict()
+ for mname in modules:
+ module = Mock()
+ module._module_name = mname
+ rv[mname] = module
+ th.helpers.entries['%s.py' % mname] = module
+ actual = th.get_additional_data(Mock())
+ self.assertItemsEqual(actual, rv)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/__init__.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/__init__.py
diff --git a/testsuite/Testsrc/Testlib/TestServer/__init__.py b/testsuite/Testsrc/Testlib/TestServer/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/__init__.py
diff --git a/testsuite/Testsrc/Testlib/__init__.py b/testsuite/Testsrc/Testlib/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/__init__.py