summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py25
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py49
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py23
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py83
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py11
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py162
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py116
7 files changed, 224 insertions, 245 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
index e5cef8fa2..f41ae8a46 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
@@ -27,12 +27,12 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
TestCfgGenerator.setUp(self)
TestStructFile.setUp(self)
- def get_obj(self, name=None, core=None, fam=None):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.get_cfg")
+ def get_obj(self, mock_get_cfg, name=None, core=None, fam=None):
if name is None:
name = self.path
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG = Mock()
if core is not None:
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG.core = core
+ mock_get_cfg.return_value.core = core
return self.test_obj(name)
@patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
@@ -111,17 +111,18 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
reset()
host = "baz.example.com"
spec = lxml.etree.Element("AuthorizedKeys")
- lxml.etree.SubElement(
- lxml.etree.SubElement(spec,
- "Allow",
- attrib={"from": pubkey, "host": host}),
- "Params", foo="foo", bar="bar=bar")
+ allow = lxml.etree.SubElement(spec, "Allow",
+ attrib={"from": pubkey, "host": host})
+ lxml.etree.SubElement(allow, "Option", name="foo", value="foo")
+ lxml.etree.SubElement(allow, "Option", name="bar")
+ lxml.etree.SubElement(allow, "Option", name="baz", value="baz=baz")
akg.XMLMatch.return_value = spec
params, actual_host, actual_pubkey = akg.get_data(entry,
metadata).split()
self.assertEqual(actual_host, host)
self.assertEqual(actual_pubkey, pubkey)
- self.assertItemsEqual(params.split(","), ["foo=foo", "bar=bar=bar"])
+ self.assertItemsEqual(params.split(","), ["foo=foo", "bar",
+ "baz=baz=baz"])
akg.XMLMatch.assert_called_with(metadata)
akg.core.build_metadata.assert_called_with(host)
self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey)
@@ -131,10 +132,10 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
spec = lxml.etree.Element("AuthorizedKeys")
text = lxml.etree.SubElement(spec, "Allow")
text.text = "ssh-rsa publickey /foo/bar\n"
- lxml.etree.SubElement(text, "Params", foo="foo")
+ lxml.etree.SubElement(text, "Option", name="foo")
akg.XMLMatch.return_value = spec
self.assertEqual(akg.get_data(entry, metadata),
- "foo=foo %s" % text.text.strip())
+ "foo %s" % text.text.strip())
akg.XMLMatch.assert_called_with(metadata)
self.assertFalse(akg.core.build_metadata.called)
self.assertFalse(akg.core.Bind.called)
@@ -143,7 +144,7 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey})
akg.XMLMatch.return_value = spec
self.assertItemsEqual(akg.get_data(entry, metadata).splitlines(),
- ["foo=foo %s" % text.text.strip(),
+ ["foo %s" % text.text.strip(),
"profile %s" % pubkey])
akg.XMLMatch.assert_called_with(metadata)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
index e1ffa7272..2285fb458 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
@@ -17,31 +17,30 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if HAS_CHEETAH or can_skip:
- class TestCfgCheetahGenerator(TestCfgGenerator):
- test_obj = CfgCheetahGenerator
+class TestCfgCheetahGenerator(TestCfgGenerator):
+ test_obj = CfgCheetahGenerator
- @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
- def setUp(self):
- TestCfgGenerator.setUp(self)
- set_setup_default("repository", datastore)
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
+ set_setup_default("repository", datastore)
- @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template")
- def test_get_data(self, mock_Template):
- ccg = self.get_obj()
- ccg.data = "data"
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template")
+ def test_get_data(self, mock_Template):
+ ccg = self.get_obj()
+ ccg.data = "data"
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
- self.assertEqual(ccg.get_data(entry, metadata),
- mock_Template.return_value.respond.return_value)
- mock_Template.assert_called_with(
- "data".decode(Bcfg2.Options.setup.encoding),
- compilerSettings=ccg.settings)
- tmpl = mock_Template.return_value
- tmpl.respond.assert_called_with()
- self.assertEqual(tmpl.metadata, metadata)
- self.assertEqual(tmpl.name, entry.get("name"))
- self.assertEqual(tmpl.path, entry.get("name"))
- self.assertEqual(tmpl.source_path, ccg.name)
- self.assertEqual(tmpl.repo, datastore)
+ self.assertEqual(ccg.get_data(entry, metadata),
+ mock_Template.return_value.respond.return_value)
+ mock_Template.assert_called_with(
+ "data".decode(Bcfg2.Options.setup.encoding),
+ compilerSettings=ccg.settings)
+ tmpl = mock_Template.return_value
+ tmpl.respond.assert_called_with()
+ self.assertEqual(tmpl.metadata, metadata)
+ self.assertEqual(tmpl.name, entry.get("name"))
+ self.assertEqual(tmpl.path, entry.get("name"))
+ self.assertEqual(tmpl.source_path, ccg.name)
+ self.assertEqual(tmpl.repo, datastore)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
index 46062569d..4c987551b 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
@@ -30,18 +30,17 @@ except ImportError:
HAS_CRYPTO = False
-if can_skip or (HAS_CRYPTO and HAS_CHEETAH):
- class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator,
- TestCfgEncryptedGenerator):
- test_obj = CfgEncryptedCheetahGenerator
+class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator,
+ TestCfgEncryptedGenerator):
+ test_obj = CfgEncryptedCheetahGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ pass
- def test_handle_event(self):
- TestCfgEncryptedGenerator.test_handle_event(self)
+ def test_handle_event(self):
+ TestCfgEncryptedGenerator.test_handle_event(self)
- def test_get_data(self):
- TestCfgCheetahGenerator.test_get_data(self)
+ def test_get_data(self):
+ TestCfgCheetahGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
index 5409cf863..873ebd837 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
@@ -18,54 +18,53 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_CRYPTO:
- class TestCfgEncryptedGenerator(TestCfgGenerator):
- test_obj = CfgEncryptedGenerator
+class TestCfgEncryptedGenerator(TestCfgGenerator):
+ test_obj = CfgEncryptedGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- def setUp(self):
- TestCfgGenerator.setUp(self)
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
- @patchIf(HAS_CRYPTO,
- "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
- def test_handle_event(self, mock_decrypt):
- @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
- def inner(mock_handle_event):
- def reset():
- mock_decrypt.reset_mock()
- mock_handle_event.reset_mock()
+ @patchIf(HAS_CRYPTO,
+ "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
+ def test_handle_event(self, mock_decrypt):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
+ def inner(mock_handle_event):
+ def reset():
+ mock_decrypt.reset_mock()
+ mock_handle_event.reset_mock()
- def get_event_data(obj, event):
- obj.data = "encrypted"
+ def get_event_data(obj, event):
+ obj.data = "encrypted"
- mock_handle_event.side_effect = get_event_data
- mock_decrypt.side_effect = lambda d, **kw: "plaintext"
- event = Mock()
- ceg = self.get_obj()
- ceg.handle_event(event)
- mock_handle_event.assert_called_with(ceg, event)
- mock_decrypt.assert_called_with("encrypted")
- self.assertEqual(ceg.data, "plaintext")
+ mock_handle_event.side_effect = get_event_data
+ mock_decrypt.side_effect = lambda d, **kw: "plaintext"
+ event = Mock()
+ ceg = self.get_obj()
+ ceg.handle_event(event)
+ mock_handle_event.assert_called_with(ceg, event)
+ mock_decrypt.assert_called_with("encrypted")
+ self.assertEqual(ceg.data, "plaintext")
- reset()
- mock_decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError,
- ceg.handle_event, event)
- inner()
+ reset()
+ mock_decrypt.side_effect = EVPError
+ self.assertRaises(PluginExecutionError,
+ ceg.handle_event, event)
+ inner()
- # to perform the tests from the parent test object, we
- # make bruteforce_decrypt just return whatever data was
- # given to it
- mock_decrypt.side_effect = lambda d, **kw: d
- TestCfgGenerator.test_handle_event(self)
+ # to perform the tests from the parent test object, we
+ # make bruteforce_decrypt just return whatever data was
+ # given to it
+ mock_decrypt.side_effect = lambda d, **kw: d
+ TestCfgGenerator.test_handle_event(self)
- def test_get_data(self):
- ceg = self.get_obj()
- ceg.data = None
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
+ def test_get_data(self):
+ ceg = self.get_obj()
+ ceg.data = None
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
- self.assertRaises(PluginExecutionError,
- ceg.get_data, entry, metadata)
+ self.assertRaises(PluginExecutionError,
+ ceg.get_data, entry, metadata)
- TestCfgGenerator.test_get_data(self)
+ TestCfgGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
index 25d2fb83b..0b74e4a60 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
@@ -18,10 +18,9 @@ from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
TestCfgGenshiGenerator
-if can_skip or HAS_CRYPTO:
- class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
- test_obj = CfgEncryptedGenshiGenerator
+class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
+ test_obj = CfgEncryptedGenshiGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- def setUp(self):
- TestCfgGenshiGenerator.setUp(self)
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenshiGenerator.setUp(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
index c4961db1c..2967a23b6 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -22,25 +22,15 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
-from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator
-from TestServer.TestPlugin.Testhelpers import TestStructFile
+from TestServer.TestPlugins.TestCfg.Test_init import TestXMLCfgCreator
-class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
+class TestCfgPrivateKeyCreator(TestXMLCfgCreator):
test_obj = CfgPrivateKeyCreator
should_monitor = False
def get_obj(self, name=None, fam=None):
- return TestCfgCreator.get_obj(self, name=name)
-
- @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
- @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
- def test_handle_event(self, mock_HandleEvent, mock_handle_event):
- pkc = self.get_obj()
- evt = Mock()
- pkc.handle_event(evt)
- mock_HandleEvent.assert_called_with(pkc, evt)
- mock_handle_event.assert_called_with(pkc, evt)
+ return TestXMLCfgCreator.get_obj(self, name=name)
@patch("shutil.rmtree")
@patch("tempfile.mkdtemp")
@@ -90,57 +80,6 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata)
mock_rmtree.assert_called_with(datastore)
- def test_get_specificity(self):
- pkc = self.get_obj()
- pkc.XMLMatch = Mock()
-
- metadata = Mock()
-
- def reset():
- pkc.XMLMatch.reset_mock()
- metadata.group_in_category.reset_mock()
-
- Bcfg2.Options.setup.sshkeys_category = None
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
-
- Bcfg2.Options.setup.sshkeys_category = "foo"
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=50))
- metadata.group_in_category.assert_called_with("foo")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- perhost="true")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- category="bar")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=50))
- metadata.group_in_category.assert_called_with("bar")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- prio="10")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=10))
- metadata.group_in_category.assert_called_with("foo")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- metadata.group_in_category.return_value = ''
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
- metadata.group_in_category.assert_called_with("foo")
-
@patch("shutil.rmtree")
@patch("%s.open" % builtins)
def test_create_data(self, mock_open, mock_rmtree):
@@ -153,7 +92,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
# the get_specificity() return value is being used
# appropriately, we put some dummy data in it and test for
# that data
- pkc.get_specificity.side_effect = lambda m, s: dict(group="foo")
+ pkc.get_specificity.side_effect = lambda m: dict(group="foo")
pkc._gen_keypair = Mock()
privkey = os.path.join(datastore, "privkey")
pkc._gen_keypair.return_value = privkey
@@ -179,67 +118,32 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
mock_open.return_value.read.side_effect = open_read_rv
reset()
- passphrase = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.passphrase"
-
- @patch(passphrase, None)
- def inner():
- self.assertEqual(pkc.create_data(entry, metadata), "privatekey")
- pkc.XMLMatch.assert_called_with(metadata)
- pkc.get_specificity.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- pkc._gen_keypair.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- self.assertItemsEqual(mock_open.call_args_list,
- [call(privkey + ".pub"), call(privkey)])
- pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
- pkc.pubkey_creator.write_data.assert_called_with(
- "ssh-rsa publickey pubkey.filename\n", group="foo")
- pkc.write_data.assert_called_with("privatekey", group="foo")
- mock_rmtree.assert_called_with(datastore)
-
- reset()
- self.assertEqual(pkc.create_data(entry, metadata, return_pair=True),
- ("ssh-rsa publickey pubkey.filename\n",
- "privatekey"))
- pkc.XMLMatch.assert_called_with(metadata)
- pkc.get_specificity.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- pkc._gen_keypair.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- self.assertItemsEqual(mock_open.call_args_list,
- [call(privkey + ".pub"), call(privkey)])
- pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
- pkc.pubkey_creator.write_data.assert_called_with(
- "ssh-rsa publickey pubkey.filename\n",
- group="foo")
- pkc.write_data.assert_called_with("privatekey", group="foo")
- mock_rmtree.assert_called_with(datastore)
-
- inner()
-
- if HAS_CRYPTO:
- @patch(passphrase, "foo")
- @patch("Bcfg2.Server.Encryption.ssl_encrypt")
- def inner2(mock_ssl_encrypt):
- reset()
- mock_ssl_encrypt.return_value = "encryptedprivatekey"
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True
- self.assertEqual(pkc.create_data(entry, metadata),
- "encryptedprivatekey")
- pkc.XMLMatch.assert_called_with(metadata)
- pkc.get_specificity.assert_called_with(
- metadata,
- pkc.XMLMatch.return_value)
- pkc._gen_keypair.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- self.assertItemsEqual(mock_open.call_args_list,
- [call(privkey + ".pub"), call(privkey)])
- pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
- pkc.pubkey_creator.write_data.assert_called_with(
- "ssh-rsa publickey pubkey.filename\n", group="foo")
- pkc.write_data.assert_called_with("encryptedprivatekey",
- group="foo", ext=".crypt")
- mock_ssl_encrypt.assert_called_with("privatekey", "foo")
- mock_rmtree.assert_called_with(datastore)
-
- inner2()
+ self.assertEqual(pkc.create_data(entry, metadata), "privatekey")
+ pkc.XMLMatch.assert_called_with(metadata)
+ pkc.get_specificity.assert_called_with(metadata)
+ pkc._gen_keypair.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ self.assertItemsEqual(mock_open.call_args_list,
+ [call(privkey + ".pub"), call(privkey)])
+ pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
+ pkc.pubkey_creator.write_data.assert_called_with(
+ "ssh-rsa publickey pubkey.filename\n", group="foo")
+ pkc.write_data.assert_called_with("privatekey", group="foo")
+ mock_rmtree.assert_called_with(datastore)
+
+ reset()
+ self.assertEqual(pkc.create_data(entry, metadata, return_pair=True),
+ ("ssh-rsa publickey pubkey.filename\n",
+ "privatekey"))
+ pkc.XMLMatch.assert_called_with(metadata)
+ pkc.get_specificity.assert_called_with(metadata)
+ pkc._gen_keypair.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ self.assertItemsEqual(mock_open.call_args_list,
+ [call(privkey + ".pub"), call(privkey)])
+ pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
+ pkc.pubkey_creator.write_data.assert_called_with(
+ "ssh-rsa publickey pubkey.filename\n",
+ group="foo")
+ pkc.write_data.assert_called_with("privatekey", group="foo")
+ mock_rmtree.assert_called_with(datastore)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
index 72be50299..307461918 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -3,7 +3,7 @@ import sys
import errno
import lxml.etree
import Bcfg2.Options
-from Bcfg2.Compat import walk_packages
+from Bcfg2.Compat import walk_packages, ConfigParser
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Cfg import *
from Bcfg2.Server.Plugin import PluginExecutionError, Specificity
@@ -19,7 +19,7 @@ while path != "/":
path = os.path.dirname(path)
from common import *
from TestPlugin import TestSpecificData, TestEntrySet, TestGroupSpool, \
- TestPullTarget
+ TestPullTarget, TestStructFile
class TestCfgBaseFileMatcher(TestSpecificData):
@@ -172,10 +172,12 @@ class TestCfgVerifier(TestCfgBaseFileMatcher):
class TestCfgCreator(TestCfgBaseFileMatcher):
test_obj = CfgCreator
path = "/foo/bar/test.txt"
+ should_monitor = False
def setUp(self):
TestCfgBaseFileMatcher.setUp(self)
set_setup_default("filemonitor", MagicMock())
+ set_setup_default("cfg_passphrase", None)
def get_obj(self, name=None):
if name is None:
@@ -245,6 +247,75 @@ class TestCfgCreator(TestCfgBaseFileMatcher):
self.assertRaises(CfgCreationError, cc.write_data, data)
+class TestXMLCfgCreator(TestCfgCreator, TestStructFile):
+ test_obj = XMLCfgCreator
+
+ def setUp(self):
+ TestCfgCreator.setUp(self)
+ TestStructFile.setUp(self)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
+ @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
+ def test_handle_event(self, mock_HandleEvent, mock_handle_event):
+ cc = self.get_obj()
+ evt = Mock()
+ cc.handle_event(evt)
+ mock_HandleEvent.assert_called_with(cc, evt)
+ mock_handle_event.assert_called_with(cc, evt)
+
+ def test_get_specificity(self):
+ cc = self.get_obj()
+ metadata = Mock()
+
+ def reset():
+ metadata.group_in_category.reset_mock()
+
+ category = "%s.%s.category" % (self.test_obj.__module__,
+ self.test_obj.__name__)
+ @patch(category, None)
+ def inner():
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ inner()
+
+ @patch(category, "foo")
+ def inner2():
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", perhost="true")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", category="bar")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("bar")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", prio="10")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=10))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ metadata.group_in_category.return_value = ''
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ metadata.group_in_category.assert_called_with("foo")
+
+ inner2()
+
+
class TestCfgDefaultInfo(TestCfgInfo):
test_obj = CfgDefaultInfo
@@ -276,6 +347,7 @@ class TestCfgEntrySet(TestEntrySet):
def setUp(self):
TestEntrySet.setUp(self)
set_setup_default("cfg_validation", False)
+ set_setup_default("cfg_handlers", [])
def test__init(self):
pass
@@ -283,14 +355,14 @@ class TestCfgEntrySet(TestEntrySet):
def test_handle_event(self):
eset = self.get_obj()
eset.entry_init = Mock()
- eset._handlers = [Mock(), Mock(), Mock()]
- for hdlr in eset._handlers:
+ Bcfg2.Options.setup.cfg_handlers = [Mock(), Mock(), Mock()]
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.__name__ = "handler"
eset.entries = dict()
def reset():
eset.entry_init.reset_mock()
- for hdlr in eset._handlers:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.reset_mock()
# test that a bogus deleted event is discarded
@@ -300,18 +372,19 @@ class TestCfgEntrySet(TestEntrySet):
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
self.assertItemsEqual(eset.entries, dict())
- for hdlr in eset._handlers:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
# test creation of a new file
for action in ["exists", "created", "changed"]:
+ print("Testing handling of %s events" % action)
evt = Mock()
evt.code2str.return_value = action
evt.filename = os.path.join(datastore, "test.txt")
# test with no handler that handles
- for hdlr in eset._handlers:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.return_value = False
hdlr.ignore.return_value = False
@@ -319,16 +392,16 @@ class TestCfgEntrySet(TestEntrySet):
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
self.assertItemsEqual(eset.entries, dict())
- for hdlr in eset._handlers:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.assert_called_with(evt, basename=eset.path)
hdlr.ignore.assert_called_with(evt, basename=eset.path)
# test with a handler that handles the entry
reset()
- eset._handlers[-1].handles.return_value = True
+ Bcfg2.Options.setup.cfg_handlers[-1].handles.return_value = True
eset.handle_event(evt)
- eset.entry_init.assert_called_with(evt, eset._handlers[-1])
- for hdlr in eset._handlers:
+ eset.entry_init.assert_called_with(evt, Bcfg2.Options.setup.cfg_handlers[-1])
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.assert_called_with(evt, basename=eset.path)
if not hdlr.return_value:
hdlr.ignore.assert_called_with(evt, basename=eset.path)
@@ -336,14 +409,14 @@ class TestCfgEntrySet(TestEntrySet):
# test with a handler that ignores the entry before one
# that handles it
reset()
- eset._handlers[0].ignore.return_value = True
+ Bcfg2.Options.setup.cfg_handlers[0].ignore.return_value = True
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- eset._handlers[0].handles.assert_called_with(evt,
- basename=eset.path)
- eset._handlers[0].ignore.assert_called_with(evt,
- basename=eset.path)
- for hdlr in eset._handlers[1:]:
+ Bcfg2.Options.setup.cfg_handlers[0].handles.assert_called_with(
+ evt, basename=eset.path)
+ Bcfg2.Options.setup.cfg_handlers[0].ignore.assert_called_with(
+ evt, basename=eset.path)
+ for hdlr in Bcfg2.Options.setup.cfg_handlers[1:]:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
@@ -355,7 +428,7 @@ class TestCfgEntrySet(TestEntrySet):
eset.entries[evt.filename] = Mock()
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- for hdlr in eset._handlers:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
eset.entries[evt.filename].handle_event.assert_called_with(evt)
@@ -365,7 +438,7 @@ class TestCfgEntrySet(TestEntrySet):
evt.code2str.return_value = "deleted"
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- for hdlr in eset._handlers:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
self.assertItemsEqual(eset.entries, dict())
@@ -730,6 +803,11 @@ class TestCfgEntrySet(TestEntrySet):
class TestCfg(TestGroupSpool, TestPullTarget):
test_obj = Cfg
+ def setUp(self):
+ TestGroupSpool.setUp(self)
+ TestPullTarget.setUp(self)
+ set_setup_default("cfg_handlers", [])
+
def get_obj(self, core=None):
if core is None:
core = Mock()