summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc/Testlib/TestServer/TestPlugins
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py223
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py111
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py32
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py7
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py9
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py12
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py31
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py202
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py37
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py235
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py99
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py60
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py82
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py17
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py169
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py82
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py2
17 files changed, 719 insertions, 691 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
new file mode 100644
index 000000000..86a960701
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
@@ -0,0 +1,223 @@
+import os
+import sys
+import lxml.etree
+import Bcfg2.Server.Plugin
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.ACL import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \
+ TestClientACLs
+
+
+class TestFunctions(Bcfg2TestCase):
+ def test_rmi_names_equal(self):
+ good_cases = [('*', 'foo'),
+ ('foo', 'foo'),
+ ('foo.*', 'foo.bar'),
+ ('*.*', 'foo.bar'),
+ ('foo.bar', 'foo.bar'),
+ ('*.bar', 'foo.bar'),
+ ('foo.*.bar', 'foo.baz.bar')]
+ bad_cases = [('foo', 'bar'),
+ ('*', 'foo.bar'),
+ ('*.*', 'foo'),
+ ('*.*', 'foo.bar.baz'),
+ ('foo.*', 'bar.foo'),
+ ('*.bar', 'bar.foo'),
+ ('foo.*', 'foobar')]
+ for first, second in good_cases:
+ self.assertTrue(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (first, second))
+ self.assertTrue(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (second, first))
+ for first, second in bad_cases:
+ self.assertFalse(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (first, second))
+ self.assertFalse(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (second, first))
+
+ def test_ip_matches(self):
+ good_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ bad_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ for ip, entry in good_cases:
+ self.assertTrue(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly False" %
+ (ip, lxml.etree.tostring(entry)))
+ for ip, entry in bad_cases:
+ self.assertFalse(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly True" %
+ (ip, lxml.etree.tostring(entry)))
+
+
+class TestIPACLFile(TestXMLFileBacked):
+ test_obj = IPACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.ip_matches")
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches):
+ af = self.get_obj()
+ ip = "10.0.0.8"
+ rmi = "ACL.test"
+
+ def reset():
+ mock_rmi_names_equal.reset_mock()
+ mock_ip_matches.reset_mock()
+
+ # test default defer with no entries
+ af.entries = []
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ # test explicit allow, deny, and defer
+ entries = dict(Allow=lxml.etree.Element("Allow", method=rmi),
+ Deny=lxml.etree.Element("Deny", method=rmi),
+ Defer=lxml.etree.Element("Defer", method=rmi))
+ af.entries = list(entries.values())
+
+ def get_ip_matches(tag):
+ def ip_matches(ip, entry):
+ return entry.tag == tag
+
+ return ip_matches
+
+ mock_rmi_names_equal.return_value = True
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Allow")
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Allow'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Deny")
+ self.assertFalse(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Deny'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Defer")
+ self.assertIsNone(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Defer'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ # test matching RMI names
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: True
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+ rmi = "ACL.test2"
+ matching = lxml.etree.Element("Allow", method=rmi)
+ af.entries.append(matching)
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, matching)
+ self.assertTrue(
+ call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or
+ call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list)
+
+ # test implicit allow for localhost, defer for others
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: False
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ reset()
+ self.assertTrue(af.check_acl("127.0.0.1", rmi))
+
+
+class TestMetadataACLFile(TestStructFile):
+ test_obj = MetadataACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal):
+ af = self.get_obj()
+ af.Match = Mock()
+ metadata = Mock()
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+
+ def reset():
+ af.Match.reset_mock()
+ mock_rmi_names_equal.reset_mock()
+
+ # test default allow
+ af.entries = []
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+
+ # test explicit allow and deny
+ reset()
+ af.entries = [lxml.etree.Element("Allow", method='ACL.test'),
+ lxml.etree.Element("Deny", method='ACL.test2')]
+ af.Match.return_value = af.entries
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test', 'ACL.test'),
+ mock_rmi_names_equal.call_args_list)
+
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test2'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test2', 'ACL.test2'),
+ mock_rmi_names_equal.call_args_list)
+
+ # test default deny for non-localhost
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+ # test default allow for localhost
+ reset()
+ metadata.hostname = 'localhost'
+ self.assertTrue(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+
+class TestACL(TestPlugin, TestClientACLs):
+ test_obj = ACL
+
+ def test_check_acl_ip(self):
+ acl = self.get_obj()
+ acl.ip_acls = Mock()
+ self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"),
+ "ACL.test"),
+ acl.ip_acls.check_acl.return_value)
+ acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test")
+
+ def test_check_acl_metadata(self):
+ acl = self.get_obj()
+ acl.metadata_acls = Mock()
+ metadata = Mock()
+ self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"),
+ acl.metadata_acls.check_acl.return_value)
+ acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py
new file mode 100644
index 000000000..cfb379c40
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py
@@ -0,0 +1,111 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Bundler import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestStructFile, TestPlugin, TestStructure, \
+ TestXMLDirectoryBacked
+
+
+class TestBundleFile(TestStructFile):
+ test_obj = BundleFile
+ path = os.path.join(datastore, "test", "test1.xml")
+
+ def test_bundle_name(self):
+ cases = [("foo.xml", "foo"),
+ ("foo.bar.xml", "foo.bar"),
+ ("foo-bar-baz.xml", "foo-bar-baz"),
+ ("foo....xml", "foo..."),
+ ("foo.genshi", "foo")]
+ bf = self.get_obj()
+ for fname, bname in cases:
+ bf.name = fname
+ self.assertEqual(bf.bundle_name, bname)
+
+
+class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked):
+ test_obj = Bundler
+
+ def get_obj(self, core=None):
+ @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__,
+ self.test_obj.__name__),
+ Mock())
+ def inner():
+ return TestPlugin.get_obj(self, core=core)
+ return inner()
+
+ @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent")
+ def test_HandleEvent(self, mock_HandleEvent):
+ b = self.get_obj()
+ b.bundles = dict(foo=Mock(), bar=Mock())
+ b.entries = {"foo.xml": BundleFile("foo.xml"),
+ "baz.xml": BundleFile("baz.xml")}
+ event = Mock()
+ b.HandleEvent(event)
+ mock_HandleEvent.assert_called_with(b, event)
+ self.assertItemsEqual(b.bundles,
+ dict(foo=b.entries['foo.xml'],
+ baz=b.entries['baz.xml']))
+
+ def test_BuildStructures(self):
+ b = self.get_obj()
+ b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(),
+ has_dep=Mock(), is_dep=Mock(), indep=Mock())
+ expected = dict()
+
+ b.bundles['error'].XMLMatch.side_effect = TemplateError(None)
+
+ xinclude = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"),
+ "Path", name="/test")
+ b.bundles['xinclude'].XMLMatch.return_value = xinclude
+ expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude")
+ lxml.etree.SubElement(expected['xinclude'], "Path", name="/test")
+
+ has_dep = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(has_dep, "Bundle", name="is_dep")
+ lxml.etree.SubElement(has_dep, "Package", name="foo")
+ b.bundles['has_dep'].XMLMatch.return_value = has_dep
+ expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep")
+ lxml.etree.SubElement(expected['has_dep'], "Package", name="foo")
+
+ is_dep = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(is_dep, "Package", name="bar")
+ b.bundles['is_dep'].XMLMatch.return_value = is_dep
+ expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep")
+ lxml.etree.SubElement(expected['is_dep'], "Package", name="bar")
+
+ indep = lxml.etree.Element("Bundle", independent="true")
+ lxml.etree.SubElement(indep, "Service", name="baz")
+ b.bundles['indep'].XMLMatch.return_value = indep
+ expected['indep'] = lxml.etree.Element("Independent", name="indep")
+ lxml.etree.SubElement(expected['indep'], "Service", name="baz")
+
+ metadata = Mock()
+ metadata.bundles = ["error", "xinclude", "has_dep", "indep"]
+
+ rv = b.BuildStructures(metadata)
+ self.assertEqual(len(rv), 4)
+ for bundle in rv:
+ name = bundle.get("name")
+ self.assertIsNotNone(name,
+ "Bundle %s was not built" % name)
+ self.assertIn(name, expected,
+ "Unexpected bundle %s was built" % name)
+ self.assertXMLEqual(bundle, expected[name],
+ "Bundle %s was not built correctly" % name)
+ b.bundles[name].XMLMatch.assert_called_with(metadata)
+
+ b.bundles['error'].XMLMatch.assert_called_with(metadata)
+ self.assertFalse(b.bundles['skip'].XMLMatch.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
index d655a20cd..b77d52033 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
@@ -42,27 +42,25 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
def test_category(self):
akg = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp
+ akg.setup = Mock()
+ akg.setup.cfp.has_section.return_value = False
+ akg.setup.cfp.has_option.return_value = False
self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
+ akg.setup.cfp.has_section.assert_called_with("sshkeys")
- cfp.reset_mock()
- cfp.has_section.return_value = True
+ akg.setup.reset_mock()
+ akg.setup.cfp.has_section.return_value = True
self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
-
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(akg.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
+ akg.setup.cfp.has_section.assert_called_with("sshkeys")
+ akg.setup.cfp.has_option.assert_called_with("sshkeys", "category")
+
+ akg.setup.reset_mock()
+ akg.setup.cfp.has_option.return_value = True
+ self.assertEqual(akg.category, akg.setup.cfp.get.return_value)
+ akg.setup.cfp.has_section.assert_called_with("sshkeys")
+ akg.setup.cfp.has_option.assert_called_with("sshkeys", "category")
+ akg.setup.cfp.get.assert_called_with("sshkeys", "category")
@patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata")
@patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
index fc5d5e53d..31227329c 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
@@ -31,11 +31,11 @@ if HAS_CHEETAH or can_skip:
ccg.data = "data"
entry = lxml.etree.Element("Path", name="/test.txt")
metadata = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock()
+ ccg.setup = MagicMock()
self.assertEqual(ccg.get_data(entry, metadata),
mock_Template.return_value.respond.return_value)
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo")
+ ccg.setup.__getitem__.assert_called_with("repo")
mock_Template.assert_called_with("data".decode(ccg.encoding),
compilerSettings=ccg.settings)
tmpl = mock_Template.return_value
@@ -44,5 +44,4 @@ if HAS_CHEETAH or can_skip:
self.assertEqual(tmpl.name, entry.get("name"))
self.assertEqual(tmpl.path, entry.get("name"))
self.assertEqual(tmpl.source_path, ccg.name)
- self.assertEqual(tmpl.repo,
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value)
+ self.assertEqual(tmpl.repo, ccg.setup.__getitem__.return_value)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
index 71a7410da..f8e9b1990 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
@@ -27,15 +27,12 @@ if can_skip or HAS_CRYPTO:
pass
@patchIf(HAS_CRYPTO,
- "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm")
- @patchIf(HAS_CRYPTO,
"Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
- def test_handle_event(self, mock_decrypt, mock_get_algorithm):
+ def test_handle_event(self, mock_decrypt):
@patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
def inner(mock_handle_event):
def reset():
mock_decrypt.reset_mock()
- mock_get_algorithm.reset_mock()
mock_handle_event.reset_mock()
def get_event_data(obj, event):
@@ -47,9 +44,7 @@ if can_skip or HAS_CRYPTO:
ceg = self.get_obj()
ceg.handle_event(event)
mock_handle_event.assert_called_with(ceg, event)
- mock_decrypt.assert_called_with("encrypted",
- setup=SETUP,
- algorithm=mock_get_algorithm.return_value)
+ mock_decrypt.assert_called_with("encrypted")
self.assertEqual(ceg.data, "plaintext")
reset()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
index b447a9bb8..330779c99 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
@@ -14,20 +14,14 @@ while path != "/":
path = os.path.dirname(path)
from common import *
-try:
- from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
- TestCfgGenshiGenerator
- HAS_GENSHI = True
-except ImportError:
- TestCfgGenshiGenerator = object
- HAS_GENSHI = False
+from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
+ TestCfgGenshiGenerator
-if can_skip or (HAS_CRYPTO and HAS_GENSHI):
+if can_skip or HAS_CRYPTO:
class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
test_obj = CfgEncryptedGenshiGenerator
@skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
def setUp(self):
pass
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
index 0f369113b..7ceedb7c2 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
@@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier
class TestCfgExternalCommandVerifier(TestCfgVerifier):
test_obj = CfgExternalCommandVerifier
- @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen")
- def test_verify_entry(self, mock_Popen):
- proc = Mock()
- mock_Popen.return_value = proc
- proc.wait.return_value = 0
- proc.communicate.return_value = ("stdout", "stderr")
+ def test_verify_entry(self):
entry = lxml.etree.Element("Path", name="/test.txt")
metadata = Mock()
ecv = self.get_obj()
ecv.cmd = ["/bin/bash", "-x", "foo"]
+ ecv.exc = Mock()
+ ecv.exc.run.return_value = Mock()
+ ecv.exc.run.return_value.success = True
+
ecv.verify_entry(entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
- mock_Popen.reset_mock()
- proc.wait.return_value = 13
+ ecv.exc.reset_mock()
+ ecv.exc.run.return_value.success = False
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
+
+ ecv.exc.reset_mock()
- mock_Popen.reset_mock()
- mock_Popen.side_effect = OSError
+ ecv.exc.reset_mock()
+ ecv.exc.run.side_effect = OSError
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
@patch("os.access")
def test_handle_event(self, mock_access):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
index 2e8b7bfa5..b73670fb7 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
@@ -19,111 +19,97 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_GENSHI:
- class TestCfgGenshiGenerator(TestCfgGenerator):
- test_obj = CfgGenshiGenerator
-
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
- def setUp(self):
- pass
-
- def test_removecomment(self):
- data = [(None, "test", 1),
- (None, "test2", 2)]
- stream = [(genshi.core.COMMENT, "test", 0),
- data[0],
- (genshi.core.COMMENT, "test3", 0),
- data[1]]
- self.assertItemsEqual(list(removecomment(stream)), data)
-
- def test__init(self):
- TestCfgGenerator.test__init(self)
- cgg = self.get_obj()
- self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
-
- def test_get_data(self):
- cgg = self.get_obj()
- cgg._handle_genshi_exception = Mock()
- cgg.template = Mock()
- fltr = Mock()
- cgg.template.generate.return_value = fltr
- stream = Mock()
- fltr.filter.return_value = stream
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
-
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock()
-
- def reset():
- cgg.template.reset_mock()
- cgg._handle_genshi_exception.reset_mock()
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock()
-
- template_vars = dict(
- name=entry.get("name"),
- metadata=metadata,
- path=cgg.name,
- source_path=cgg.name,
- repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value)
-
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- def render(fmt, **kwargs):
- stream.render.side_effect = None
- raise TypeError
- stream.render.side_effect = render
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- self.assertEqual(stream.render.call_args_list,
- [call("text", encoding=cgg.encoding,
- strip_whitespace=False),
- call("text", encoding=cgg.encoding)])
-
- reset()
- stream.render.side_effect = UndefinedError("test")
- self.assertRaises(UndefinedError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- stream.render.side_effect = ValueError
- cgg._handle_genshi_exception.side_effect = ValueError
- self.assertRaises(ValueError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
- self.assertTrue(cgg._handle_genshi_exception.called)
-
- def test_handle_event(self):
- cgg = self.get_obj()
- cgg.loader = Mock()
- event = Mock()
- cgg.handle_event(event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
-
- cgg.loader.reset_mock()
- cgg.loader.load.side_effect = OSError
- self.assertRaises(PluginExecutionError,
- cgg.handle_event, event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
+class TestCfgGenshiGenerator(TestCfgGenerator):
+ test_obj = CfgGenshiGenerator
+
+ def test__init(self):
+ TestCfgGenerator.test__init(self)
+ cgg = self.get_obj()
+ self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
+
+ def test_get_data(self):
+ cgg = self.get_obj()
+ cgg._handle_genshi_exception = Mock()
+ cgg.setup = MagicMock()
+ cgg.template = Mock()
+ fltr = Mock()
+ cgg.template.generate.return_value = fltr
+ stream = Mock()
+ fltr.filter.return_value = stream
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+
+ def reset():
+ cgg.template.reset_mock()
+ cgg._handle_genshi_exception.reset_mock()
+ cgg.setup.reset_mock()
+
+ template_vars = dict(
+ name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name,
+ source_path=cgg.name,
+ repo=cgg.setup.__getitem__.return_value)
+
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ def render(fmt, **kwargs):
+ stream.render.side_effect = None
+ raise TypeError
+ stream.render.side_effect = render
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ self.assertEqual(stream.render.call_args_list,
+ [call("text", encoding=cgg.encoding,
+ strip_whitespace=False),
+ call("text", encoding=cgg.encoding)])
+
+ reset()
+ stream.render.side_effect = UndefinedError("test")
+ self.assertRaises(UndefinedError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ stream.render.side_effect = ValueError
+ cgg._handle_genshi_exception.side_effect = ValueError
+ self.assertRaises(ValueError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+ self.assertTrue(cgg._handle_genshi_exception.called)
+
+ def test_handle_event(self):
+ cgg = self.get_obj()
+ cgg.loader = Mock()
+ event = Mock()
+ cgg.handle_event(event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)
+
+ cgg.loader.reset_mock()
+ cgg.loader.load.side_effect = OSError
+ self.assertRaises(PluginExecutionError,
+ cgg.handle_event, event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
index 839e9c3b8..7e7cb5e3c 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
@@ -27,47 +27,16 @@ class TestCfgInfoXML(TestCfgInfo):
self.assertIsInstance(ci.infoxml, InfoXML)
def test_bind_info_to_entry(self):
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
ci = self.get_obj()
ci.infoxml = Mock()
- ci._set_info = Mock()
-
- self.assertRaises(PluginExecutionError,
- ci.bind_info_to_entry, entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata, dict(),
- entry=entry)
- self.assertFalse(ci._set_info.called)
-
- ci.infoxml.reset_mock()
- ci._set_info.reset_mock()
- mdata_value = Mock()
- def set_mdata(metadata, mdata, entry=None):
- mdata['Info'] = {None: mdata_value}
+ entry = Mock()
+ metadata = Mock()
- ci.infoxml.pnode.Match.side_effect = set_mdata
ci.bind_info_to_entry(entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata,
- dict(Info={None: mdata_value}),
- entry=entry)
- ci._set_info.assert_called_with(entry, mdata_value)
+ ci.infoxml.BindEntry.assert_called_with(entry, metadata)
def test_handle_event(self):
ci = self.get_obj()
ci.infoxml = Mock()
ci.handle_event(Mock)
ci.infoxml.HandleEvent.assert_called_with()
-
- def test__set_info(self):
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info")
- def inner(mock_set_info):
- ci = self.get_obj()
- entry = Mock()
- info = {"foo": "foo",
- "__children__": ["one", "two"]}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.append.call_args_list,
- [call(c) for c in info['__children__']])
-
- inner()
- TestCfgInfo.test__set_info(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
index dc4b11241..48d5cdbfe 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import *
from Bcfg2.Server.Plugin import PluginExecutionError
import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator
try:
- from Bcfg2.Encryption import EVPError
+ from Bcfg2.Server.Encryption import EVPError
HAS_CRYPTO = True
except:
HAS_CRYPTO = False
@@ -44,77 +44,76 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
def test_category(self):
pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
+ pkc.setup = Mock()
+ pkc.setup.cfp = Mock()
+ pkc.setup.cfp.has_section.return_value = False
+ pkc.setup.cfp.has_option.return_value = False
self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
- cfp.reset_mock()
- cfp.has_section.return_value = True
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.has_section.return_value = True
self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(pkc.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.has_option.return_value = True
+ self.assertEqual(pkc.category, pkc.setup.cfp.get.return_value)
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys", "category")
+ pkc.setup.cfp.get.assert_called_with("sshkeys", "category")
@skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.get_passphrases")
def test_passphrase(self, mock_get_passphrases):
pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
+ pkc.setup = Mock()
+ pkc.setup.cfp = Mock()
+ pkc.setup.cfp.has_section.return_value = False
+ pkc.setup.cfp.has_option.return_value = False
self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
- cfp.reset_mock()
- cfp.has_section.return_value = True
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.has_section.return_value = True
self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys",
+ "passphrase")
- cfp.reset_mock()
- cfp.get.return_value = "test"
+ pkc.setup.reset_mock()
+ pkc.setup.cfp.get.return_value = "test"
mock_get_passphrases.return_value = dict(test="foo", test2="bar")
- cfp.has_option.return_value = True
+ pkc.setup.cfp.has_option.return_value = True
self.assertEqual(pkc.passphrase, "foo")
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
- cfp.get.assert_called_with("sshkeys", "passphrase")
- mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ pkc.setup.cfp.has_section.assert_called_with("sshkeys")
+ pkc.setup.cfp.has_option.assert_called_with("sshkeys",
+ "passphrase")
+ pkc.setup.cfp.get.assert_called_with("sshkeys", "passphrase")
+ mock_get_passphrases.assert_called_with()
@patch("shutil.rmtree")
@patch("tempfile.mkdtemp")
- @patch("subprocess.Popen")
- def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree):
+ def test__gen_keypair(self, mock_mkdtemp, mock_rmtree):
pkc = self.get_obj()
+ pkc.cmd = Mock()
pkc.XMLMatch = Mock()
mock_mkdtemp.return_value = datastore
metadata = Mock()
- proc = Mock()
- proc.wait.return_value = 0
- proc.communicate.return_value = MagicMock()
- mock_Popen.return_value = proc
+ exc = Mock()
+ exc.success = True
+ pkc.cmd.run.return_value = exc
spec = lxml.etree.Element("PrivateKey")
pkc.XMLMatch.return_value = spec
def reset():
pkc.XMLMatch.reset_mock()
- mock_Popen.reset_mock()
+ pkc.cmd.reset_mock()
mock_mkdtemp.reset_mock()
mock_rmtree.reset_mock()
@@ -122,10 +121,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "rsa", "-N", ""])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "rsa", "-N", ""])
reset()
lxml.etree.SubElement(spec, "Params", bits="768", type="dsa")
@@ -136,13 +134,12 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "dsa", "-b", "768", "-N", "foo"])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "dsa", "-b", "768", "-N", "foo"])
reset()
- proc.wait.return_value = 1
+ pkc.cmd.run.return_value.success = False
self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata)
mock_rmtree.assert_called_with(datastore)
@@ -281,9 +278,8 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
if HAS_CRYPTO:
@patch(passphrase, "foo")
- @patch("Bcfg2.Encryption.ssl_encrypt")
- @patch("Bcfg2.Encryption.get_algorithm")
- def inner2(mock_get_algorithm, mock_ssl_encrypt):
+ @patch("Bcfg2.Server.Encryption.ssl_encrypt")
+ def inner2(mock_ssl_encrypt):
reset()
mock_ssl_encrypt.return_value = "encryptedprivatekey"
Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True
@@ -302,136 +298,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
"ssh-rsa publickey pubkey.filename\n", group="foo")
pkc.write_data.assert_called_with("encryptedprivatekey",
group="foo", ext=".crypt")
- mock_ssl_encrypt.assert_called_with(
- "privatekey", "foo",
- algorithm=mock_get_algorithm.return_value)
+ mock_ssl_encrypt.assert_called_with("privatekey", "foo")
mock_rmtree.assert_called_with(datastore)
inner2()
-
- def test_Index(self):
- has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False
- TestStructFile.test_Index(self)
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict"
-
- pkc = self.get_obj()
- pkc._decrypt = Mock()
- pkc._decrypt.return_value = 'plaintext'
- pkc.data = '''
-<PrivateKey>
- <Group name="test">
- <Passphrase encrypted="foo">crypted</Passphrase>
- </Group>
- <Group name="test" negate="true">
- <Passphrase>plain</Passphrase>
- </Group>
-</PrivateKey>'''
-
- # test successful decryption
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
- for el in pkc.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pkc._decrypt.return_value)
-
- # test failed decryption, strict
- pkc._decrypt.reset_mock()
- pkc._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pkc.Index)
-
- # test failed decryption, lax
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax"
- pkc._decrypt.reset_mock()
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
- mock_get_algorithm, mock_ssl):
- pkc = self.get_obj()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
index ea3549c1b..07a1b120e 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -159,21 +159,6 @@ class TestCfgInfo(TestCfgBaseFileMatcher):
self.assertRaises(NotImplementedError,
ci.bind_info_to_entry, Mock(), Mock())
- def test__set_info(self):
- ci = self.get_obj()
- entry = Mock()
- entry.attrib = dict()
-
- info = {"foo": "foo",
- "_bar": "bar",
- "bar:baz=quux": "quux",
- "baz__": "baz",
- "__quux": "quux"}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.attrib,
- dict([(k, v) for k, v in info.items()
- if not k.startswith("__")]))
-
class TestCfgVerifier(TestCfgBaseFileMatcher):
test_obj = CfgVerifier
@@ -259,29 +244,26 @@ class TestCfgCreator(TestCfgBaseFileMatcher):
class TestCfgDefaultInfo(TestCfgInfo):
test_obj = CfgDefaultInfo
- def get_obj(self, defaults=None):
- if defaults is None:
- defaults = dict()
- return self.test_obj(defaults)
+ def get_obj(self, *_):
+ return self.test_obj()
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__")
- def test__init(self, mock__init):
- defaults = Mock()
- cdi = self.get_obj(defaults=defaults)
- mock__init.assert_called_with(cdi, '')
- self.assertEqual(defaults, cdi.defaults)
+ def test__init(self):
+ pass
def test_handle_event(self):
# this CfgInfo handler doesn't handle any events -- it's not
# file-driven, but based on the built-in defaults
pass
- def test_bind_info_to_entry(self):
+ @patch("Bcfg2.Server.Plugin.default_path_metadata")
+ def test_bind_info_to_entry(self, mock_default_path_metadata):
cdi = self.get_obj()
- cdi._set_info = Mock()
- entry = Mock()
+ entry = lxml.etree.Element("Test", name="test")
+ mock_default_path_metadata.return_value = \
+ dict(owner="root", mode="0600")
cdi.bind_info_to_entry(entry, Mock())
- cdi._set_info.assert_called_with(entry, cdi.defaults)
+ self.assertItemsEqual(entry.attrib,
+ dict(owner="root", mode="0600", name="test"))
class TestCfgEntrySet(TestEntrySet):
@@ -438,8 +420,6 @@ class TestCfgEntrySet(TestEntrySet):
@patch("Bcfg2.Server.Plugins.Cfg.u_str")
@patch("Bcfg2.Server.Plugins.Cfg.b64encode")
def test_bind_entry(self, mock_b64encode, mock_u_str):
- Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False)
-
mock_u_str.side_effect = lambda x: x
eset = self.get_obj()
@@ -447,6 +427,7 @@ class TestCfgEntrySet(TestEntrySet):
eset._generate_data = Mock()
eset.get_handlers = Mock()
eset._validate_data = Mock()
+ eset.setup = dict(validate=False)
def reset():
mock_b64encode.reset_mock()
@@ -524,7 +505,7 @@ class TestCfgEntrySet(TestEntrySet):
# test successful validation
entry = reset()
- Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True
+ eset.setup['validate'] = True
bound = eset.bind_entry(entry, metadata)
eset.bind_info_to_entry.assert_called_with(entry, metadata)
eset._generate_data.assert_called_with(entry, metadata)
@@ -601,24 +582,24 @@ class TestCfgEntrySet(TestEntrySet):
if entry.specific is not None:
self.assertFalse(entry.specific.matches.called)
- def test_bind_info_to_entry(self):
- default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo")
+ def test_bind_info_to_entry(self, mock_DefaultInfo):
eset = self.get_obj()
eset.get_handlers = Mock()
eset.get_handlers.return_value = []
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock()
metadata = Mock()
def reset():
eset.get_handlers.reset_mock()
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock()
+ mock_DefaultInfo.reset_mock()
return lxml.etree.Element("Path", name="/test.txt")
# test with no info handlers
entry = reset()
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
self.assertEqual(entry.get("type"), "file")
# test with one info handler
@@ -627,7 +608,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = [handler]
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
handler.bind_info_to_entry.assert_called_with(entry, metadata)
self.assertEqual(entry.get("type"), "file")
@@ -637,7 +619,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = handlers
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
# we don't care which handler gets called as long as exactly
# one of them does
called = 0
@@ -648,8 +631,6 @@ class TestCfgEntrySet(TestEntrySet):
self.assertEqual(called, 1)
self.assertEqual(entry.get("type"), "file")
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info
-
def test_create_data(self):
eset = self.get_obj()
eset.best_matching = Mock()
@@ -754,31 +735,39 @@ class TestCfg(TestGroupSpool, TestPullTarget):
def get_obj(self, core=None):
if core is None:
core = Mock()
- core.setup = MagicMock()
return TestGroupSpool.get_obj(self, core=core)
+ @patch("Bcfg2.Options.get_option_parser")
@patch("Bcfg2.Server.Plugin.GroupSpool.__init__")
@patch("Bcfg2.Server.Plugin.PullTarget.__init__")
- def test__init(self, mock_pulltarget_init, mock_groupspool_init):
+ def test__init(self, mock_pulltarget_init, mock_groupspool_init,
+ mock_get_option_parser):
+ setup = MagicMock()
+ setup.__contains__.return_value = False
+ mock_get_option_parser.return_value = setup
+
+ def reset():
+ core.reset_mock()
+ setup.reset_mock()
+ mock_pulltarget_init.reset_mock()
+ mock_groupspool_init.reset_mock()
+
core = Mock()
- core.setup = MagicMock()
cfg = self.test_obj(core, datastore)
mock_pulltarget_init.assert_called_with(cfg)
mock_groupspool_init.assert_called_with(cfg, core, datastore)
- core.setup.add_option.assert_called_with("validate",
- Bcfg2.Options.CFG_VALIDATION)
- core.setup.reparse.assert_called_with()
-
- core.reset_mock()
- core.setup.reset_mock()
- mock_pulltarget_init.reset_mock()
- mock_groupspool_init.reset_mock()
- core.setup.__contains__.return_value = True
+ setup.add_option.assert_called_with(
+ "validate",
+ Bcfg2.Options.CFG_VALIDATION)
+ mock_get_option_parser.return_value.reparse.assert_called_with()
+
+ reset()
+ setup.__contains__.return_value = True
cfg = self.test_obj(core, datastore)
mock_pulltarget_init.assert_called_with(cfg)
mock_groupspool_init.assert_called_with(cfg, core, datastore)
- self.assertFalse(core.setup.add_option.called)
- self.assertFalse(core.setup.reparse.called)
+ self.assertFalse(setup.add_option.called)
+ self.assertFalse(setup.reparse.called)
def test_has_generator(self):
cfg = self.get_obj()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py
new file mode 100644
index 000000000..537ceb4ff
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py
@@ -0,0 +1,60 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Decisions import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestStructFile, TestPlugin, TestDecision
+
+
+class TestDecisionFile(TestStructFile):
+ test_obj = DecisionFile
+
+ def test_get_decisions(self):
+ df = self.get_obj()
+ metadata = Mock()
+
+ df.xdata = None
+ self.assertItemsEqual(df.get_decisions(metadata), [])
+
+ df.xdata = lxml.etree.Element("Decisions")
+ df.XMLMatch = Mock()
+ df.XMLMatch.return_value = lxml.etree.Element("Decisions")
+ lxml.etree.SubElement(df.XMLMatch.return_value,
+ "Decision", type="Service", name='*')
+ lxml.etree.SubElement(df.XMLMatch.return_value,
+ "Decision", type="Path",
+ name='/etc/apt/apt.conf')
+
+ self.assertItemsEqual(df.get_decisions(metadata),
+ [("Service", '*'),
+ ("Path", '/etc/apt/apt.conf')])
+ df.XMLMatch.assert_called_with(metadata)
+
+
+class TestDecisions(TestPlugin, TestDecision):
+ test_obj = Decisions
+
+ def test_GetDecisions(self):
+ d = self.get_obj()
+ d.whitelist = Mock()
+ d.blacklist = Mock()
+ metadata = Mock()
+
+ self.assertEqual(d.GetDecisions(metadata, "whitelist"),
+ d.whitelist.get_decision.return_value)
+ d.whitelist.get_decision.assert_called_with(metadata)
+
+ self.assertEqual(d.GetDecisions(metadata, "blacklist"),
+ d.blacklist.get_decision.return_value)
+ d.blacklist.get_decision.assert_called_with(metadata)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
index 32c893548..90f592eb2 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
@@ -215,9 +215,9 @@ class TestXMLMetadataConfig(TestXMLFileBacked):
return XMLMetadataConfig(self.metadata, watch_clients, basefile)
return inner()
+ @patch("Bcfg2.Server.FileMonitor.get_fam", Mock())
def test__init(self):
xmc = self.get_obj()
- self.assertEqual(self.metadata.core.fam, xmc.fam)
self.assertFalse(xmc.fam.AddMonitor.called)
def test_xdata(self):
@@ -262,20 +262,21 @@ class TestXMLMetadataConfig(TestXMLFileBacked):
self.assertEqual(config.base_xdata, "<test/>")
def test_add_monitor(self):
- core = MagicMock()
- config = self.get_obj(core=core)
+ config = self.get_obj()
+ config.fam = Mock()
fname = "test.xml"
fpath = os.path.join(self.metadata.data, fname)
config.extras = []
config.add_monitor(fpath)
- self.assertFalse(core.fam.AddMonitor.called)
+ self.assertFalse(config.fam.AddMonitor.called)
self.assertEqual(config.extras, [fpath])
- config = self.get_obj(core=core, watch_clients=True)
+ config = self.get_obj(watch_clients=True)
+ config.fam = Mock()
config.add_monitor(fpath)
- core.fam.AddMonitor.assert_called_with(fpath, config.metadata)
+ config.fam.AddMonitor.assert_called_with(fpath, config.metadata)
self.assertItemsEqual(config.extras, [fpath])
def test_Index(self):
@@ -499,7 +500,8 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
client_name = "%s%s" % (prefix, i)
return client_name
- def test__init(self):
+ @patch("Bcfg2.Server.FileMonitor.get_fam")
+ def test__init(self, mock_get_fam):
# test with watch_clients=False
core = MagicMock()
metadata = self.get_obj(core=core)
@@ -512,18 +514,19 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
self.assertEqual(metadata.states, dict())
# test with watch_clients=True
- core.fam = MagicMock()
metadata = self.get_obj(core=core, watch_clients=True)
self.assertEqual(len(metadata.states), 2)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
-
- core.fam.reset_mock()
- core.fam.AddMonitor = Mock(side_effect=IOError)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "clients.xml"),
+ metadata)
+
+ mock_get_fam.reset_mock()
+ fam = Mock()
+ fam.AddMonitor = Mock(side_effect=IOError)
+ mock_get_fam.return_value = fam
self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
self.get_obj, core=core, watch_clients=True)
@@ -586,6 +589,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_group(self):
metadata = self.get_obj()
metadata.groups_xml.write = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -618,6 +622,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_update_group(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -635,6 +640,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_remove_group(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -650,6 +656,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_bundle(self):
metadata = self.get_obj()
metadata.groups_xml.write = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -673,6 +680,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_remove_bundle(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -688,6 +696,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_client(self):
metadata = self.get_obj()
metadata.clients_xml.write = Mock()
+ metadata.clients_xml.load_xml = Mock()
metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree()
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
@@ -722,6 +731,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_update_client(self):
metadata = self.get_obj()
metadata.clients_xml.write_xml = Mock()
+ metadata.clients_xml.load_xml = Mock()
metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree())
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
@@ -1272,25 +1282,24 @@ class TestMetadataBase(TestMetadata):
return client_name
@patch('os.path.exists')
- def test__init(self, mock_exists):
- core = MagicMock()
- core.fam = Mock()
+ @patch('Bcfg2.Server.FileMonitor.get_fam')
+ def test__init(self, mock_get_fam, mock_exists):
mock_exists.return_value = False
- metadata = self.get_obj(core=core, watch_clients=True)
+ metadata = self.get_obj(watch_clients=True)
self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
- core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
+ mock_get_fam.return_value.AddMonitor.assert_called_with(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
mock_exists.return_value = True
- core.fam.reset_mock()
- metadata = self.get_obj(core=core, watch_clients=True)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
+ mock_get_fam.reset_mock()
+ metadata = self.get_obj(watch_clients=True)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "clients.xml"),
+ metadata)
def test_add_group(self):
pass
@@ -1533,7 +1542,8 @@ class TestMetadata_ClientsXML(TestMetadataBase):
def load_clients_data(self, metadata=None, xdata=None):
if metadata is None:
metadata = self.get_obj()
- metadata.core.fam = Mock()
+ fam = Bcfg2.Server.FileMonitor._FAM
+ Bcfg2.Server.FileMonitor._FAM = MagicMock()
@patchIf(not isinstance(lxml.etree.Element, Mock),
"lxml.etree.Element", Mock())
def inner():
@@ -1541,5 +1551,7 @@ class TestMetadata_ClientsXML(TestMetadataBase):
inner()
metadata = TestMetadata.load_clients_data(self, metadata=metadata,
xdata=xdata)
- return TestMetadataBase.load_clients_data(self, metadata=metadata,
- xdata=xdata)
+ rv = TestMetadataBase.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+ Bcfg2.Server.FileMonitor._FAM = fam
+ return rv
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
index a87628eaf..f44bc338c 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -134,22 +134,20 @@ class TestProbeSet(TestEntrySet):
ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"]
bogus_names = ["test.py"]
- def get_obj(self, path=datastore, fam=None, encoding=None,
+ def get_obj(self, path=datastore, encoding=None,
plugin_name="Probes", basename=None):
# get_obj() accepts the basename argument, accepted by the
# parent get_obj() method, and just throws it away, since
# ProbeSet uses a regex for the "basename"
- if fam is None:
- fam = Mock()
- rv = self.test_obj(path, fam, encoding, plugin_name)
+ rv = self.test_obj(path, encoding, plugin_name)
rv.entry_type = MagicMock()
return rv
- def test__init(self):
- fam = Mock()
- ps = self.get_obj(fam=fam)
+ @patch("Bcfg2.Server.FileMonitor.get_fam")
+ def test__init(self, mock_get_fam):
+ ps = self.get_obj()
self.assertEqual(ps.plugin_name, "Probes")
- fam.AddMonitor.assert_called_with(datastore, ps)
+ mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps)
TestEntrySet.test__init(self)
def test_HandleEvent(self):
@@ -300,9 +298,6 @@ text
def test__init(self):
mock_load_data = Mock()
probes = self.get_probes_object(load_data=mock_load_data)
- probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore,
- probes.name),
- probes.probes)
mock_load_data.assert_any_call()
self.assertEqual(probes.probedata, ClientProbeDataSet())
self.assertEqual(probes.cgroups, dict())
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
index 896f5861e..92dc85fb1 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
@@ -19,12 +19,6 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \
TestPlugin, TestDirectoryBacked
try:
- from Bcfg2.Encryption import EVPError
- HAS_CRYPTO = True
-except:
- HAS_CRYPTO = False
-
-try:
import json
JSON = "json"
except ImportError:
@@ -41,10 +35,10 @@ class TestPropertyFile(Bcfg2TestCase):
return self.test_obj(path)
def test_write(self):
- Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
pf = self.get_obj()
pf.validate_data = Mock()
pf._write = Mock()
+ pf.setup = Mock()
xstr = u("<Properties/>\n")
pf.xdata = lxml.etree.XML(xstr)
@@ -52,20 +46,20 @@ class TestPropertyFile(Bcfg2TestCase):
def reset():
pf.validate_data.reset_mock()
pf._write.reset_mock()
- Bcfg2.Server.Plugins.Properties.SETUP.reset_mock()
+ pf.setup.reset_mock()
# test writes disabled
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False
+ pf.setup.cfp.getboolean.return_value = False
self.assertRaises(PluginExecutionError, pf.write)
self.assertFalse(pf.validate_data.called)
self.assertFalse(pf._write.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties",
+ pf.setup.cfp.getboolean.assert_called_with("properties",
"writes_enabled",
default=True)
# test successful write
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True
+ pf.setup.cfp.getboolean.return_value = True
self.assertEqual(pf.write(), pf._write.return_value)
pf.validate_data.assert_called_with()
pf._write.assert_called_with()
@@ -243,178 +237,61 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile):
mock_exists.assert_called_with(schemafile)
mock_XMLSchema.assert_called_with(file=schemafile)
- def test_Index(self):
- TestStructFile.test_Index(self)
-
- pf = self.get_obj()
- pf.xdata = lxml.etree.Element("Properties")
- lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo")
- pf.data = lxml.etree.tostring(pf.xdata)
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- pf = self.get_obj()
- pf._decrypt = Mock()
- pf._decrypt.return_value = 'plaintext'
- pf.data = '''
-<Properties decrypt="strict">
- <Crypted encrypted="foo">
- crypted
- <Plain foo="bar">plain</Plain>
- </Crypted>
- <Crypted encrypted="bar">crypted</Crypted>
- <Plain bar="baz">plain</Plain>
- <Plain>
- <Crypted encrypted="foo">crypted</Crypted>
- </Plain>
-</Properties>'''
-
- # test successful decryption
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
- for el in pf.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pf._decrypt.return_value)
-
- # test failed decryption, strict
- pf._decrypt.reset_mock()
- pf._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pf.Index)
-
- # test failed decryption, lax
- pf.data = pf.data.replace("strict", "lax")
- pf._decrypt.reset_mock()
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
- mock_get_algorithm, mock_ssl):
- pf = self.get_obj()
- Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pf._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pf._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
@patch("copy.copy")
def test_get_additional_data(self, mock_copy):
- Bcfg2.Server.Plugins.Properties.SETUP = Mock()
pf = self.get_obj()
+ pf.setup = Mock()
pf.XMLMatch = Mock()
metadata = Mock()
def reset():
mock_copy.reset_mock()
pf.XMLMatch.reset_mock()
- Bcfg2.Server.Plugins.Properties.SETUP.reset_mock()
+ pf.setup.reset_mock()
pf.xdata = lxml.etree.Element("Properties", automatch="true")
for automatch in [True, False]:
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch
+ pf.setup.cfp.getboolean.return_value = automatch
self.assertEqual(pf.get_additional_data(metadata),
pf.XMLMatch.return_value)
pf.XMLMatch.assert_called_with(metadata)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
+ pf.setup.cfp.getboolean.assert_called_with("properties",
+ "automatch",
+ default=False)
self.assertFalse(mock_copy.called)
pf.xdata = lxml.etree.Element("Properties", automatch="false")
for automatch in [True, False]:
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch
+ pf.setup.cfp.getboolean.return_value = automatch
self.assertEqual(pf.get_additional_data(metadata),
mock_copy.return_value)
mock_copy.assert_called_with(pf)
self.assertFalse(pf.XMLMatch.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
+ pf.setup.cfp.getboolean.assert_called_with("properties",
+ "automatch",
+ default=False)
pf.xdata = lxml.etree.Element("Properties")
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False
+ pf.setup.cfp.getboolean.return_value = False
self.assertEqual(pf.get_additional_data(metadata),
mock_copy.return_value)
mock_copy.assert_called_with(pf)
self.assertFalse(pf.XMLMatch.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
+ pf.setup.cfp.getboolean.assert_called_with("properties",
+ "automatch",
+ default=False)
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True
+ pf.setup.cfp.getboolean.return_value = True
self.assertEqual(pf.get_additional_data(metadata),
pf.XMLMatch.return_value)
pf.XMLMatch.assert_called_with(metadata)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
+ pf.setup.cfp.getboolean.assert_called_with("properties",
+ "automatch",
+ default=False)
self.assertFalse(mock_copy.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
index f018b45dc..7083fff06 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
@@ -45,81 +45,37 @@ class TestRules(TestPrioDir):
entry = lxml.etree.Element("Package", name="foo")
self.assertFalse(r.HandlesEntry(entry, metadata))
- def test_BindEntry(self, method="BindEntry"):
- r = self.get_obj()
- r.get_attrs = Mock()
- r.get_attrs.return_value = dict(overwrite="new", add="add",
- text="text")
- entry = lxml.etree.Element("Test", overwrite="old", keep="keep")
- metadata = Mock()
-
- getattr(r, method)(entry, metadata)
- r.get_attrs.assert_called_with(entry, metadata)
- self.assertItemsEqual(entry.attrib,
- dict(overwrite="old", add="add", keep="keep",
- text="text"))
-
- def test_HandleEntry(self):
- self.test_BindEntry(method="HandleEntry")
-
@patch("Bcfg2.Server.Plugin.PrioDir._matches")
def test__matches(self, mock_matches):
- """ test _matches() behavior regardless of state of _regex_enabled """
r = self.get_obj()
metadata = Mock()
+ # test parent _matches() returning True
entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = []
+ candidate = lxml.etree.Element("Path", name="/etc/bar.conf")
mock_matches.return_value = True
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
+ self.assertTrue(r._matches(entry, metadata, candidate))
+ mock_matches.assert_called_with(r, entry, metadata, candidate)
- # test special Path cases -- adding and removing trailing slash
+ # test all conditions returning False
mock_matches.reset_mock()
mock_matches.return_value = False
- rules = ["/etc/foo/", "/etc/bar"]
- entry = lxml.etree.Element("Path", name="/etc/foo")
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
+ self.assertFalse(r._matches(entry, metadata, candidate))
+ mock_matches.assert_called_with(r, entry, metadata, candidate)
+ # test special Path cases -- adding and removing trailing slash
mock_matches.reset_mock()
- entry = lxml.etree.Element("Path", name="/etc/bar/")
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches_regex_disabled(self, mock_matches):
- """ test failure to match with regex disabled """
- r = self.get_obj()
- self.set_regex_enabled(r, False)
- metadata = Mock()
- mock_matches.return_value = False
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = []
- self.assertFalse(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches_regex_enabled(self, mock_matches):
- """ test match with regex enabled """
- r = self.get_obj()
- self.set_regex_enabled(r, True)
- metadata = Mock()
- mock_matches.return_value = False
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = ["/etc/.*\.conf", "/etc/bar"]
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
- self.assertIn("/etc/.*\.conf", r._regex_cache.keys())
-
- def set_regex_enabled(self, rules_obj, state):
- """ set the state of regex_enabled for this implementation of
- Rules """
- if not isinstance(rules_obj.core.setup, MagicMock):
- rules_obj.core.setup = MagicMock()
- rules_obj.core.setup.cfp.getboolean.return_value = state
+ withslash = lxml.etree.Element("Path", name="/etc/foo")
+ withoutslash = lxml.etree.Element("Path", name="/etc/foo/")
+ self.assertTrue(r._matches(withslash, metadata, withoutslash))
+ self.assertTrue(r._matches(withoutslash, metadata, withslash))
+
+ if r._regex_enabled:
+ mock_matches.reset_mock()
+ candidate = lxml.etree.Element("Path", name="/etc/.*\.conf")
+ self.assertTrue(r._matches(entry, metadata, candidate))
+ mock_matches.assert_called_with(r, entry, metadata, candidate)
+ self.assertIn("/etc/.*\.conf", r._regex_cache.keys())
def test__regex_enabled(self):
r = self.get_obj()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
index bf9a3ced3..128d6cae5 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
@@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase):
def get_obj(self, path=None):
if path is None:
path = self.path
- return self.test_obj(path, fam=Mock())
+ return self.test_obj(path)
def test__init(self):
hm = self.get_obj()