summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc/Testlib/TestServer/TestPlugins
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py223
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py111
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py55
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py59
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py23
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py88
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py21
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py31
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py225
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py41
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py349
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py2
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py237
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py60
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py29
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py326
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py621
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py326
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py259
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py2
20 files changed, 1423 insertions, 1665 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
new file mode 100644
index 000000000..86a960701
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
@@ -0,0 +1,223 @@
+import os
+import sys
+import lxml.etree
+import Bcfg2.Server.Plugin
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.ACL import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \
+ TestClientACLs
+
+
+class TestFunctions(Bcfg2TestCase):
+ def test_rmi_names_equal(self):
+ good_cases = [('*', 'foo'),
+ ('foo', 'foo'),
+ ('foo.*', 'foo.bar'),
+ ('*.*', 'foo.bar'),
+ ('foo.bar', 'foo.bar'),
+ ('*.bar', 'foo.bar'),
+ ('foo.*.bar', 'foo.baz.bar')]
+ bad_cases = [('foo', 'bar'),
+ ('*', 'foo.bar'),
+ ('*.*', 'foo'),
+ ('*.*', 'foo.bar.baz'),
+ ('foo.*', 'bar.foo'),
+ ('*.bar', 'bar.foo'),
+ ('foo.*', 'foobar')]
+ for first, second in good_cases:
+ self.assertTrue(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (first, second))
+ self.assertTrue(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (second, first))
+ for first, second in bad_cases:
+ self.assertFalse(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (first, second))
+ self.assertFalse(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (second, first))
+
+ def test_ip_matches(self):
+ good_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ bad_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ for ip, entry in good_cases:
+ self.assertTrue(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly False" %
+ (ip, lxml.etree.tostring(entry)))
+ for ip, entry in bad_cases:
+ self.assertFalse(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly True" %
+ (ip, lxml.etree.tostring(entry)))
+
+
+class TestIPACLFile(TestXMLFileBacked):
+ test_obj = IPACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.ip_matches")
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches):
+ af = self.get_obj()
+ ip = "10.0.0.8"
+ rmi = "ACL.test"
+
+ def reset():
+ mock_rmi_names_equal.reset_mock()
+ mock_ip_matches.reset_mock()
+
+ # test default defer with no entries
+ af.entries = []
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ # test explicit allow, deny, and defer
+ entries = dict(Allow=lxml.etree.Element("Allow", method=rmi),
+ Deny=lxml.etree.Element("Deny", method=rmi),
+ Defer=lxml.etree.Element("Defer", method=rmi))
+ af.entries = list(entries.values())
+
+ def get_ip_matches(tag):
+ def ip_matches(ip, entry):
+ return entry.tag == tag
+
+ return ip_matches
+
+ mock_rmi_names_equal.return_value = True
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Allow")
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Allow'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Deny")
+ self.assertFalse(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Deny'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Defer")
+ self.assertIsNone(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Defer'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ # test matching RMI names
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: True
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+ rmi = "ACL.test2"
+ matching = lxml.etree.Element("Allow", method=rmi)
+ af.entries.append(matching)
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, matching)
+ self.assertTrue(
+ call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or
+ call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list)
+
+ # test implicit allow for localhost, defer for others
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: False
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ reset()
+ self.assertTrue(af.check_acl("127.0.0.1", rmi))
+
+
+class TestMetadataACLFile(TestStructFile):
+ test_obj = MetadataACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal):
+ af = self.get_obj()
+ af.Match = Mock()
+ metadata = Mock()
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+
+ def reset():
+ af.Match.reset_mock()
+ mock_rmi_names_equal.reset_mock()
+
+ # test default allow
+ af.entries = []
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+
+ # test explicit allow and deny
+ reset()
+ af.entries = [lxml.etree.Element("Allow", method='ACL.test'),
+ lxml.etree.Element("Deny", method='ACL.test2')]
+ af.Match.return_value = af.entries
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test', 'ACL.test'),
+ mock_rmi_names_equal.call_args_list)
+
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test2'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test2', 'ACL.test2'),
+ mock_rmi_names_equal.call_args_list)
+
+ # test default deny for non-localhost
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+ # test default allow for localhost
+ reset()
+ metadata.hostname = 'localhost'
+ self.assertTrue(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+
+class TestACL(TestPlugin, TestClientACLs):
+ test_obj = ACL
+
+ def test_check_acl_ip(self):
+ acl = self.get_obj()
+ acl.ip_acls = Mock()
+ self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"),
+ "ACL.test"),
+ acl.ip_acls.check_acl.return_value)
+ acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test")
+
+ def test_check_acl_metadata(self):
+ acl = self.get_obj()
+ acl.metadata_acls = Mock()
+ metadata = Mock()
+ self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"),
+ acl.metadata_acls.check_acl.return_value)
+ acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py
new file mode 100644
index 000000000..cfb379c40
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py
@@ -0,0 +1,111 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Bundler import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestStructFile, TestPlugin, TestStructure, \
+ TestXMLDirectoryBacked
+
+
+class TestBundleFile(TestStructFile):
+ test_obj = BundleFile
+ path = os.path.join(datastore, "test", "test1.xml")
+
+ def test_bundle_name(self):
+ cases = [("foo.xml", "foo"),
+ ("foo.bar.xml", "foo.bar"),
+ ("foo-bar-baz.xml", "foo-bar-baz"),
+ ("foo....xml", "foo..."),
+ ("foo.genshi", "foo")]
+ bf = self.get_obj()
+ for fname, bname in cases:
+ bf.name = fname
+ self.assertEqual(bf.bundle_name, bname)
+
+
+class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked):
+ test_obj = Bundler
+
+ def get_obj(self, core=None):
+ @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__,
+ self.test_obj.__name__),
+ Mock())
+ def inner():
+ return TestPlugin.get_obj(self, core=core)
+ return inner()
+
+ @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent")
+ def test_HandleEvent(self, mock_HandleEvent):
+ b = self.get_obj()
+ b.bundles = dict(foo=Mock(), bar=Mock())
+ b.entries = {"foo.xml": BundleFile("foo.xml"),
+ "baz.xml": BundleFile("baz.xml")}
+ event = Mock()
+ b.HandleEvent(event)
+ mock_HandleEvent.assert_called_with(b, event)
+ self.assertItemsEqual(b.bundles,
+ dict(foo=b.entries['foo.xml'],
+ baz=b.entries['baz.xml']))
+
+ def test_BuildStructures(self):
+ b = self.get_obj()
+ b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(),
+ has_dep=Mock(), is_dep=Mock(), indep=Mock())
+ expected = dict()
+
+ b.bundles['error'].XMLMatch.side_effect = TemplateError(None)
+
+ xinclude = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"),
+ "Path", name="/test")
+ b.bundles['xinclude'].XMLMatch.return_value = xinclude
+ expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude")
+ lxml.etree.SubElement(expected['xinclude'], "Path", name="/test")
+
+ has_dep = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(has_dep, "Bundle", name="is_dep")
+ lxml.etree.SubElement(has_dep, "Package", name="foo")
+ b.bundles['has_dep'].XMLMatch.return_value = has_dep
+ expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep")
+ lxml.etree.SubElement(expected['has_dep'], "Package", name="foo")
+
+ is_dep = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(is_dep, "Package", name="bar")
+ b.bundles['is_dep'].XMLMatch.return_value = is_dep
+ expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep")
+ lxml.etree.SubElement(expected['is_dep'], "Package", name="bar")
+
+ indep = lxml.etree.Element("Bundle", independent="true")
+ lxml.etree.SubElement(indep, "Service", name="baz")
+ b.bundles['indep'].XMLMatch.return_value = indep
+ expected['indep'] = lxml.etree.Element("Independent", name="indep")
+ lxml.etree.SubElement(expected['indep'], "Service", name="baz")
+
+ metadata = Mock()
+ metadata.bundles = ["error", "xinclude", "has_dep", "indep"]
+
+ rv = b.BuildStructures(metadata)
+ self.assertEqual(len(rv), 4)
+ for bundle in rv:
+ name = bundle.get("name")
+ self.assertIsNotNone(name,
+ "Bundle %s was not built" % name)
+ self.assertIn(name, expected,
+ "Unexpected bundle %s was built" % name)
+ self.assertXMLEqual(bundle, expected[name],
+ "Bundle %s was not built correctly" % name)
+ b.bundles[name].XMLMatch.assert_called_with(metadata)
+
+ b.bundles['error'].XMLMatch.assert_called_with(metadata)
+ self.assertFalse(b.bundles['skip'].XMLMatch.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
index d655a20cd..f41ae8a46 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
@@ -23,12 +23,16 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
test_obj = CfgAuthorizedKeysGenerator
should_monitor = False
- def get_obj(self, name=None, core=None, fam=None):
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
+ TestStructFile.setUp(self)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.get_cfg")
+ def get_obj(self, mock_get_cfg, name=None, core=None, fam=None):
if name is None:
name = self.path
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG = Mock()
if core is not None:
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG.core = core
+ mock_get_cfg.return_value.core = core
return self.test_obj(name)
@patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
@@ -40,33 +44,9 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
mock_HandleEvent.assert_called_with(akg, evt)
mock_handle_event.assert_called_with(akg, evt)
- def test_category(self):
- akg = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp
-
- self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
-
- cfp.reset_mock()
- cfp.has_section.return_value = True
- self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
-
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(akg.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
-
@patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata")
- @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category")
def test_get_data(self, mock_ClientMetadata):
+ Bcfg2.Options.setup.sshkeys_category = "category"
akg = self.get_obj()
akg.XMLMatch = Mock()
@@ -131,17 +111,18 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
reset()
host = "baz.example.com"
spec = lxml.etree.Element("AuthorizedKeys")
- lxml.etree.SubElement(
- lxml.etree.SubElement(spec,
- "Allow",
- attrib={"from": pubkey, "host": host}),
- "Params", foo="foo", bar="bar=bar")
+ allow = lxml.etree.SubElement(spec, "Allow",
+ attrib={"from": pubkey, "host": host})
+ lxml.etree.SubElement(allow, "Option", name="foo", value="foo")
+ lxml.etree.SubElement(allow, "Option", name="bar")
+ lxml.etree.SubElement(allow, "Option", name="baz", value="baz=baz")
akg.XMLMatch.return_value = spec
params, actual_host, actual_pubkey = akg.get_data(entry,
metadata).split()
self.assertEqual(actual_host, host)
self.assertEqual(actual_pubkey, pubkey)
- self.assertItemsEqual(params.split(","), ["foo=foo", "bar=bar=bar"])
+ self.assertItemsEqual(params.split(","), ["foo=foo", "bar",
+ "baz=baz=baz"])
akg.XMLMatch.assert_called_with(metadata)
akg.core.build_metadata.assert_called_with(host)
self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey)
@@ -151,10 +132,10 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
spec = lxml.etree.Element("AuthorizedKeys")
text = lxml.etree.SubElement(spec, "Allow")
text.text = "ssh-rsa publickey /foo/bar\n"
- lxml.etree.SubElement(text, "Params", foo="foo")
+ lxml.etree.SubElement(text, "Option", name="foo")
akg.XMLMatch.return_value = spec
self.assertEqual(akg.get_data(entry, metadata),
- "foo=foo %s" % text.text.strip())
+ "foo %s" % text.text.strip())
akg.XMLMatch.assert_called_with(metadata)
self.assertFalse(akg.core.build_metadata.called)
self.assertFalse(akg.core.Bind.called)
@@ -163,7 +144,7 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey})
akg.XMLMatch.return_value = spec
self.assertItemsEqual(akg.get_data(entry, metadata).splitlines(),
- ["foo=foo %s" % text.text.strip(),
+ ["foo %s" % text.text.strip(),
"profile %s" % pubkey])
akg.XMLMatch.assert_called_with(metadata)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
index fc5d5e53d..93331304a 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
@@ -17,32 +17,39 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if HAS_CHEETAH or can_skip:
- class TestCfgCheetahGenerator(TestCfgGenerator):
- test_obj = CfgCheetahGenerator
+class TestCfgCheetahGenerator(TestCfgGenerator):
+ test_obj = CfgCheetahGenerator
- @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
+ set_setup_default("repository", datastore)
- @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template")
- def test_get_data(self, mock_Template):
- ccg = self.get_obj(encoding='UTF-8')
- ccg.data = "data"
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock()
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template")
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.get_template_data")
+ def test_get_data(self, mock_get_template_data, mock_Template):
+ ccg = self.get_obj()
+ ccg.data = "data"
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
- self.assertEqual(ccg.get_data(entry, metadata),
- mock_Template.return_value.respond.return_value)
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo")
- mock_Template.assert_called_with("data".decode(ccg.encoding),
- compilerSettings=ccg.settings)
- tmpl = mock_Template.return_value
- tmpl.respond.assert_called_with()
- self.assertEqual(tmpl.metadata, metadata)
- self.assertEqual(tmpl.name, entry.get("name"))
- self.assertEqual(tmpl.path, entry.get("name"))
- self.assertEqual(tmpl.source_path, ccg.name)
- self.assertEqual(tmpl.repo,
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value)
+ template_vars = dict(name=entry.get("name"),
+ metadata=metadata,
+ path=ccg.name,
+ source_path=ccg.name,
+ repo=datastore)
+ mock_get_template_data.return_value = template_vars
+
+ self.assertEqual(ccg.get_data(entry, metadata),
+ mock_Template.return_value.respond.return_value)
+ mock_Template.assert_called_with(
+ "data".decode(Bcfg2.Options.setup.encoding),
+ compilerSettings=ccg.settings)
+ tmpl = mock_Template.return_value
+ tmpl.respond.assert_called_with()
+ for key, val in template_vars.items():
+ self.assertEqual(getattr(tmpl, key), val)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, ccg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultCheetahDataProvider)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
index 46062569d..4c987551b 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
@@ -30,18 +30,17 @@ except ImportError:
HAS_CRYPTO = False
-if can_skip or (HAS_CRYPTO and HAS_CHEETAH):
- class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator,
- TestCfgEncryptedGenerator):
- test_obj = CfgEncryptedCheetahGenerator
+class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator,
+ TestCfgEncryptedGenerator):
+ test_obj = CfgEncryptedCheetahGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ pass
- def test_handle_event(self):
- TestCfgEncryptedGenerator.test_handle_event(self)
+ def test_handle_event(self):
+ TestCfgEncryptedGenerator.test_handle_event(self)
- def test_get_data(self):
- TestCfgCheetahGenerator.test_get_data(self)
+ def test_get_data(self):
+ TestCfgCheetahGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
index 71a7410da..873ebd837 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
@@ -18,59 +18,53 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_CRYPTO:
- class TestCfgEncryptedGenerator(TestCfgGenerator):
- test_obj = CfgEncryptedGenerator
+class TestCfgEncryptedGenerator(TestCfgGenerator):
+ test_obj = CfgEncryptedGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
- @patchIf(HAS_CRYPTO,
- "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm")
- @patchIf(HAS_CRYPTO,
- "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
- def test_handle_event(self, mock_decrypt, mock_get_algorithm):
- @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
- def inner(mock_handle_event):
- def reset():
- mock_decrypt.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_handle_event.reset_mock()
+ @patchIf(HAS_CRYPTO,
+ "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
+ def test_handle_event(self, mock_decrypt):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
+ def inner(mock_handle_event):
+ def reset():
+ mock_decrypt.reset_mock()
+ mock_handle_event.reset_mock()
- def get_event_data(obj, event):
- obj.data = "encrypted"
+ def get_event_data(obj, event):
+ obj.data = "encrypted"
- mock_handle_event.side_effect = get_event_data
- mock_decrypt.side_effect = lambda d, **kw: "plaintext"
- event = Mock()
- ceg = self.get_obj()
- ceg.handle_event(event)
- mock_handle_event.assert_called_with(ceg, event)
- mock_decrypt.assert_called_with("encrypted",
- setup=SETUP,
- algorithm=mock_get_algorithm.return_value)
- self.assertEqual(ceg.data, "plaintext")
+ mock_handle_event.side_effect = get_event_data
+ mock_decrypt.side_effect = lambda d, **kw: "plaintext"
+ event = Mock()
+ ceg = self.get_obj()
+ ceg.handle_event(event)
+ mock_handle_event.assert_called_with(ceg, event)
+ mock_decrypt.assert_called_with("encrypted")
+ self.assertEqual(ceg.data, "plaintext")
- reset()
- mock_decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError,
- ceg.handle_event, event)
- inner()
+ reset()
+ mock_decrypt.side_effect = EVPError
+ self.assertRaises(PluginExecutionError,
+ ceg.handle_event, event)
+ inner()
- # to perform the tests from the parent test object, we
- # make bruteforce_decrypt just return whatever data was
- # given to it
- mock_decrypt.side_effect = lambda d, **kw: d
- TestCfgGenerator.test_handle_event(self)
+ # to perform the tests from the parent test object, we
+ # make bruteforce_decrypt just return whatever data was
+ # given to it
+ mock_decrypt.side_effect = lambda d, **kw: d
+ TestCfgGenerator.test_handle_event(self)
- def test_get_data(self):
- ceg = self.get_obj()
- ceg.data = None
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
+ def test_get_data(self):
+ ceg = self.get_obj()
+ ceg.data = None
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
- self.assertRaises(PluginExecutionError,
- ceg.get_data, entry, metadata)
+ self.assertRaises(PluginExecutionError,
+ ceg.get_data, entry, metadata)
- TestCfgGenerator.test_get_data(self)
+ TestCfgGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
index b447a9bb8..0b74e4a60 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
@@ -14,20 +14,13 @@ while path != "/":
path = os.path.dirname(path)
from common import *
-try:
- from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
- TestCfgGenshiGenerator
- HAS_GENSHI = True
-except ImportError:
- TestCfgGenshiGenerator = object
- HAS_GENSHI = False
+from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
+ TestCfgGenshiGenerator
-if can_skip or (HAS_CRYPTO and HAS_GENSHI):
- class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
- test_obj = CfgEncryptedGenshiGenerator
+class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
+ test_obj = CfgEncryptedGenshiGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenshiGenerator.setUp(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
index 0f369113b..7ceedb7c2 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
@@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier
class TestCfgExternalCommandVerifier(TestCfgVerifier):
test_obj = CfgExternalCommandVerifier
- @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen")
- def test_verify_entry(self, mock_Popen):
- proc = Mock()
- mock_Popen.return_value = proc
- proc.wait.return_value = 0
- proc.communicate.return_value = ("stdout", "stderr")
+ def test_verify_entry(self):
entry = lxml.etree.Element("Path", name="/test.txt")
metadata = Mock()
ecv = self.get_obj()
ecv.cmd = ["/bin/bash", "-x", "foo"]
+ ecv.exc = Mock()
+ ecv.exc.run.return_value = Mock()
+ ecv.exc.run.return_value.success = True
+
ecv.verify_entry(entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
- mock_Popen.reset_mock()
- proc.wait.return_value = 13
+ ecv.exc.reset_mock()
+ ecv.exc.run.return_value.success = False
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
+
+ ecv.exc.reset_mock()
- mock_Popen.reset_mock()
- mock_Popen.side_effect = OSError
+ ecv.exc.reset_mock()
+ ecv.exc.run.side_effect = OSError
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
@patch("os.access")
def test_handle_event(self, mock_access):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
index 2e8b7bfa5..b667d417a 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
@@ -19,111 +19,120 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_GENSHI:
- class TestCfgGenshiGenerator(TestCfgGenerator):
- test_obj = CfgGenshiGenerator
-
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
- def setUp(self):
- pass
-
- def test_removecomment(self):
- data = [(None, "test", 1),
- (None, "test2", 2)]
- stream = [(genshi.core.COMMENT, "test", 0),
- data[0],
- (genshi.core.COMMENT, "test3", 0),
- data[1]]
- self.assertItemsEqual(list(removecomment(stream)), data)
-
- def test__init(self):
- TestCfgGenerator.test__init(self)
- cgg = self.get_obj()
- self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
-
- def test_get_data(self):
- cgg = self.get_obj()
- cgg._handle_genshi_exception = Mock()
- cgg.template = Mock()
- fltr = Mock()
- cgg.template.generate.return_value = fltr
- stream = Mock()
- fltr.filter.return_value = stream
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
-
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock()
-
- def reset():
- cgg.template.reset_mock()
- cgg._handle_genshi_exception.reset_mock()
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock()
-
- template_vars = dict(
- name=entry.get("name"),
- metadata=metadata,
- path=cgg.name,
- source_path=cgg.name,
- repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value)
-
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- def render(fmt, **kwargs):
- stream.render.side_effect = None
- raise TypeError
- stream.render.side_effect = render
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- self.assertEqual(stream.render.call_args_list,
- [call("text", encoding=cgg.encoding,
- strip_whitespace=False),
- call("text", encoding=cgg.encoding)])
-
- reset()
- stream.render.side_effect = UndefinedError("test")
- self.assertRaises(UndefinedError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- stream.render.side_effect = ValueError
- cgg._handle_genshi_exception.side_effect = ValueError
- self.assertRaises(ValueError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
- self.assertTrue(cgg._handle_genshi_exception.called)
-
- def test_handle_event(self):
- cgg = self.get_obj()
- cgg.loader = Mock()
- event = Mock()
- cgg.handle_event(event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
-
- cgg.loader.reset_mock()
- cgg.loader.load.side_effect = OSError
- self.assertRaises(PluginExecutionError,
- cgg.handle_event, event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
+class TestCfgGenshiGenerator(TestCfgGenerator):
+ test_obj = CfgGenshiGenerator
+
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
+ set_setup_default("repository", datastore)
+
+ def test__init(self):
+ TestCfgGenerator.test__init(self)
+ cgg = self.get_obj()
+ self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.get_template_data")
+ def test_get_data(self, mock_get_template_data):
+ cgg = self.get_obj()
+ cgg._handle_genshi_exception = Mock()
+ cgg.template = Mock()
+ fltr = Mock()
+ cgg.template.generate.return_value = fltr
+ stream = Mock()
+ fltr.filter.return_value = stream
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+ def reset():
+ cgg.template.reset_mock()
+ cgg._handle_genshi_exception.reset_mock()
+ mock_get_template_data.reset_mock()
+
+ template_vars = dict(name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name,
+ source_path=cgg.name,
+ repo=datastore)
+ mock_get_template_data.return_value = template_vars
+
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with(
+ "text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False)
+
+ reset()
+ def render(fmt, **kwargs):
+ stream.render.side_effect = None
+ raise TypeError
+ stream.render.side_effect = render
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ self.assertEqual(stream.render.call_args_list,
+ [call("text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False),
+ call("text",
+ encoding=Bcfg2.Options.setup.encoding)])
+
+ reset()
+ stream.render.side_effect = UndefinedError("test")
+ self.assertRaises(UndefinedError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False)
+
+ reset()
+ stream.render.side_effect = ValueError
+ cgg._handle_genshi_exception.side_effect = ValueError
+ self.assertRaises(ValueError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False)
+ self.assertTrue(cgg._handle_genshi_exception.called)
+
+ def test_handle_event(self):
+ cgg = self.get_obj()
+ cgg.loader = Mock()
+ event = Mock()
+ cgg.handle_event(event)
+ cgg.loader.load.assert_called_with(
+ cgg.name,
+ cls=NewTextTemplate,
+ encoding=Bcfg2.Options.setup.encoding)
+
+ cgg.loader.reset_mock()
+ cgg.loader.load.side_effect = OSError
+ self.assertRaises(PluginExecutionError,
+ cgg.handle_event, event)
+ cgg.loader.load.assert_called_with(
+ cgg.name,
+ cls=NewTextTemplate,
+ encoding=Bcfg2.Options.setup.encoding)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
index 839e9c3b8..349da2213 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
@@ -21,53 +21,26 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgInfo
class TestCfgInfoXML(TestCfgInfo):
test_obj = CfgInfoXML
+ def setUp(self):
+ TestCfgInfo.setUp(self)
+ set_setup_default("filemonitor", MagicMock())
+
def test__init(self):
TestCfgInfo.test__init(self)
ci = self.get_obj()
self.assertIsInstance(ci.infoxml, InfoXML)
def test_bind_info_to_entry(self):
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
ci = self.get_obj()
ci.infoxml = Mock()
- ci._set_info = Mock()
-
- self.assertRaises(PluginExecutionError,
- ci.bind_info_to_entry, entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata, dict(),
- entry=entry)
- self.assertFalse(ci._set_info.called)
-
- ci.infoxml.reset_mock()
- ci._set_info.reset_mock()
- mdata_value = Mock()
- def set_mdata(metadata, mdata, entry=None):
- mdata['Info'] = {None: mdata_value}
+ entry = Mock()
+ metadata = Mock()
- ci.infoxml.pnode.Match.side_effect = set_mdata
ci.bind_info_to_entry(entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata,
- dict(Info={None: mdata_value}),
- entry=entry)
- ci._set_info.assert_called_with(entry, mdata_value)
+ ci.infoxml.BindEntry.assert_called_with(entry, metadata)
def test_handle_event(self):
ci = self.get_obj()
ci.infoxml = Mock()
ci.handle_event(Mock)
ci.infoxml.HandleEvent.assert_called_with()
-
- def test__set_info(self):
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info")
- def inner(mock_set_info):
- ci = self.get_obj()
- entry = Mock()
- info = {"foo": "foo",
- "__children__": ["one", "two"]}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.append.call_args_list,
- [call(c) for c in info['__children__']])
-
- inner()
- TestCfgInfo.test__set_info(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
index e139a592b..d64bbaabf 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import *
from Bcfg2.Server.Plugin import PluginExecutionError
import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator
try:
- from Bcfg2.Encryption import EVPError
+ from Bcfg2.Server.Encryption import EVPError
HAS_CRYPTO = True
except:
HAS_CRYPTO = False
@@ -22,100 +22,36 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
-from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator
-from TestServer.TestPlugin.Testhelpers import TestStructFile
+from TestServer.TestPlugins.TestCfg.Test_init import TestXMLCfgCreator
-class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
+class TestCfgPrivateKeyCreator(TestXMLCfgCreator):
test_obj = CfgPrivateKeyCreator
should_monitor = False
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.get_cfg", Mock())
def get_obj(self, name=None, fam=None):
- Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.CFG = Mock()
- return TestCfgCreator.get_obj(self, name=name)
-
- @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
- @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
- def test_handle_event(self, mock_HandleEvent, mock_handle_event):
- pkc = self.get_obj()
- evt = Mock()
- pkc.handle_event(evt)
- mock_HandleEvent.assert_called_with(pkc, evt)
- mock_handle_event.assert_called_with(pkc, evt)
-
- def test_category(self):
- pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
-
- self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
-
- cfp.reset_mock()
- cfp.has_section.return_value = True
- self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
-
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(pkc.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- def test_passphrase(self, mock_get_passphrases):
- pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
-
- self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
-
- cfp.reset_mock()
- cfp.has_section.return_value = True
- self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
-
- cfp.reset_mock()
- cfp.get.return_value = "test"
- mock_get_passphrases.return_value = dict(test="foo", test2="bar")
- cfp.has_option.return_value = True
- self.assertEqual(pkc.passphrase, "foo")
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
- cfp.get.assert_called_with("sshkeys", "passphrase")
- mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ return TestXMLCfgCreator.get_obj(self, name=name)
@patch("shutil.rmtree")
@patch("tempfile.mkdtemp")
- @patch("subprocess.Popen")
- def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree):
+ def test__gen_keypair(self, mock_mkdtemp, mock_rmtree):
pkc = self.get_obj()
+ pkc.cmd = Mock()
pkc.XMLMatch = Mock()
mock_mkdtemp.return_value = datastore
metadata = Mock()
- proc = Mock()
- proc.wait.return_value = 0
- proc.communicate.return_value = MagicMock()
- mock_Popen.return_value = proc
+ exc = Mock()
+ exc.success = True
+ pkc.cmd.run.return_value = exc
spec = lxml.etree.Element("PrivateKey")
pkc.XMLMatch.return_value = spec
def reset():
pkc.XMLMatch.reset_mock()
- mock_Popen.reset_mock()
+ pkc.cmd.reset_mock()
mock_mkdtemp.reset_mock()
mock_rmtree.reset_mock()
@@ -123,10 +59,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "rsa", "-N", ""])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "rsa", "-N", ""])
reset()
lxml.etree.SubElement(spec, "Params", bits="768", type="dsa")
@@ -137,73 +72,15 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "dsa", "-b", "768", "-N", "foo"])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "dsa", "-b", "768", "-N", "foo"])
reset()
- proc.wait.return_value = 1
+ pkc.cmd.run.return_value.success = False
self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata)
mock_rmtree.assert_called_with(datastore)
- def test_get_specificity(self):
- pkc = self.get_obj()
- pkc.XMLMatch = Mock()
-
- metadata = Mock()
-
- def reset():
- pkc.XMLMatch.reset_mock()
- metadata.group_in_category.reset_mock()
-
- category = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.category"
- @patch(category, None)
- def inner():
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
- inner()
-
- @patch(category, "foo")
- def inner2():
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=50))
- metadata.group_in_category.assert_called_with("foo")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- perhost="true")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- category="bar")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=50))
- metadata.group_in_category.assert_called_with("bar")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- prio="10")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=10))
- metadata.group_in_category.assert_called_with("foo")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- metadata.group_in_category.return_value = ''
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
- metadata.group_in_category.assert_called_with("foo")
-
- inner2()
-
@patch("shutil.rmtree")
@patch("%s.open" % builtins)
def test_create_data(self, mock_open, mock_rmtree):
@@ -216,7 +93,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
# the get_specificity() return value is being used
# appropriately, we put some dummy data in it and test for
# that data
- pkc.get_specificity.side_effect = lambda m, s: dict(group="foo")
+ pkc.get_specificity.side_effect = lambda m: dict(group="foo")
pkc._gen_keypair = Mock()
privkey = os.path.join(datastore, "privkey")
pkc._gen_keypair.return_value = privkey
@@ -242,179 +119,15 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
mock_open.return_value.read.side_effect = open_read_rv
reset()
- passphrase = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.passphrase"
-
- @patch(passphrase, None)
- def inner():
- self.assertEqual(pkc.create_data(entry, metadata), "privatekey")
- pkc.XMLMatch.assert_called_with(metadata)
- pkc.get_specificity.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- pkc._gen_keypair.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- self.assertItemsEqual(mock_open.call_args_list,
- [call(privkey + ".pub"), call(privkey)])
- pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
- pkc.pubkey_creator.write_data.assert_called_with(
- "ssh-rsa publickey pubkey.filename\n", group="foo")
- pkc.write_data.assert_called_with("privatekey", group="foo")
- mock_rmtree.assert_called_with(datastore)
-
- inner()
-
- if HAS_CRYPTO:
- @patch(passphrase, "foo")
- @patch("Bcfg2.Encryption.ssl_encrypt")
- @patch("Bcfg2.Encryption.get_algorithm")
- def inner2(mock_get_algorithm, mock_ssl_encrypt):
- reset()
- mock_ssl_encrypt.return_value = "encryptedprivatekey"
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True
- self.assertEqual(pkc.create_data(entry, metadata),
- "encryptedprivatekey")
- pkc.XMLMatch.assert_called_with(metadata)
- pkc.get_specificity.assert_called_with(
- metadata,
- pkc.XMLMatch.return_value)
- pkc._gen_keypair.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- self.assertItemsEqual(mock_open.call_args_list,
- [call(privkey + ".pub"), call(privkey)])
- pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
- pkc.pubkey_creator.write_data.assert_called_with(
- "ssh-rsa publickey pubkey.filename\n", group="foo")
- pkc.write_data.assert_called_with("encryptedprivatekey",
- group="foo", ext=".crypt")
- mock_ssl_encrypt.assert_called_with(
- "privatekey", "foo",
- algorithm=mock_get_algorithm.return_value)
- mock_rmtree.assert_called_with(datastore)
-
- inner2()
-
- def test_Index(self):
- has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False
- TestStructFile.test_Index(self)
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict"
-
- pkc = self.get_obj()
- pkc._decrypt = Mock()
- pkc._decrypt.return_value = 'plaintext'
- pkc.data = '''
-<PrivateKey>
- <Group name="test">
- <Passphrase encrypted="foo">crypted</Passphrase>
- </Group>
- <Group name="test" negate="true">
- <Passphrase>plain</Passphrase>
- </Group>
-</PrivateKey>'''
-
- # test successful decryption
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
- for el in pkc.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pkc._decrypt.return_value)
-
- # test failed decryption, strict
- pkc._decrypt.reset_mock()
- pkc._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pkc.Index)
-
- # test failed decryption, lax
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax"
- pkc._decrypt.reset_mock()
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
- mock_get_algorithm, mock_ssl):
- pkc = self.get_obj()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
+ self.assertEqual(pkc.create_data(entry, metadata), "privatekey")
+ pkc.XMLMatch.assert_called_with(metadata)
+ pkc.get_specificity.assert_called_with(metadata)
+ pkc._gen_keypair.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ self.assertItemsEqual(mock_open.call_args_list,
+ [call(privkey + ".pub"), call(privkey)])
+ pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
+ pkc.pubkey_creator.write_data.assert_called_with(
+ "ssh-rsa publickey pubkey.filename\n", group="foo")
+ pkc.write_data.assert_called_with("privatekey", group="foo")
+ mock_rmtree.assert_called_with(datastore)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
index ef4610fae..f512a6803 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
@@ -25,8 +25,8 @@ class TestCfgPublicKeyCreator(TestCfgCreator, TestStructFile):
test_obj = CfgPublicKeyCreator
should_monitor = False
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.get_cfg", Mock())
def get_obj(self, name=None, fam=None):
- Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.CFG = Mock()
return TestCfgCreator.get_obj(self, name=name)
@patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
index fdfb3a9f7..1b55beded 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -3,7 +3,7 @@ import sys
import errno
import lxml.etree
import Bcfg2.Options
-from Bcfg2.Compat import walk_packages
+from Bcfg2.Compat import walk_packages, ConfigParser
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Cfg import *
from Bcfg2.Server.Plugin import PluginExecutionError, Specificity
@@ -19,7 +19,7 @@ while path != "/":
path = os.path.dirname(path)
from common import *
from TestPlugin import TestSpecificData, TestEntrySet, TestGroupSpool, \
- TestPullTarget
+ TestPullTarget, TestStructFile
class TestCfgBaseFileMatcher(TestSpecificData):
@@ -152,28 +152,13 @@ class TestCfgInfo(TestCfgBaseFileMatcher):
@patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.__init__")
def test__init(self, mock__init):
ci = self.get_obj("test.txt")
- mock__init.assert_called_with(ci, "test.txt", None, None)
+ mock__init.assert_called_with(ci, "test.txt", None)
def test_bind_info_to_entry(self):
ci = self.get_obj()
self.assertRaises(NotImplementedError,
ci.bind_info_to_entry, Mock(), Mock())
- def test__set_info(self):
- ci = self.get_obj()
- entry = Mock()
- entry.attrib = dict()
-
- info = {"foo": "foo",
- "_bar": "bar",
- "bar:baz=quux": "quux",
- "baz__": "baz",
- "__quux": "quux"}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.attrib,
- dict([(k, v) for k, v in info.items()
- if not k.startswith("__")]))
-
class TestCfgVerifier(TestCfgBaseFileMatcher):
test_obj = CfgVerifier
@@ -187,6 +172,12 @@ class TestCfgVerifier(TestCfgBaseFileMatcher):
class TestCfgCreator(TestCfgBaseFileMatcher):
test_obj = CfgCreator
path = "/foo/bar/test.txt"
+ should_monitor = False
+
+ def setUp(self):
+ TestCfgBaseFileMatcher.setUp(self)
+ set_setup_default("filemonitor", MagicMock())
+ set_setup_default("cfg_passphrase", None)
def get_obj(self, name=None):
if name is None:
@@ -256,62 +247,122 @@ class TestCfgCreator(TestCfgBaseFileMatcher):
self.assertRaises(CfgCreationError, cc.write_data, data)
+class TestXMLCfgCreator(TestCfgCreator, TestStructFile):
+ test_obj = XMLCfgCreator
+
+ def setUp(self):
+ TestCfgCreator.setUp(self)
+ TestStructFile.setUp(self)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
+ @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
+ def test_handle_event(self, mock_HandleEvent, mock_handle_event):
+ cc = self.get_obj()
+ evt = Mock()
+ cc.handle_event(evt)
+ mock_HandleEvent.assert_called_with(cc, evt)
+ mock_handle_event.assert_called_with(cc, evt)
+
+ def test_get_specificity(self):
+ cc = self.get_obj()
+ metadata = Mock()
+
+ def reset():
+ metadata.group_in_category.reset_mock()
+
+ category = "%s.%s.category" % (self.test_obj.__module__,
+ self.test_obj.__name__)
+ @patch(category, None)
+ def inner():
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ inner()
+
+ @patch(category, "foo")
+ def inner2():
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", perhost="true")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", category="bar")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("bar")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", prio="10")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=10))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ metadata.group_in_category.return_value = ''
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ metadata.group_in_category.assert_called_with("foo")
+
+ inner2()
+
+
class TestCfgDefaultInfo(TestCfgInfo):
test_obj = CfgDefaultInfo
- def get_obj(self, defaults=None):
- if defaults is None:
- defaults = dict()
- return self.test_obj(defaults)
+ def get_obj(self, *_):
+ return self.test_obj()
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__")
- def test__init(self, mock__init):
- defaults = Mock()
- cdi = self.get_obj(defaults=defaults)
- mock__init.assert_called_with(cdi, '')
- self.assertEqual(defaults, cdi.defaults)
+ def test__init(self):
+ pass
def test_handle_event(self):
# this CfgInfo handler doesn't handle any events -- it's not
# file-driven, but based on the built-in defaults
pass
- def test_bind_info_to_entry(self):
+ @patch("Bcfg2.Server.Plugin.default_path_metadata")
+ def test_bind_info_to_entry(self, mock_default_path_metadata):
cdi = self.get_obj()
- cdi._set_info = Mock()
- entry = Mock()
+ entry = lxml.etree.Element("Test", name="test")
+ mock_default_path_metadata.return_value = \
+ dict(owner="root", mode="0600")
cdi.bind_info_to_entry(entry, Mock())
- cdi._set_info.assert_called_with(entry, cdi.defaults)
+ self.assertItemsEqual(entry.attrib,
+ dict(owner="root", mode="0600", name="test"))
class TestCfgEntrySet(TestEntrySet):
test_obj = CfgEntrySet
+ def setUp(self):
+ TestEntrySet.setUp(self)
+ set_setup_default("cfg_validation", False)
+ set_setup_default("cfg_handlers", [])
+
def test__init(self):
pass
- def test_handlers(self):
- # this is really really difficult to mock out, so we just get
- # a list of handlers and make sure that it roughly matches
- # what's on the filesystem
- expected = []
- for submodule in walk_packages(path=Bcfg2.Server.Plugins.Cfg.__path__,
- prefix="Bcfg2.Server.Plugins.Cfg."):
- expected.append(submodule[1].rsplit('.', 1)[-1])
- self.assertItemsEqual(expected, [h.__name__ for h in handlers()])
-
- @patch("Bcfg2.Server.Plugins.Cfg.handlers")
- def test_handle_event(self, mock_handlers):
+ def test_handle_event(self):
eset = self.get_obj()
eset.entry_init = Mock()
- mock_handlers.return_value = [Mock(), Mock(), Mock()]
- for hdlr in mock_handlers.return_value:
+ Bcfg2.Options.setup.cfg_handlers = [Mock(), Mock(), Mock()]
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.__name__ = "handler"
eset.entries = dict()
def reset():
eset.entry_init.reset_mock()
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.reset_mock()
# test that a bogus deleted event is discarded
@@ -321,18 +372,19 @@ class TestCfgEntrySet(TestEntrySet):
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
self.assertItemsEqual(eset.entries, dict())
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
# test creation of a new file
for action in ["exists", "created", "changed"]:
+ print("Testing handling of %s events" % action)
evt = Mock()
evt.code2str.return_value = action
evt.filename = os.path.join(datastore, "test.txt")
# test with no handler that handles
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.return_value = False
hdlr.ignore.return_value = False
@@ -340,16 +392,16 @@ class TestCfgEntrySet(TestEntrySet):
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
self.assertItemsEqual(eset.entries, dict())
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.assert_called_with(evt, basename=eset.path)
hdlr.ignore.assert_called_with(evt, basename=eset.path)
# test with a handler that handles the entry
reset()
- mock_handlers.return_value[-1].handles.return_value = True
+ Bcfg2.Options.setup.cfg_handlers[-1].handles.return_value = True
eset.handle_event(evt)
- eset.entry_init.assert_called_with(evt, mock_handlers.return_value[-1])
- for hdlr in mock_handlers.return_value:
+ eset.entry_init.assert_called_with(evt, Bcfg2.Options.setup.cfg_handlers[-1])
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.assert_called_with(evt, basename=eset.path)
if not hdlr.return_value:
hdlr.ignore.assert_called_with(evt, basename=eset.path)
@@ -357,14 +409,14 @@ class TestCfgEntrySet(TestEntrySet):
# test with a handler that ignores the entry before one
# that handles it
reset()
- mock_handlers.return_value[0].ignore.return_value = True
+ Bcfg2.Options.setup.cfg_handlers[0].ignore.return_value = True
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- mock_handlers.return_value[0].handles.assert_called_with(evt,
- basename=eset.path)
- mock_handlers.return_value[0].ignore.assert_called_with(evt,
- basename=eset.path)
- for hdlr in mock_handlers.return_value[1:]:
+ Bcfg2.Options.setup.cfg_handlers[0].handles.assert_called_with(
+ evt, basename=eset.path)
+ Bcfg2.Options.setup.cfg_handlers[0].ignore.assert_called_with(
+ evt, basename=eset.path)
+ for hdlr in Bcfg2.Options.setup.cfg_handlers[1:]:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
@@ -376,7 +428,7 @@ class TestCfgEntrySet(TestEntrySet):
eset.entries[evt.filename] = Mock()
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
eset.entries[evt.filename].handle_event.assert_called_with(evt)
@@ -386,7 +438,7 @@ class TestCfgEntrySet(TestEntrySet):
evt.code2str.return_value = "deleted"
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
self.assertItemsEqual(eset.entries, dict())
@@ -438,15 +490,15 @@ class TestCfgEntrySet(TestEntrySet):
@patch("Bcfg2.Server.Plugins.Cfg.u_str")
@patch("Bcfg2.Server.Plugins.Cfg.b64encode")
def test_bind_entry(self, mock_b64encode, mock_u_str):
- Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False)
-
mock_u_str.side_effect = lambda x: x
+ Bcfg2.Options.setup.cfg_validation = False
eset = self.get_obj()
eset.bind_info_to_entry = Mock()
eset._generate_data = Mock()
eset.get_handlers = Mock()
eset._validate_data = Mock()
+ eset.setup = dict(validate=False)
def reset():
mock_b64encode.reset_mock()
@@ -524,7 +576,7 @@ class TestCfgEntrySet(TestEntrySet):
# test successful validation
entry = reset()
- Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True
+ Bcfg2.Options.setup.cfg_validation = True
bound = eset.bind_entry(entry, metadata)
eset.bind_info_to_entry.assert_called_with(entry, metadata)
eset._generate_data.assert_called_with(entry, metadata)
@@ -546,16 +598,16 @@ class TestCfgEntrySet(TestEntrySet):
def test_get_handlers(self):
eset = self.get_obj()
eset.entries['test1.txt'] = CfgInfo("test1.txt")
- eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock(), None)
+ eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock())
eset.entries['test2.txt'].specific.matches.return_value = True
eset.entries['test3.txt'] = CfgInfo("test3.txt")
- eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock(), None)
+ eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock())
eset.entries['test4.txt'].specific.matches.return_value = False
- eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock(), None)
+ eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock())
eset.entries['test5.txt'].specific.matches.return_value = True
- eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock(), None)
+ eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock())
eset.entries['test6.txt'].specific.matches.return_value = True
- eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock(), None)
+ eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock())
eset.entries['test7.txt'].specific.matches.return_value = False
def reset():
@@ -603,24 +655,24 @@ class TestCfgEntrySet(TestEntrySet):
if hasattr(entry.specific.matches, "called"):
self.assertFalse(entry.specific.matches.called)
- def test_bind_info_to_entry(self):
- default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo")
+ def test_bind_info_to_entry(self, mock_DefaultInfo):
eset = self.get_obj()
eset.get_handlers = Mock()
eset.get_handlers.return_value = []
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock()
metadata = Mock()
def reset():
eset.get_handlers.reset_mock()
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock()
+ mock_DefaultInfo.reset_mock()
return lxml.etree.Element("Path", name="/test.txt")
# test with no info handlers
entry = reset()
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
self.assertEqual(entry.get("type"), "file")
# test with one info handler
@@ -629,7 +681,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = [handler]
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
handler.bind_info_to_entry.assert_called_with(entry, metadata)
self.assertEqual(entry.get("type"), "file")
@@ -639,7 +692,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = handlers
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
# we don't care which handler gets called as long as exactly
# one of them does
called = 0
@@ -650,8 +704,6 @@ class TestCfgEntrySet(TestEntrySet):
self.assertEqual(called, 1)
self.assertEqual(entry.get("type"), "file")
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info
-
def test_create_data(self):
eset = self.get_obj()
eset.best_matching = Mock()
@@ -753,35 +805,16 @@ class TestCfgEntrySet(TestEntrySet):
class TestCfg(TestGroupSpool, TestPullTarget):
test_obj = Cfg
+ def setUp(self):
+ TestGroupSpool.setUp(self)
+ TestPullTarget.setUp(self)
+ set_setup_default("cfg_handlers", [])
+
def get_obj(self, core=None):
if core is None:
core = Mock()
- core.setup = MagicMock()
return TestGroupSpool.get_obj(self, core=core)
- @patch("Bcfg2.Server.Plugin.GroupSpool.__init__")
- @patch("Bcfg2.Server.Plugin.PullTarget.__init__")
- def test__init(self, mock_pulltarget_init, mock_groupspool_init):
- core = Mock()
- core.setup = MagicMock()
- cfg = self.test_obj(core, datastore)
- mock_pulltarget_init.assert_called_with(cfg)
- mock_groupspool_init.assert_called_with(cfg, core, datastore)
- core.setup.add_option.assert_called_with("validate",
- Bcfg2.Options.CFG_VALIDATION)
- core.setup.reparse.assert_called_with()
-
- core.reset_mock()
- core.setup.reset_mock()
- mock_pulltarget_init.reset_mock()
- mock_groupspool_init.reset_mock()
- core.setup.__contains__.return_value = True
- cfg = self.test_obj(core, datastore)
- mock_pulltarget_init.assert_called_with(cfg)
- mock_groupspool_init.assert_called_with(cfg, core, datastore)
- self.assertFalse(core.setup.add_option.called)
- self.assertFalse(core.setup.reparse.called)
-
def test_has_generator(self):
cfg = self.get_obj()
cfg.entries = dict()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py
new file mode 100644
index 000000000..537ceb4ff
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py
@@ -0,0 +1,60 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Decisions import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestStructFile, TestPlugin, TestDecision
+
+
+class TestDecisionFile(TestStructFile):
+ test_obj = DecisionFile
+
+ def test_get_decisions(self):
+ df = self.get_obj()
+ metadata = Mock()
+
+ df.xdata = None
+ self.assertItemsEqual(df.get_decisions(metadata), [])
+
+ df.xdata = lxml.etree.Element("Decisions")
+ df.XMLMatch = Mock()
+ df.XMLMatch.return_value = lxml.etree.Element("Decisions")
+ lxml.etree.SubElement(df.XMLMatch.return_value,
+ "Decision", type="Service", name='*')
+ lxml.etree.SubElement(df.XMLMatch.return_value,
+ "Decision", type="Path",
+ name='/etc/apt/apt.conf')
+
+ self.assertItemsEqual(df.get_decisions(metadata),
+ [("Service", '*'),
+ ("Path", '/etc/apt/apt.conf')])
+ df.XMLMatch.assert_called_with(metadata)
+
+
+class TestDecisions(TestPlugin, TestDecision):
+ test_obj = Decisions
+
+ def test_GetDecisions(self):
+ d = self.get_obj()
+ d.whitelist = Mock()
+ d.blacklist = Mock()
+ metadata = Mock()
+
+ self.assertEqual(d.GetDecisions(metadata, "whitelist"),
+ d.whitelist.get_decision.return_value)
+ d.whitelist.get_decision.assert_called_with(metadata)
+
+ self.assertEqual(d.GetDecisions(metadata, "blacklist"),
+ d.blacklist.get_decision.return_value)
+ d.blacklist.get_decision.assert_called_with(metadata)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py
index 7be3d8e84..9b4a6af88 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py
@@ -1,5 +1,6 @@
import os
import sys
+import copy
import lxml.etree
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Defaults import *
@@ -62,3 +63,31 @@ class TestDefaults(TestRules, TestGoalValidator):
def test__regex_enabled(self):
r = self.get_obj()
self.assertTrue(r._regex_enabled)
+
+ def _do_test(self, name, groups=None):
+ if groups is None:
+ groups = []
+ d = self.get_obj()
+ metadata = Mock(groups=groups)
+ config = lxml.etree.Element("Configuration")
+ struct = lxml.etree.SubElement(config, "Bundle", name=name)
+ entry = copy.deepcopy(self.abstract[name])
+ struct.append(entry)
+ d.validate_goals(metadata, config)
+ self.assertXMLEqual(entry, self.concrete[name])
+
+ def _do_test_failure(self, name, groups=None, handles=None):
+ if groups is None:
+ groups = []
+ d = self.get_obj()
+ metadata = Mock(groups=groups)
+ config = lxml.etree.Element("Configuration")
+ struct = lxml.etree.SubElement(config, "Bundle", name=name)
+ orig = copy.deepcopy(self.abstract[name])
+ entry = copy.deepcopy(self.abstract[name])
+ struct.append(entry)
+ d.validate_goals(metadata, config)
+ self.assertXMLEqual(entry, orig)
+
+ def test_regex(self):
+ self._do_test('regex')
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
index a07fffba1..d3fa15236 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
@@ -6,7 +6,6 @@ import socket
import lxml.etree
import Bcfg2.Server
import Bcfg2.Server.Plugin
-from Bcfg2.Server.Plugins.Metadata import *
from mock import Mock, MagicMock, patch
# add all parent testsuite directories to sys.path to allow (most)
@@ -19,9 +18,13 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
+from Bcfg2.Server.Plugins.Metadata import load_django_models
from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \
TestClientRunHooks, TestDatabaseBacked
+load_django_models()
+from Bcfg2.Server.Plugins.Metadata import *
+
def get_clients_test_tree():
return lxml.etree.XML('''
@@ -88,18 +91,18 @@ def get_groups_test_tree():
</Groups>''').getroottree()
-def get_metadata_object(core=None, watch_clients=False, use_db=False):
+def get_metadata_object(core=None):
if core is None:
core = Mock()
- core.setup = MagicMock()
core.metadata_cache = MagicMock()
- core.setup.cfp.getboolean = Mock(return_value=use_db)
+ set_setup_default("password")
@patchIf(not isinstance(os.makedirs, Mock), "os.makedirs", Mock())
@patchIf(not isinstance(lxml.etree.Element, Mock),
"lxml.etree.Element", Mock())
+
def inner():
- return Metadata(core, datastore, watch_clients=watch_clients)
+ return Metadata(core)
return inner()
@@ -108,117 +111,117 @@ class TestMetadataDB(DBModelTestCase):
models = [MetadataClientModel]
-if HAS_DJANGO or can_skip:
- class TestClientVersions(TestDatabaseBacked):
- test_clients = dict(client1="1.2.0",
- client2="1.2.2",
- client3="1.3.0pre1",
- client4="1.1.0",
- client5=None,
- client6=None)
-
- @skipUnless(HAS_DJANGO, "Django not found")
- def setUp(self):
- self.test_obj = ClientVersions
- syncdb(TestMetadataDB)
- for client, version in self.test_clients.items():
- MetadataClientModel(hostname=client, version=version).save()
-
- def test__contains(self):
- v = self.get_obj()
- self.assertIn("client1", v)
- self.assertIn("client5", v)
- self.assertNotIn("client__contains", v)
-
- def test_keys(self):
- v = self.get_obj()
- self.assertItemsEqual(self.test_clients.keys(), v.keys())
-
- def test__setitem(self):
- v = self.get_obj()
-
- # test setting version of existing client
- v["client1"] = "1.2.3"
- self.assertIn("client1", v)
- self.assertEqual(v['client1'], "1.2.3")
- client = MetadataClientModel.objects.get(hostname="client1")
- self.assertEqual(client.version, "1.2.3")
-
- # test adding new client
- new = "client__setitem"
- v[new] = "1.3.0"
- self.assertIn(new, v)
- self.assertEqual(v[new], "1.3.0")
- client = MetadataClientModel.objects.get(hostname=new)
- self.assertEqual(client.version, "1.3.0")
-
- # test adding new client with no version
- new2 = "client__setitem_2"
- v[new2] = None
- self.assertIn(new2, v)
- self.assertEqual(v[new2], None)
- client = MetadataClientModel.objects.get(hostname=new2)
- self.assertEqual(client.version, None)
-
- def test__getitem(self):
- v = self.get_obj()
-
- # test getting existing client
- self.assertEqual(v['client2'], "1.2.2")
- self.assertIsNone(v['client5'])
-
- # test exception on nonexistent client
- expected = KeyError
- try:
- v['clients__getitem']
- except expected:
- pass
- except:
- err = sys.exc_info()[1]
- self.assertFalse(True, "%s raised instead of %s" %
- (err.__class__.__name__,
- expected.__class__.__name__))
- else:
- self.assertFalse(True,
- "%s not raised" % expected.__class__.__name__)
+class TestClientVersions(TestDatabaseBacked):
+ test_clients = dict(client1="1.2.0",
+ client2="1.2.2",
+ client3="1.3.0pre1",
+ client4="1.1.0",
+ client5=None,
+ client6=None)
+
+ @skipUnless(HAS_DJANGO, "Django not found")
+ def setUp(self):
+ TestDatabaseBacked.setUp(self)
+ self.test_obj = ClientVersions
+ syncdb(TestMetadataDB)
+ for client, version in self.test_clients.items():
+ MetadataClientModel(hostname=client, version=version).save()
+
+ def test__contains(self):
+ v = self.get_obj()
+ self.assertIn("client1", v)
+ self.assertIn("client5", v)
+ self.assertNotIn("client__contains", v)
+
+ def test_keys(self):
+ v = self.get_obj()
+ self.assertItemsEqual(self.test_clients.keys(), v.keys())
+
+ def test__setitem(self):
+ v = self.get_obj()
+
+ # test setting version of existing client
+ v["client1"] = "1.2.3"
+ self.assertIn("client1", v)
+ self.assertEqual(v['client1'], "1.2.3")
+ client = MetadataClientModel.objects.get(hostname="client1")
+ self.assertEqual(client.version, "1.2.3")
+
+ # test adding new client
+ new = "client__setitem"
+ v[new] = "1.3.0"
+ self.assertIn(new, v)
+ self.assertEqual(v[new], "1.3.0")
+ client = MetadataClientModel.objects.get(hostname=new)
+ self.assertEqual(client.version, "1.3.0")
+
+ # test adding new client with no version
+ new2 = "client__setitem_2"
+ v[new2] = None
+ self.assertIn(new2, v)
+ self.assertEqual(v[new2], None)
+ client = MetadataClientModel.objects.get(hostname=new2)
+ self.assertEqual(client.version, None)
+
+ def test__getitem(self):
+ v = self.get_obj()
+
+ # test getting existing client
+ self.assertEqual(v['client2'], "1.2.2")
+ self.assertIsNone(v['client5'])
+
+ # test exception on nonexistent client
+ expected = KeyError
+ try:
+ v['clients__getitem']
+ except expected:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
- def test__len(self):
- v = self.get_obj()
- self.assertEqual(len(v), MetadataClientModel.objects.count())
+ def test__len(self):
+ v = self.get_obj()
+ self.assertEqual(len(v), MetadataClientModel.objects.count())
- def test__iter(self):
- v = self.get_obj()
- self.assertItemsEqual([h for h in iter(v)], v.keys())
+ def test__iter(self):
+ v = self.get_obj()
+ self.assertItemsEqual([h for h in iter(v)], v.keys())
- def test__delitem(self):
- v = self.get_obj()
+ def test__delitem(self):
+ v = self.get_obj()
- # test adding new client
- new = "client__delitem"
- v[new] = "1.3.0"
+ # test adding new client
+ new = "client__delitem"
+ v[new] = "1.3.0"
- del v[new]
- self.assertIn(new, v)
- self.assertIsNone(v[new])
+ del v[new]
+ self.assertIn(new, v)
+ self.assertIsNone(v[new])
class TestXMLMetadataConfig(TestXMLFileBacked):
test_obj = XMLMetadataConfig
path = os.path.join(datastore, 'Metadata', 'clients.xml')
- def get_obj(self, basefile="clients.xml", core=None, watch_clients=False):
- self.metadata = get_metadata_object(core=core,
- watch_clients=watch_clients)
+ def get_obj(self, basefile="clients.xml", core=None):
+ self.metadata = get_metadata_object(core=core)
@patchIf(not isinstance(lxml.etree.Element, Mock),
"lxml.etree.Element", Mock())
def inner():
- return XMLMetadataConfig(self.metadata, watch_clients, basefile)
+ return XMLMetadataConfig(self.metadata, basefile)
return inner()
+ @patch("Bcfg2.Server.FileMonitor.get_fam", Mock())
def test__init(self):
xmc = self.get_obj()
- self.assertEqual(self.metadata.core.fam, xmc.fam)
- self.assertFalse(xmc.fam.AddMonitor.called)
+ self.assertNotIn(call(xmc.basefile),
+ xmc.fam.AddMonitor.call_args_list)
def test_xdata(self):
config = self.get_obj()
@@ -262,20 +265,15 @@ class TestXMLMetadataConfig(TestXMLFileBacked):
self.assertEqual(config.base_xdata, "<test/>")
def test_add_monitor(self):
- core = MagicMock()
- config = self.get_obj(core=core)
+ config = self.get_obj()
+ config.fam = Mock()
fname = "test.xml"
fpath = os.path.join(self.metadata.data, fname)
config.extras = []
config.add_monitor(fpath)
- self.assertFalse(core.fam.AddMonitor.called)
- self.assertEqual(config.extras, [fpath])
-
- config = self.get_obj(core=core, watch_clients=True)
- config.add_monitor(fpath)
- core.fam.AddMonitor.assert_called_with(fpath, config.metadata)
+ config.fam.AddMonitor.assert_called_with(fpath, config.metadata)
self.assertItemsEqual(config.extras, [fpath])
def test_Index(self):
@@ -480,11 +478,16 @@ class TestClientMetadata(Bcfg2TestCase):
class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
test_obj = Metadata
- use_db = False
- def get_obj(self, core=None, watch_clients=False):
- return get_metadata_object(core=core, watch_clients=watch_clients,
- use_db=self.use_db)
+ def setUp(self):
+ _TestMetadata.setUp(self)
+ TestClientRunHooks.setUp(self)
+ TestDatabaseBacked.setUp(self)
+ Bcfg2.Options.setup.metadata_db = False
+ Bcfg2.Options.setup.authentication = "cert+password"
+
+ def get_obj(self, core=None):
+ return get_metadata_object(core=core)
@skipUnless(HAS_DJANGO, "Django not found")
def test__use_db(self):
@@ -504,33 +507,24 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
client_name = "%s%s" % (prefix, i)
return client_name
- def test__init(self):
- # test with watch_clients=False
+ @patch("Bcfg2.Server.FileMonitor.get_fam")
+ def test__init(self, mock_get_fam):
core = MagicMock()
metadata = self.get_obj(core=core)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.ClientRunHooks)
- self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.query, MetadataQuery)
- self.assertEqual(metadata.states, dict())
-
- # test with watch_clients=True
- core.fam = MagicMock()
- metadata = self.get_obj(core=core, watch_clients=True)
self.assertEqual(len(metadata.states), 2)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
-
- core.fam.reset_mock()
- core.fam.AddMonitor = Mock(side_effect=IOError)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "clients.xml"),
+ metadata)
+
+ mock_get_fam.reset_mock()
+ fam = Mock()
+ fam.AddMonitor = Mock(side_effect=IOError)
+ mock_get_fam.return_value = fam
self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
- self.get_obj, core=core, watch_clients=True)
+ self.get_obj, core=core)
@patch('os.makedirs', Mock())
@patch('%s.open' % builtins)
@@ -591,6 +585,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_group(self):
metadata = self.get_obj()
metadata.groups_xml.write = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -623,6 +618,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_update_group(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -640,6 +636,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_remove_group(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -655,6 +652,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_bundle(self):
metadata = self.get_obj()
metadata.groups_xml.write = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -678,6 +676,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_remove_bundle(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -693,6 +692,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_client(self):
metadata = self.get_obj()
metadata.clients_xml.write = Mock()
+ metadata.clients_xml.load_xml = Mock()
metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree()
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
@@ -727,6 +727,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_update_client(self):
metadata = self.get_obj()
metadata.clients_xml.write_xml = Mock()
+ metadata.clients_xml.load_xml = Mock()
metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree())
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
@@ -762,7 +763,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree())
metadata._handle_clients_xml_event(Mock())
- if not self.use_db:
+ if not Bcfg2.Options.setup.metadata_db:
self.assertItemsEqual(metadata.clients,
dict([(c.get("name"), c.get("profile"))
for c in get_clients_test_tree().findall("//Client")]))
@@ -1251,10 +1252,13 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
class TestMetadataBase(TestMetadata):
""" base test object for testing Metadata with database enabled """
__test__ = False
- use_db = True
@skipUnless(HAS_DJANGO, "Django not found")
def setUp(self):
+ _TestMetadata.setUp(self)
+ TestClientRunHooks.setUp(self)
+ TestDatabaseBacked.setUp(self)
+ Bcfg2.Options.setup.metadata_db = True
syncdb(TestMetadataDB)
def load_clients_data(self, metadata=None, xdata=None):
@@ -1274,25 +1278,24 @@ class TestMetadataBase(TestMetadata):
return client_name
@patch('os.path.exists')
- def test__init(self, mock_exists):
- core = MagicMock()
- core.fam = Mock()
+ @patch('Bcfg2.Server.FileMonitor.get_fam')
+ def test__init(self, mock_get_fam, mock_exists):
mock_exists.return_value = False
- metadata = self.get_obj(core=core, watch_clients=True)
+ metadata = self.get_obj()
self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
- core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
+ mock_get_fam.return_value.AddMonitor.assert_called_with(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
mock_exists.return_value = True
- core.fam.reset_mock()
- metadata = self.get_obj(core=core, watch_clients=True)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
+ mock_get_fam.reset_mock()
+ metadata = self.get_obj()
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "clients.xml"),
+ metadata)
def test_add_group(self):
pass
@@ -1356,12 +1359,7 @@ class TestMetadataBase(TestMetadata):
class TestMetadata_NoClientsXML(TestMetadataBase):
""" test Metadata without a clients.xml. we have to disable or
override tests that rely on client options """
- # only run these tests if it's possible to skip tests or if we
- # have django. otherwise they'll all get run because our fake
- # skipping decorators for python < 2.7 won't work when they
- # decorate setUp()
- if can_skip or HAS_DJANGO:
- __test__ = True
+ __test__ = True
def load_groups_data(self, metadata=None, xdata=None):
if metadata is None:
@@ -1525,17 +1523,13 @@ class TestMetadata_NoClientsXML(TestMetadataBase):
class TestMetadata_ClientsXML(TestMetadataBase):
""" test Metadata with a clients.xml. """
- # only run these tests if it's possible to skip tests or if we
- # have django. otherwise they'll all get run because our fake
- # skipping decorators for python < 2.7 won't work when they
- # decorate setUp()
- if can_skip or HAS_DJANGO:
- __test__ = True
+ __test__ = True
def load_clients_data(self, metadata=None, xdata=None):
if metadata is None:
metadata = self.get_obj()
- metadata.core.fam = Mock()
+ fam = Bcfg2.Server.FileMonitor._FAM
+ Bcfg2.Server.FileMonitor._FAM = MagicMock()
@patchIf(not isinstance(lxml.etree.Element, Mock),
"lxml.etree.Element", Mock())
def inner():
@@ -1543,5 +1537,7 @@ class TestMetadata_ClientsXML(TestMetadataBase):
inner()
metadata = TestMetadata.load_clients_data(self, metadata=metadata,
xdata=xdata)
- return TestMetadataBase.load_clients_data(self, metadata=metadata,
- xdata=xdata)
+ rv = TestMetadataBase.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+ Bcfg2.Server.FileMonitor._FAM = fam
+ return rv
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
index a87628eaf..68313e6fb 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -1,7 +1,7 @@
import os
import sys
-import copy
-import time
+import shutil
+import tempfile
import lxml.etree
import Bcfg2.version
import Bcfg2.Server
@@ -18,54 +18,22 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
-from Bcfg2.Server.Plugins.Probes import *
-from TestPlugin import TestEntrySet, TestProbing, TestConnector, \
+from Bcfg2.Server.Plugins.Probes import load_django_models
+from TestPlugin import TestEntrySet, TestPlugin, \
TestDatabaseBacked
-# test data for JSON and YAML tests
-test_data = dict(a=1, b=[1, 2, 3], c="test",
- d=dict(a=1, b=dict(a=1), c=(1, "2", 3)))
-
-
-class FakeElement(lxml.etree._Element):
- getroottree = Mock()
-
- def __init__(self, el):
- self._element = el
-
- def __getattribute__(self, attr):
- el = lxml.etree._Element.__getattribute__(self,
- '__dict__')['_element']
- if attr == "getroottree":
- return FakeElement.getroottree
- elif attr == "_element":
- return el
- else:
- return getattr(el, attr)
-
-
-class StoringElement(object):
- OriginalElement = copy.copy(lxml.etree.Element)
-
- def __init__(self):
- self.element = None
- self.return_value = None
-
- def __call__(self, *args, **kwargs):
- self.element = self.OriginalElement(*args, **kwargs)
- self.return_value = FakeElement(self.element)
- return self.return_value
+load_django_models()
+from Bcfg2.Server.Plugins.Probes import *
+if HAS_JSON:
+ json = json
-class StoringSubElement(object):
- OriginalSubElement = copy.copy(lxml.etree.SubElement)
+if HAS_YAML:
+ yaml = yaml
- def __call__(self, parent, tag, **kwargs):
- try:
- return self.OriginalSubElement(parent._element, tag,
- **kwargs)
- except AttributeError:
- return self.OriginalSubElement(parent, tag, **kwargs)
+# test data for JSON and YAML tests
+test_data = dict(a=1, b=[1, 2, 3], c="test",
+ d=dict(a=1, b=dict(a=1), c=(1, "2", 3)))
class FakeList(list):
@@ -74,19 +42,8 @@ class FakeList(list):
class TestProbesDB(DBModelTestCase):
if HAS_DJANGO:
- models = [ProbesGroupsModel, ProbesDataModel]
-
-
-class TestClientProbeDataSet(Bcfg2TestCase):
- def test__init(self):
- ds = ClientProbeDataSet()
- self.assertLessEqual(ds.timestamp, time.time())
- self.assertIsInstance(ds, dict)
- self.assertNotIn("timestamp", ds)
-
- ds = ClientProbeDataSet(timestamp=123)
- self.assertEqual(ds.timestamp, 123)
- self.assertNotIn("timestamp", ds)
+ models = [ProbesGroupsModel,
+ ProbesDataModel]
class TestProbeData(Bcfg2TestCase):
@@ -108,19 +65,22 @@ class TestProbeData(Bcfg2TestCase):
def test_xdata(self):
xdata = lxml.etree.Element("test")
lxml.etree.SubElement(xdata, "test2")
- data = ProbeData(lxml.etree.tostring(xdata,
- xml_declaration=False).decode('UTF-8'))
+ data = ProbeData(
+ lxml.etree.tostring(xdata,
+ xml_declaration=False).decode('UTF-8'))
self.assertIsNotNone(data.xdata)
self.assertIsNotNone(data.xdata.find("test2"))
- @skipUnless(HAS_JSON, "JSON libraries not found, skipping JSON tests")
+ @skipUnless(HAS_JSON,
+ "JSON libraries not found, skipping JSON tests")
def test_json(self):
jdata = json.dumps(test_data)
data = ProbeData(jdata)
self.assertIsNotNone(data.json)
self.assertItemsEqual(test_data, data.json)
- @skipUnless(HAS_YAML, "YAML libraries not found, skipping YAML tests")
+ @skipUnless(HAS_YAML,
+ "YAML libraries not found, skipping YAML tests")
def test_yaml(self):
jdata = yaml.dump(test_data)
data = ProbeData(jdata)
@@ -134,22 +94,20 @@ class TestProbeSet(TestEntrySet):
ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"]
bogus_names = ["test.py"]
- def get_obj(self, path=datastore, fam=None, encoding=None,
+ def get_obj(self, path=datastore, encoding=None,
plugin_name="Probes", basename=None):
# get_obj() accepts the basename argument, accepted by the
# parent get_obj() method, and just throws it away, since
# ProbeSet uses a regex for the "basename"
- if fam is None:
- fam = Mock()
- rv = self.test_obj(path, fam, encoding, plugin_name)
+ rv = self.test_obj(path, plugin_name)
rv.entry_type = MagicMock()
return rv
- def test__init(self):
- fam = Mock()
- ps = self.get_obj(fam=fam)
+ @patch("Bcfg2.Server.FileMonitor.get_fam")
+ def test__init(self, mock_get_fam):
+ ps = self.get_obj()
self.assertEqual(ps.plugin_name, "Probes")
- fam.AddMonitor.assert_called_with(datastore, ps)
+ mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps)
TestEntrySet.test__init(self)
def test_HandleEvent(self):
@@ -242,385 +200,164 @@ group-specific"""
assert False, "Strange probe found in get_probe_data() return"
-class TestProbes(TestProbing, TestConnector, TestDatabaseBacked):
+class TestProbes(TestPlugin):
test_obj = Probes
- def get_obj(self, core=None):
- return TestDatabaseBacked.get_obj(self, core=core)
-
- def get_test_probedata(self):
- test_xdata = lxml.etree.Element("test")
- lxml.etree.SubElement(test_xdata, "test", foo="foo")
- rv = dict()
- rv["foo.example.com"] = ClientProbeDataSet(timestamp=time.time())
- rv["foo.example.com"]["xml"] = \
- ProbeData(lxml.etree.tostring(test_xdata,
- xml_declaration=False).decode('UTF-8'))
- rv["foo.example.com"]["text"] = ProbeData("freeform text")
- rv["foo.example.com"]["multiline"] = ProbeData("""multiple
+ test_xdata = lxml.etree.Element("test")
+ lxml.etree.SubElement(test_xdata, "test", foo="foo")
+ test_xdoc = lxml.etree.tostring(test_xdata,
+ xml_declaration=False).decode('UTF-8')
+
+ data = dict()
+ data['xml'] = "group:group\n" + test_xdoc
+ data['text'] = "freeform text"
+ data['multiline'] = """multiple
lines
of
freeform
text
-""")
- rv["bar.example.com"] = ClientProbeDataSet(timestamp=time.time())
- rv["bar.example.com"]["empty"] = ProbeData("")
- if HAS_JSON:
- rv["bar.example.com"]["json"] = ProbeData(json.dumps(test_data))
- if HAS_YAML:
- rv["bar.example.com"]["yaml"] = ProbeData(yaml.dump(test_data))
- return rv
-
- def get_test_cgroups(self):
- return {"foo.example.com": ["group", "group with spaces",
- "group-with-dashes"],
- "bar.example.com": []}
-
- def get_probes_object(self, use_db=False, load_data=None):
- core = MagicMock()
- core.setup.cfp.getboolean = Mock()
- core.setup.cfp.getboolean.return_value = use_db
- if load_data is None:
- load_data = MagicMock()
- # we have to patch load_data() in a funny way because
- # different versions of Mock have different scopes for
- # patching. in some versions, a patch applied to
- # get_probes_object() would only apply to that function, while
- # in others it would also apply to the calling function (e.g.,
- # test__init(), which relies on being able to check the calls
- # of load_data(), and thus on load_data() being consistently
- # mocked)
- @patch("%s.%s.load_data" % (self.test_obj.__module__,
- self.test_obj.__name__), new=load_data)
- def inner():
- return self.get_obj(core)
-
- return inner()
+group:group-with-dashes
+group: group:with:colons
+"""
+ data['empty'] = ''
+ data['almost_empty'] = 'group: other_group'
+ if HAS_JSON:
+ data['json'] = json.dumps(test_data)
+ if HAS_YAML:
+ data['yaml'] = yaml.dump(test_data)
+
+ def setUp(self):
+ Bcfg2TestCase.setUp(self)
+ set_setup_default("probes_db")
+ self.datastore = None
+ Bcfg2.Server.Cache.expire("Probes")
+
+ def tearDown(self):
+ if self.datastore is not None:
+ shutil.rmtree(self.datastore)
+ self.datastore = None
+ Bcfg2.Options.setup.repository = datastore
+
+ def get_obj(self):
+ if not Bcfg2.Options.setup.probes_db:
+ # actually use a real datastore so we can read and write
+ # probed.xml
+ if self.datastore is None:
+ self.datastore = tempfile.mkdtemp()
+ Bcfg2.Options.setup.repository = self.datastore
+ datadir = os.path.join(self.datastore, self.test_obj.name)
+ if not os.path.exists(datadir):
+ os.makedirs(datadir)
+ return TestPlugin.get_obj(self)
def test__init(self):
- mock_load_data = Mock()
- probes = self.get_probes_object(load_data=mock_load_data)
- probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore,
- probes.name),
- probes.probes)
- mock_load_data.assert_any_call()
- self.assertEqual(probes.probedata, ClientProbeDataSet())
- self.assertEqual(probes.cgroups, dict())
-
- @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", Mock())
- def test__use_db(self):
- probes = self.get_probes_object()
- self.assertFalse(probes._use_db)
- probes.core.setup.cfp.getboolean.assert_called_with("probes",
- "use_database",
- default=False)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
- def test_write_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.write_data("test")
- probes._write_data_xml.assert_called_with("test")
- self.assertFalse(probes._write_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
- def test_write_data_db(self):
- probes = self.get_probes_object(use_db=True)
- probes.write_data("test")
- probes._write_data_db.assert_called_with("test")
- self.assertFalse(probes._write_data_xml.called)
-
- def test__write_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
-
- top = mock_Element.side_effect.return_value
- write = top.getroottree.return_value.write
- self.assertEqual(write.call_args[0][0],
- os.path.join(datastore, probes.name,
- "probed.xml"))
-
- data = top._element
- foodata = data.find("Client[@name='foo.example.com']")
- self.assertIsNotNone(foodata)
- self.assertIsNotNone(foodata.get("timestamp"))
- self.assertEqual(len(foodata.findall("Probe")),
- len(probes.probedata['foo.example.com']))
- self.assertEqual(len(foodata.findall("Group")),
- len(probes.cgroups['foo.example.com']))
- xml = foodata.find("Probe[@name='xml']")
- self.assertIsNotNone(xml)
- self.assertIsNotNone(xml.get("value"))
- xdata = lxml.etree.XML(xml.get("value"))
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- text = foodata.find("Probe[@name='text']")
- self.assertIsNotNone(text)
- self.assertIsNotNone(text.get("value"))
- multiline = foodata.find("Probe[@name='multiline']")
- self.assertIsNotNone(multiline)
- self.assertIsNotNone(multiline.get("value"))
- self.assertGreater(len(multiline.get("value").splitlines()), 1)
-
- bardata = data.find("Client[@name='bar.example.com']")
- self.assertIsNotNone(bardata)
- self.assertIsNotNone(bardata.get("timestamp"))
- self.assertEqual(len(bardata.findall("Probe")),
- len(probes.probedata['bar.example.com']))
- self.assertEqual(len(bardata.findall("Group")),
- len(probes.cgroups['bar.example.com']))
- empty = bardata.find("Probe[@name='empty']")
- self.assertIsNotNone(empty)
- self.assertIsNotNone(empty.get("value"))
- self.assertEqual(empty.get("value"), "")
- if HAS_JSON:
- jdata = bardata.find("Probe[@name='json']")
- self.assertIsNotNone(jdata)
- self.assertIsNotNone(jdata.get("value"))
- self.assertItemsEqual(test_data,
- json.loads(jdata.get("value")))
- if HAS_YAML:
- ydata = bardata.find("Probe[@name='yaml']")
- self.assertIsNotNone(ydata)
- self.assertIsNotNone(ydata.get("value"))
- self.assertItemsEqual(test_data,
- yaml.load(ydata.get("value")))
-
- inner()
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__write_data_db(self):
- syncdb(TestProbesDB)
- probes = self.get_probes_object(use_db=True)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- for cname in ["foo.example.com", "bar.example.com"]:
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
-
- for probe in pdata:
- self.assertEqual(probe.hostname, client.hostname)
- self.assertIsNotNone(probe.data)
- if probe.probe == "xml":
- xdata = lxml.etree.XML(probe.data)
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- elif probe.probe == "text":
- pass
- elif probe.probe == "multiline":
- self.assertGreater(len(probe.data.splitlines()), 1)
- elif probe.probe == "empty":
- self.assertEqual(probe.data, "")
- elif probe.probe == "yaml":
- self.assertItemsEqual(test_data, yaml.load(probe.data))
- elif probe.probe == "json":
- self.assertItemsEqual(test_data, json.loads(probe.data))
- else:
- assert False, "Strange probe found in _write_data_db data"
-
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- # test that old probe data is removed properly
- cname = 'foo.example.com'
- del probes.probedata[cname]['text']
- probes.cgroups[cname].pop()
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
- def test_load_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.load_data()
- probes._load_data_xml.assert_any_call()
- self.assertFalse(probes._load_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
- def test_load_data_db(self):
- probes = self.get_probes_object(use_db=True)
- probes.load_data()
- probes._load_data_db.assert_any_call(client=None)
- self.assertFalse(probes._load_data_xml.called)
-
- @patch("lxml.etree.parse")
- def test__load_data_xml(self, mock_parse):
- probes = self.get_probes_object(use_db=False)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- # to get the value for lxml.etree.parse to parse, we call
- # _write_data_xml, mock the lxml.etree._ElementTree.write()
- # call, and grab the data that gets "written" to probed.xml
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
- top = mock_Element.side_effect.return_value
- return top._element
-
- xdata = inner()
- mock_parse.return_value = xdata.getroottree()
- probes.probedata = dict()
- probes.cgroups = dict()
-
- probes._load_data_xml()
- mock_parse.assert_called_with(os.path.join(datastore, probes.name,
- 'probed.xml'),
- parser=Bcfg2.Server.XMLParser)
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- self.assertItemsEqual(probes.cgroups, self.get_test_cgroups())
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__load_data_db(self):
- syncdb(TestProbesDB)
- probes = self.get_probes_object(use_db=True)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
- for cname in probes.probedata.keys():
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- probes.probedata = dict()
- probes.cgroups = dict()
- probes._load_data_db()
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- # the db backend does not store groups at all if a client has
- # no groups set, so we can't just use assertItemsEqual here,
- # because loading saved data may _not_ result in the original
- # data if some clients had no groups set.
- test_cgroups = self.get_test_cgroups()
- for cname, groups in test_cgroups.items():
- if cname in probes.cgroups:
- self.assertEqual(groups, probes.cgroups[cname])
- else:
- self.assertEqual(groups, [])
+ if Bcfg2.Options.setup.probes_db:
+ TestPlugin.test__init(self)
- @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data")
- def test_GetProbes(self, mock_get_probe_data):
- probes = self.get_probes_object()
- metadata = Mock()
- probes.GetProbes(metadata)
- mock_get_probe_data.assert_called_with(metadata)
-
- @patch("Bcfg2.Server.Plugins.Probes.Probes.write_data")
- @patch("Bcfg2.Server.Plugins.Probes.Probes.ReceiveDataItem")
- def test_ReceiveData(self, mock_ReceiveDataItem, mock_write_data):
- # we use a simple (read: bogus) datalist here to make this
- # easy to test
- datalist = ["a", "b", "c"]
-
- probes = self.get_probes_object()
- probes.core.metadata_cache_mode = 'off'
- client = Mock()
- client.hostname = "foo.example.com"
- probes.ReceiveData(client, datalist)
-
- cgroups = []
- cprobedata = ClientProbeDataSet()
- self.assertItemsEqual(mock_ReceiveDataItem.call_args_list,
- [call(client, "a", cgroups, cprobedata),
- call(client, "b", cgroups, cprobedata),
- call(client, "c", cgroups, cprobedata)])
- mock_write_data.assert_called_with(client)
- self.assertFalse(probes.core.metadata_cache.expire.called)
-
- # change the datalist, ensure that the cache is cleared
- probes.cgroups[client.hostname] = datalist
- probes.core.metadata_cache_mode = 'aggressive'
- probes.ReceiveData(client, ['a', 'b', 'd'])
-
- mock_write_data.assert_called_with(client)
- probes.core.metadata_cache.expire.assert_called_with(client.hostname)
-
- def test_ReceiveDataItem(self):
- probes = self.get_probes_object()
- for cname, cdata in self.get_test_probedata().items():
- client = Mock()
- client.hostname = cname
- cgroups = []
- cprobedata = ClientProbeDataSet()
- for pname, pdata in cdata.items():
- dataitem = lxml.etree.Element("Probe", name=pname)
- if pname == "text":
- # add some groups to the plaintext test to test
- # group parsing
- data = [pdata]
- for group in self.get_test_cgroups()[cname]:
- data.append("group:%s" % group)
- dataitem.text = "\n".join(data)
- else:
- dataitem.text = str(pdata)
-
- probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata)
-
- probes.cgroups[client.hostname] = cgroups
- probes.probedata[client.hostname] = cprobedata
- self.assertIn(client.hostname, probes.probedata)
- self.assertIn(pname, probes.probedata[cname])
- self.assertEqual(pdata, probes.probedata[cname][pname])
- self.assertIn(client.hostname, probes.cgroups)
- self.assertEqual(probes.cgroups[cname],
- self.get_test_cgroups()[cname])
-
- def test_get_additional_groups(self):
- TestConnector.test_get_additional_groups(self)
-
- probes = self.get_probes_object()
- test_cgroups = self.get_test_cgroups()
- probes.cgroups = self.get_test_cgroups()
- for cname in test_cgroups.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_cgroups[cname],
- probes.get_additional_groups(metadata))
- # test a non-existent client
+ def test_GetProbes(self):
+ p = self.get_obj()
+ p.probes = Mock()
metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_groups(metadata),
- list())
-
- def test_get_additional_data(self):
- TestConnector.test_get_additional_data(self)
-
- probes = self.get_probes_object()
- test_probedata = self.get_test_probedata()
- probes.probedata = self.get_test_probedata()
- for cname in test_probedata.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_probedata[cname],
- probes.get_additional_data(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_data(metadata),
- ClientProbeDataSet())
+ p.GetProbes(metadata)
+ p.probes.get_probe_data.assert_called_with(metadata)
+
+ def additionalDataEqual(self, actual, expected):
+ self.assertItemsEqual(
+ dict([(k, str(d)) for k, d in actual.items()]),
+ expected)
+
+ def test_probes_xml(self):
+ """ Set and retrieve probe data with database disabled """
+ Bcfg2.Options.setup.probes_db = False
+ self._perform_tests()
+
+ @skipUnless(HAS_DJANGO, "Django not found")
+ def test_probes_db(self):
+ """ Set and retrieve probe data with database enabled """
+ Bcfg2.Options.setup.probes_db = True
+ self._perform_tests()
+
+ def _perform_tests(self):
+ p = self.get_obj()
+
+ # first, sanity checks
+ foo_md = Mock(hostname="foo.example.com")
+ bar_md = Mock(hostname="bar.example.com")
+ self.assertItemsEqual(p.get_additional_groups(foo_md), [])
+ self.assertItemsEqual(p.get_additional_data(foo_md), dict())
+ self.assertItemsEqual(p.get_additional_groups(bar_md), [])
+ self.assertItemsEqual(p.get_additional_data(bar_md), dict())
+
+ # next, set some initial probe data
+ foo_datalist = []
+ for key in ['xml', 'text', 'multiline']:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc,
+ text="freeform text",
+ multiline="""multiple
+lines
+of
+freeform
+text""")
+ bar_datalist = []
+ for key in ['empty', 'almost_empty', 'json', 'yaml']:
+ if key in self.data:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ bar_datalist.append(pdata)
+ bar_addl_data = dict(empty="", almost_empty="")
+ if HAS_JSON:
+ bar_addl_data['json'] = self.data['json']
+ if HAS_YAML:
+ bar_addl_data['yaml'] = self.data['yaml']
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+
+ p.ReceiveData(bar_md, bar_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # set new data (and groups) for foo
+ foo_datalist = []
+ pdata = lxml.etree.Element("Probe", name='xml')
+ pdata.text = self.data['xml']
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc)
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
index 896f5861e..159dc6e66 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
@@ -19,12 +19,6 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \
TestPlugin, TestDirectoryBacked
try:
- from Bcfg2.Encryption import EVPError
- HAS_CRYPTO = True
-except:
- HAS_CRYPTO = False
-
-try:
import json
JSON = "json"
except ImportError:
@@ -36,12 +30,12 @@ class TestPropertyFile(Bcfg2TestCase):
path = os.path.join(datastore, "test")
def get_obj(self, path=None):
+ set_setup_default("writes_enabled", False)
if path is None:
path = self.path
return self.test_obj(path)
def test_write(self):
- Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
pf = self.get_obj()
pf.validate_data = Mock()
pf._write = Mock()
@@ -52,20 +46,16 @@ class TestPropertyFile(Bcfg2TestCase):
def reset():
pf.validate_data.reset_mock()
pf._write.reset_mock()
- Bcfg2.Server.Plugins.Properties.SETUP.reset_mock()
# test writes disabled
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False
+ Bcfg2.Options.setup.writes_enabled = False
self.assertRaises(PluginExecutionError, pf.write)
self.assertFalse(pf.validate_data.called)
self.assertFalse(pf._write.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties",
- "writes_enabled",
- default=True)
# test successful write
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True
+ Bcfg2.Options.setup.writes_enabled = True
self.assertEqual(pf.write(), pf._write.return_value)
pf.validate_data.assert_called_with()
pf._write.assert_called_with()
@@ -99,96 +89,95 @@ class TestPropertyFile(Bcfg2TestCase):
mock_copy.assert_called_with(pf)
-if can_skip or HAS_JSON:
- class TestJSONPropertyFile(TestFileBacked, TestPropertyFile):
- test_obj = JSONPropertyFile
-
- def get_obj(self, *args, **kwargs):
- return TestFileBacked.get_obj(self, *args, **kwargs)
-
- @skipUnless(HAS_JSON, "JSON libraries not found, skipping")
- def setUp(self):
- pass
-
- @patch("%s.loads" % JSON)
- def test_Index(self, mock_loads):
- pf = self.get_obj()
- pf.Index()
- mock_loads.assert_called_with(pf.data)
- self.assertEqual(pf.json, mock_loads.return_value)
-
- mock_loads.reset_mock()
- mock_loads.side_effect = ValueError
- self.assertRaises(PluginExecutionError, pf.Index)
- mock_loads.assert_called_with(pf.data)
-
- @patch("%s.dump" % JSON)
- @patch("%s.open" % builtins)
- def test__write(self, mock_open, mock_dump):
- pf = self.get_obj()
- self.assertTrue(pf._write())
- mock_open.assert_called_with(pf.name, 'wb')
- mock_dump.assert_called_with(pf.json, mock_open.return_value)
-
- @patch("%s.dumps" % JSON)
- def test_validate_data(self, mock_dumps):
- pf = self.get_obj()
- pf.validate_data()
- mock_dumps.assert_called_with(pf.json)
-
- mock_dumps.reset_mock()
- mock_dumps.side_effect = ValueError
- self.assertRaises(PluginExecutionError, pf.validate_data)
- mock_dumps.assert_called_with(pf.json)
-
-
-if can_skip or HAS_YAML:
- class TestYAMLPropertyFile(TestFileBacked, TestPropertyFile):
- test_obj = YAMLPropertyFile
-
- def get_obj(self, *args, **kwargs):
- return TestFileBacked.get_obj(self, *args, **kwargs)
-
- @skipUnless(HAS_YAML, "YAML libraries not found, skipping")
- def setUp(self):
- pass
-
- @patch("yaml.load")
- def test_Index(self, mock_load):
- pf = self.get_obj()
- pf.Index()
- mock_load.assert_called_with(pf.data)
- self.assertEqual(pf.yaml, mock_load.return_value)
-
- mock_load.reset_mock()
- mock_load.side_effect = yaml.YAMLError
- self.assertRaises(PluginExecutionError, pf.Index)
- mock_load.assert_called_with(pf.data)
-
- @patch("yaml.dump")
- @patch("%s.open" % builtins)
- def test__write(self, mock_open, mock_dump):
- pf = self.get_obj()
- self.assertTrue(pf._write())
- mock_open.assert_called_with(pf.name, 'wb')
- mock_dump.assert_called_with(pf.yaml, mock_open.return_value)
-
- @patch("yaml.dump")
- def test_validate_data(self, mock_dump):
- pf = self.get_obj()
- pf.validate_data()
- mock_dump.assert_called_with(pf.yaml)
-
- mock_dump.reset_mock()
- mock_dump.side_effect = yaml.YAMLError
- self.assertRaises(PluginExecutionError, pf.validate_data)
- mock_dump.assert_called_with(pf.yaml)
+class TestJSONPropertyFile(TestFileBacked, TestPropertyFile):
+ test_obj = JSONPropertyFile
+
+ @skipUnless(HAS_JSON, "JSON libraries not found, skipping")
+ def setUp(self):
+ TestFileBacked.setUp(self)
+ TestPropertyFile.setUp(self)
+
+ @patch("%s.loads" % JSON)
+ def test_Index(self, mock_loads):
+ pf = self.get_obj()
+ pf.Index()
+ mock_loads.assert_called_with(pf.data)
+ self.assertEqual(pf.json, mock_loads.return_value)
+
+ mock_loads.reset_mock()
+ mock_loads.side_effect = ValueError
+ self.assertRaises(PluginExecutionError, pf.Index)
+ mock_loads.assert_called_with(pf.data)
+
+ @patch("%s.dump" % JSON)
+ @patch("%s.open" % builtins)
+ def test__write(self, mock_open, mock_dump):
+ pf = self.get_obj()
+ self.assertTrue(pf._write())
+ mock_open.assert_called_with(pf.name, 'wb')
+ mock_dump.assert_called_with(pf.json, mock_open.return_value)
+
+ @patch("%s.dumps" % JSON)
+ def test_validate_data(self, mock_dumps):
+ pf = self.get_obj()
+ pf.validate_data()
+ mock_dumps.assert_called_with(pf.json)
+
+ mock_dumps.reset_mock()
+ mock_dumps.side_effect = ValueError
+ self.assertRaises(PluginExecutionError, pf.validate_data)
+ mock_dumps.assert_called_with(pf.json)
+
+
+class TestYAMLPropertyFile(TestFileBacked, TestPropertyFile):
+ test_obj = YAMLPropertyFile
+
+ @skipUnless(HAS_YAML, "YAML libraries not found, skipping")
+ def setUp(self):
+ TestFileBacked.setUp(self)
+ TestPropertyFile.setUp(self)
+
+ @patch("yaml.load")
+ def test_Index(self, mock_load):
+ pf = self.get_obj()
+ pf.Index()
+ mock_load.assert_called_with(pf.data)
+ self.assertEqual(pf.yaml, mock_load.return_value)
+
+ mock_load.reset_mock()
+ mock_load.side_effect = yaml.YAMLError
+ self.assertRaises(PluginExecutionError, pf.Index)
+ mock_load.assert_called_with(pf.data)
+
+ @patch("yaml.dump")
+ @patch("%s.open" % builtins)
+ def test__write(self, mock_open, mock_dump):
+ pf = self.get_obj()
+ self.assertTrue(pf._write())
+ mock_open.assert_called_with(pf.name, 'wb')
+ mock_dump.assert_called_with(pf.yaml, mock_open.return_value)
+
+ @patch("yaml.dump")
+ def test_validate_data(self, mock_dump):
+ pf = self.get_obj()
+ pf.validate_data()
+ mock_dump.assert_called_with(pf.yaml)
+
+ mock_dump.reset_mock()
+ mock_dump.side_effect = yaml.YAMLError
+ self.assertRaises(PluginExecutionError, pf.validate_data)
+ mock_dump.assert_called_with(pf.yaml)
class TestXMLPropertyFile(TestPropertyFile, TestStructFile):
test_obj = XMLPropertyFile
path = TestStructFile.path
+ def setUp(self):
+ TestPropertyFile.setUp(self)
+ TestStructFile.setUp(self)
+ set_setup_default("automatch", False)
+
def get_obj(self, *args, **kwargs):
return TestStructFile.get_obj(self, *args, **kwargs)
@@ -243,178 +232,47 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile):
mock_exists.assert_called_with(schemafile)
mock_XMLSchema.assert_called_with(file=schemafile)
- def test_Index(self):
- TestStructFile.test_Index(self)
-
- pf = self.get_obj()
- pf.xdata = lxml.etree.Element("Properties")
- lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo")
- pf.data = lxml.etree.tostring(pf.xdata)
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- pf = self.get_obj()
- pf._decrypt = Mock()
- pf._decrypt.return_value = 'plaintext'
- pf.data = '''
-<Properties decrypt="strict">
- <Crypted encrypted="foo">
- crypted
- <Plain foo="bar">plain</Plain>
- </Crypted>
- <Crypted encrypted="bar">crypted</Crypted>
- <Plain bar="baz">plain</Plain>
- <Plain>
- <Crypted encrypted="foo">crypted</Crypted>
- </Plain>
-</Properties>'''
-
- # test successful decryption
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
- for el in pf.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pf._decrypt.return_value)
-
- # test failed decryption, strict
- pf._decrypt.reset_mock()
- pf._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pf.Index)
-
- # test failed decryption, lax
- pf.data = pf.data.replace("strict", "lax")
- pf._decrypt.reset_mock()
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
- mock_get_algorithm, mock_ssl):
- pf = self.get_obj()
- Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pf._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pf._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
@patch("copy.copy")
def test_get_additional_data(self, mock_copy):
- Bcfg2.Server.Plugins.Properties.SETUP = Mock()
pf = self.get_obj()
+ pf.setup = Mock()
pf.XMLMatch = Mock()
metadata = Mock()
def reset():
mock_copy.reset_mock()
pf.XMLMatch.reset_mock()
- Bcfg2.Server.Plugins.Properties.SETUP.reset_mock()
+ pf.setup.reset_mock()
pf.xdata = lxml.etree.Element("Properties", automatch="true")
- for automatch in [True, False]:
+ for Bcfg2.Options.setup.automatch in [True, False]:
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch
self.assertEqual(pf.get_additional_data(metadata),
pf.XMLMatch.return_value)
pf.XMLMatch.assert_called_with(metadata)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
self.assertFalse(mock_copy.called)
pf.xdata = lxml.etree.Element("Properties", automatch="false")
- for automatch in [True, False]:
+ for Bcfg2.Options.setup.automatch in [True, False]:
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch
self.assertEqual(pf.get_additional_data(metadata),
mock_copy.return_value)
mock_copy.assert_called_with(pf)
self.assertFalse(pf.XMLMatch.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
pf.xdata = lxml.etree.Element("Properties")
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False
+ Bcfg2.Options.setup.automatch = False
self.assertEqual(pf.get_additional_data(metadata),
mock_copy.return_value)
mock_copy.assert_called_with(pf)
self.assertFalse(pf.XMLMatch.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True
+ Bcfg2.Options.setup.automatch = True
self.assertEqual(pf.get_additional_data(metadata),
pf.XMLMatch.return_value)
pf.XMLMatch.assert_called_with(metadata)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
self.assertFalse(mock_copy.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
index f018b45dc..45f3671e8 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
@@ -1,9 +1,11 @@
import os
import sys
+import copy
import lxml.etree
-import Bcfg2.Server.Plugin
+import Bcfg2.Options
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Rules import *
+from Bcfg2.Server.Plugin import PluginExecutionError
# add all parent testsuite directories to sys.path to allow (most)
# relative imports in python 2.4
@@ -15,116 +17,159 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
-from TestPlugin import TestPrioDir
+from TestPlugin.Testhelpers import TestPrioDir
class TestRules(TestPrioDir):
test_obj = Rules
- def test_HandlesEntry(self):
+ abstract = dict(
+ basic=lxml.etree.Element("Path", name="/etc/basic"),
+ unhandled=lxml.etree.Element("Path", name="/etc/unhandled"),
+ priority=lxml.etree.Element("Path", name="/etc/priority"),
+ content=lxml.etree.Element("Path", name="/etc/text-content"),
+ duplicate=lxml.etree.Element("SEBoolean", name="duplicate"),
+ group=lxml.etree.Element("SEPort", name="6789/tcp"),
+ children=lxml.etree.Element("Path", name="/etc/child-entries"),
+ regex=lxml.etree.Element("Package", name="regex"),
+ slash=lxml.etree.Element("Path", name="/etc/trailing/slash"),
+ no_slash=lxml.etree.Element("Path", name="/etc/no/trailing/slash/"))
+
+ concrete = dict(
+ basic=lxml.etree.Element("Path", name="/etc/basic", type="directory",
+ owner="root", group="root", mode="0600"),
+ priority=lxml.etree.Element("Path", name="/etc/priority",
+ type="directory", owner="root",
+ group="root", mode="0600"),
+ content=lxml.etree.Element("Path", name="/etc/text-content",
+ type="file", owner="bar", group="bar",
+ mode="0644"),
+ duplicate=lxml.etree.Element("SEBoolean", name="duplicate",
+ value="on"),
+ group=lxml.etree.Element("SEPort", name="6789/tcp",
+ selinuxtype="bcfg2_server_t"),
+ children=lxml.etree.Element("Path", name="/etc/child-entries",
+ type="directory", owner="root",
+ group="root", mode="0775"),
+ regex=lxml.etree.Element("Package", name="regex", type="yum",
+ version="any"),
+ slash=lxml.etree.Element("Path", name="/etc/trailing/slash",
+ type="directory", owner="root", group="root",
+ mode="0600"),
+ no_slash=lxml.etree.Element("Path", name="/etc/no/trailing/slash/",
+ type="directory", owner="root",
+ group="root", mode="0600"))
+
+ concrete['content'].text = "Text content"
+ lxml.etree.SubElement(concrete['children'],
+ "ACL", type="default", scope="user", user="foouser",
+ perms="rw")
+ lxml.etree.SubElement(concrete['children'],
+ "ACL", type="default", scope="group", group="users",
+ perms="rx")
+
+ in_file = copy.deepcopy(concrete)
+ in_file['regex'].set("name", ".*")
+ in_file['slash'].set("name", "/etc/trailing/slash/")
+ in_file['no_slash'].set("name", "/etc/no/trailing/slash")
+
+ rules1 = lxml.etree.Element("Rules", priority="10")
+ rules1.append(in_file['basic'])
+ lxml.etree.SubElement(rules1, "Path", name="/etc/priority",
+ type="directory", owner="foo", group="foo",
+ mode="0644")
+ foogroup = lxml.etree.SubElement(rules1, "Group", name="foogroup")
+ foogroup.append(in_file['group'])
+ rules1.append(in_file['content'])
+ rules1.append(copy.copy(in_file['duplicate']))
+
+ rules2 = lxml.etree.Element("Rules", priority="20")
+ rules2.append(in_file['priority'])
+ rules2.append(in_file['children'])
+ rules2.append(in_file['no_slash'])
+
+ rules3 = lxml.etree.Element("Rules", priority="10")
+ rules3.append(in_file['duplicate'])
+ rules3.append(in_file['regex'])
+ rules3.append(in_file['slash'])
+
+ rules = {"rules1.xml": rules1, "rules2.xml": rules2, "rules3.xml": rules3}
+
+ def setUp(self):
+ TestPrioDir.setUp(self)
+ set_setup_default("lax_decryption", True)
+ set_setup_default("rules_regex", False)
+
+ def get_child(self, name):
+ """ Turn one of the XML documents in `rules` into a child
+ object """
+ filename = os.path.join(datastore, self.test_obj.name, name)
+ rv = self.test_obj.__child__(filename)
+ rv.data = lxml.etree.tostring(self.rules[name])
+ rv.Index()
+ return rv
+
+ def get_obj(self, core=None):
+ r = TestPrioDir.get_obj(self, core=core)
+ r.entries = dict([(n, self.get_child(n)) for n in self.rules.keys()])
+ return r
+
+ def _do_test(self, name, groups=None):
+ if groups is None:
+ groups = []
r = self.get_obj()
- r.Entries = dict(Path={"/etc/foo.conf": Mock(),
- "/etc/bar.conf": Mock()})
- r._matches = Mock()
- metadata = Mock()
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- self.assertEqual(r.HandlesEntry(entry, metadata),
- r._matches.return_value)
- r._matches.assert_called_with(entry, metadata,
- r.Entries['Path'].keys())
-
- r._matches.reset_mock()
- entry = lxml.etree.Element("Path", name="/etc/baz.conf")
- self.assertEqual(r.HandlesEntry(entry, metadata),
- r._matches.return_value)
- r._matches.assert_called_with(entry, metadata,
- r.Entries['Path'].keys())
-
- r._matches.reset_mock()
- entry = lxml.etree.Element("Package", name="foo")
- self.assertFalse(r.HandlesEntry(entry, metadata))
-
- def test_BindEntry(self, method="BindEntry"):
+ metadata = Mock(groups=groups)
+ entry = copy.deepcopy(self.abstract[name])
+ self.assertTrue(r.HandlesEntry(entry, metadata))
+ r.HandleEntry(entry, metadata)
+ self.assertXMLEqual(entry, self.concrete[name])
+
+ def _do_test_failure(self, name, groups=None, handles=None):
+ if groups is None:
+ groups = []
r = self.get_obj()
- r.get_attrs = Mock()
- r.get_attrs.return_value = dict(overwrite="new", add="add",
- text="text")
- entry = lxml.etree.Element("Test", overwrite="old", keep="keep")
- metadata = Mock()
-
- getattr(r, method)(entry, metadata)
- r.get_attrs.assert_called_with(entry, metadata)
- self.assertItemsEqual(entry.attrib,
- dict(overwrite="old", add="add", keep="keep",
- text="text"))
-
- def test_HandleEntry(self):
- self.test_BindEntry(method="HandleEntry")
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches(self, mock_matches):
- """ test _matches() behavior regardless of state of _regex_enabled """
- r = self.get_obj()
- metadata = Mock()
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = []
- mock_matches.return_value = True
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- # test special Path cases -- adding and removing trailing slash
- mock_matches.reset_mock()
- mock_matches.return_value = False
- rules = ["/etc/foo/", "/etc/bar"]
- entry = lxml.etree.Element("Path", name="/etc/foo")
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- mock_matches.reset_mock()
- entry = lxml.etree.Element("Path", name="/etc/bar/")
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches_regex_disabled(self, mock_matches):
- """ test failure to match with regex disabled """
- r = self.get_obj()
- self.set_regex_enabled(r, False)
- metadata = Mock()
- mock_matches.return_value = False
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = []
- self.assertFalse(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches_regex_enabled(self, mock_matches):
- """ test match with regex enabled """
- r = self.get_obj()
- self.set_regex_enabled(r, True)
- metadata = Mock()
- mock_matches.return_value = False
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = ["/etc/.*\.conf", "/etc/bar"]
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
- self.assertIn("/etc/.*\.conf", r._regex_cache.keys())
-
- def set_regex_enabled(self, rules_obj, state):
- """ set the state of regex_enabled for this implementation of
- Rules """
- if not isinstance(rules_obj.core.setup, MagicMock):
- rules_obj.core.setup = MagicMock()
- rules_obj.core.setup.cfp.getboolean.return_value = state
-
- def test__regex_enabled(self):
- r = self.get_obj()
- r.core.setup = MagicMock()
- self.assertEqual(r._regex_enabled,
- r.core.setup.cfp.getboolean.return_value)
- r.core.setup.cfp.getboolean.assert_called_with("rules", "regex",
- default=False)
+ metadata = Mock(groups=groups)
+ entry = self.abstract[name]
+ if handles is not None:
+ self.assertEqual(handles, r.HandlesEntry(entry, metadata))
+ self.assertRaises(PluginExecutionError,
+ r.HandleEntry, entry, metadata)
+
+ def test_basic(self):
+ """ Test basic Rules usage """
+ self._do_test('basic')
+ self._do_test_failure('unhandled', handles=False)
+
+ def test_priority(self):
+ """ Test that Rules respects priority """
+ self._do_test('priority')
+
+ def test_duplicate(self):
+ """ Test that Rules raises exceptions for duplicate entries """
+ self._do_test_failure('duplicate')
+
+ def test_content(self):
+ """ Test that Rules copies text content from concrete entries """
+ self._do_test('content')
+
+ def test_group(self):
+ """ Test that Rules respects <Group/> tags """
+ self._do_test('group', groups=['foogroup'])
+ self._do_test_failure('group', groups=['bargroup'], handles=False)
+
+ def test_children(self):
+ """ Test that Rules copies child elements from concrete entries """
+ self._do_test('children')
+
+ def test_regex(self):
+ """ Test that Rules handles regular expressions properly """
+ Bcfg2.Options.setup.rules_regex = False
+ self._do_test_failure('regex', handles=False)
+ Bcfg2.Options.setup.rules_regex = True
+ self._do_test('regex')
+ Bcfg2.Options.setup.rules_regex = False
+
+ def test_slash(self):
+ """ Test that Rules handles trailing slashes on Path entries """
+ self._do_test('slash')
+ self._do_test('no_slash')
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
index bf9a3ced3..128d6cae5 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
@@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase):
def get_obj(self, path=None):
if path is None:
path = self.path
- return self.test_obj(path, fam=Mock())
+ return self.test_obj(path)
def test__init(self):
hm = self.get_obj()