summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-01-03 13:40:24 -0600
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-01-03 13:40:24 -0600
commit14406cc14a4d832fe83df5da27937051e41dd093 (patch)
tree1fb429513bc5483251412af8251aa24517bcbb68 /testsuite/Testsrc
parent10326a34dd813b88c6c8816115e91977a93a1f10 (diff)
downloadbcfg2-14406cc14a4d832fe83df5da27937051e41dd093.tar.gz
bcfg2-14406cc14a4d832fe83df5da27937051e41dd093.tar.bz2
bcfg2-14406cc14a4d832fe83df5da27937051e41dd093.zip
Cfg: Added feature to provide generation of SSH keys, authorized_keys file
Diffstat (limited to 'testsuite/Testsrc')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py51
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py176
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py435
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py76
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py61
5 files changed, 752 insertions, 47 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
index 559742d00..6dbdc7667 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
@@ -433,9 +433,12 @@ class TestXMLFileBacked(TestFileBacked):
xdata = dict()
mock_parse.side_effect = lambda p: xdata[p]
+ base = os.path.dirname(self.path)
+
# basic functionality
- xdata['/test/test2.xml'] = lxml.etree.Element("Test").getroottree()
- xfb._follow_xincludes(xdata=xdata['/test/test2.xml'])
+ test2 = os.path.join(base, 'test2.xml')
+ xdata[test2] = lxml.etree.Element("Test").getroottree()
+ xfb._follow_xincludes(xdata=xdata[test2])
self.assertFalse(xfb.add_monitor.called)
if (not hasattr(self.test_obj, "xdata") or
@@ -443,56 +446,56 @@ class TestXMLFileBacked(TestFileBacked):
# if xdata is settable, test that method of getting data
# to _follow_xincludes
reset()
- xfb.xdata = xdata['/test/test2.xml'].getroot()
+ xfb.xdata = xdata[test2].getroot()
xfb._follow_xincludes()
self.assertFalse(xfb.add_monitor.called)
xfb.xdata = None
reset()
- xfb._follow_xincludes(fname="/test/test2.xml")
+ xfb._follow_xincludes(fname=test2)
self.assertFalse(xfb.add_monitor.called)
# test one level of xinclude
xdata[self.path] = lxml.etree.Element("Test").getroottree()
lxml.etree.SubElement(xdata[self.path].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test2.xml")
+ href=test2)
reset()
xfb._follow_xincludes(fname=self.path)
- xfb.add_monitor.assert_called_with("/test/test2.xml")
+ xfb.add_monitor.assert_called_with(test2)
self.assertItemsEqual(mock_parse.call_args_list,
[call(f) for f in xdata.keys()])
- mock_exists.assert_called_with("/test/test2.xml")
+ mock_exists.assert_called_with(test2)
reset()
xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path])
- xfb.add_monitor.assert_called_with("/test/test2.xml")
+ xfb.add_monitor.assert_called_with(test2)
self.assertItemsEqual(mock_parse.call_args_list,
[call(f) for f in xdata.keys()
if f != self.path])
- mock_exists.assert_called_with("/test/test2.xml")
+ mock_exists.assert_called_with(test2)
# test two-deep level of xinclude, with some files in another
# directory
- xdata["/test/test3.xml"] = \
- lxml.etree.Element("Test").getroottree()
- lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
+ test3 = os.path.join(base, "test3.xml")
+ test4 = os.path.join(base, "test_dir", "test4.xml")
+ test5 = os.path.join(base, "test_dir", "test5.xml")
+ test6 = os.path.join(base, "test_dir", "test6.xml")
+ xdata[test3] = lxml.etree.Element("Test").getroottree()
+ lxml.etree.SubElement(xdata[test3].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test_dir/test4.xml")
- xdata["/test/test_dir/test4.xml"] = \
- lxml.etree.Element("Test").getroottree()
- lxml.etree.SubElement(xdata["/test/test_dir/test4.xml"].getroot(),
+ href=test4)
+ xdata[test4] = lxml.etree.Element("Test").getroottree()
+ lxml.etree.SubElement(xdata[test4].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test_dir/test5.xml")
- xdata['/test/test_dir/test5.xml'] = \
- lxml.etree.Element("Test").getroottree()
- xdata['/test/test_dir/test6.xml'] = \
- lxml.etree.Element("Test").getroottree()
+ href=test5)
+ xdata[test5] = lxml.etree.Element("Test").getroottree()
+ xdata[test6] = lxml.etree.Element("Test").getroottree()
# relative includes
lxml.etree.SubElement(xdata[self.path].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
href="test3.xml")
- lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
+ lxml.etree.SubElement(xdata[test3].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
href="test_dir/test6.xml")
@@ -526,10 +529,6 @@ class TestXMLFileBacked(TestFileBacked):
xfb.extras = []
xfb.xdata = None
- # syntax error
- xfb.data = "<"
- self.assertRaises(PluginInitError, xfb.Index)
-
# no xinclude
reset()
xdata = lxml.etree.Element("Test", name="test")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
new file mode 100644
index 000000000..23a77d1e5
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
@@ -0,0 +1,176 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator import *
+from Bcfg2.Server.Plugin import PluginExecutionError
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
+from TestServer.TestPlugin.Testhelpers import TestStructFile
+
+
+class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
+ test_obj = CfgAuthorizedKeysGenerator
+ should_monitor = False
+
+ def get_obj(self, name=None, core=None, fam=None):
+ if name is None:
+ name = self.path
+ Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG = Mock()
+ if core is not None:
+ Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG.core = core
+ return self.test_obj(name)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
+ @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
+ def test_handle_event(self, mock_HandleEvent, mock_handle_event):
+ akg = self.get_obj()
+ evt = Mock()
+ akg.handle_event(evt)
+ mock_HandleEvent.assert_called_with(akg, evt)
+ mock_handle_event.assert_called_with(akg, evt)
+
+ def test_category(self):
+ akg = self.get_obj()
+ cfp = Mock()
+ cfp.has_section.return_value = False
+ cfp.has_option.return_value = False
+ Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock()
+ Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp
+
+ self.assertIsNone(akg.category)
+ cfp.has_section.assert_called_with("sshkeys")
+
+ cfp.reset_mock()
+ cfp.has_section.return_value = True
+ self.assertIsNone(akg.category)
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "category")
+
+ cfp.reset_mock()
+ cfp.has_option.return_value = True
+ self.assertEqual(akg.category, cfp.get.return_value)
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "category")
+ cfp.get.assert_called_with("sshkeys", "category")
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata")
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category")
+ def test_get_data(self, mock_ClientMetadata):
+ akg = self.get_obj()
+ akg.XMLMatch = Mock()
+
+ def ClientMetadata(host, profile, groups, *args):
+ rv = Mock()
+ rv.hostname = host
+ rv.profile = profile
+ rv.groups = groups
+ return rv
+
+ mock_ClientMetadata.side_effect = ClientMetadata
+
+ def build_metadata(host):
+ rv = Mock()
+ rv.hostname = host
+ rv.profile = host
+ return rv
+
+ akg.core.build_metadata = Mock()
+ akg.core.build_metadata.side_effect = build_metadata
+
+ def Bind(ent, md):
+ ent.text = "%s %s" % (md.profile, ent.get("name"))
+ return ent
+
+ akg.core.Bind = Mock()
+ akg.core.Bind.side_effect = Bind
+ metadata = Mock()
+ metadata.profile = "profile"
+ metadata.group_in_category.return_value = "profile"
+ entry = lxml.etree.Element("Path", name="/root/.ssh/authorized_keys")
+
+ def reset():
+ mock_ClientMetadata.reset_mock()
+ akg.XMLMatch.reset_mock()
+ akg.core.build_metadata.reset_mock()
+ akg.core.Bind.reset_mock()
+ metadata.reset_mock()
+
+ pubkey = "/home/foo/.ssh/id_rsa.pub"
+ spec = lxml.etree.Element("AuthorizedKeys")
+ lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey})
+ akg.XMLMatch.return_value = spec
+ self.assertEqual(akg.get_data(entry, metadata), "profile %s" % pubkey)
+ akg.XMLMatch.assert_called_with(metadata)
+ self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey)
+ self.assertEqual(akg.core.Bind.call_args[0][1], metadata)
+
+ reset()
+ group = "somegroup"
+ spec = lxml.etree.Element("AuthorizedKeys")
+ lxml.etree.SubElement(spec, "Allow",
+ attrib={"from": pubkey, "group": group})
+ akg.XMLMatch.return_value = spec
+ self.assertEqual(akg.get_data(entry, metadata),
+ "%s %s" % (group, pubkey))
+ akg.XMLMatch.assert_called_with(metadata)
+ self.assertItemsEqual(mock_ClientMetadata.call_args[0][2], [group])
+ self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey)
+ self.assertIn(group, akg.core.Bind.call_args[0][1].groups)
+
+ reset()
+ host = "baz.example.com"
+ spec = lxml.etree.Element("AuthorizedKeys")
+ lxml.etree.SubElement(
+ lxml.etree.SubElement(spec,
+ "Allow",
+ attrib={"from": pubkey, "host": host}),
+ "Params", foo="foo", bar="bar=bar")
+ akg.XMLMatch.return_value = spec
+ self.assertEqual(akg.get_data(entry, metadata),
+ "foo=foo,bar=bar=bar %s %s" % (host, pubkey))
+ akg.XMLMatch.assert_called_with(metadata)
+ akg.core.build_metadata.assert_called_with(host)
+ self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey)
+ self.assertEqual(akg.core.Bind.call_args[0][1].hostname, host)
+
+ reset()
+ spec = lxml.etree.Element("AuthorizedKeys")
+ text = lxml.etree.SubElement(spec, "Allow")
+ text.text = "ssh-rsa publickey /foo/bar\n"
+ lxml.etree.SubElement(text, "Params", foo="foo")
+ akg.XMLMatch.return_value = spec
+ self.assertEqual(akg.get_data(entry, metadata),
+ "foo=foo %s" % text.text.strip())
+ akg.XMLMatch.assert_called_with(metadata)
+ self.assertFalse(akg.core.build_metadata.called)
+ self.assertFalse(akg.core.Bind.called)
+
+ reset()
+ lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey})
+ akg.XMLMatch.return_value = spec
+ self.assertItemsEqual(akg.get_data(entry, metadata).splitlines(),
+ ["foo=foo %s" % text.text.strip(),
+ "profile %s" % pubkey])
+ akg.XMLMatch.assert_called_with(metadata)
+
+ reset()
+ metadata.group_in_category.return_value = ''
+ spec = lxml.etree.Element("AuthorizedKeys")
+ lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey})
+ akg.XMLMatch.return_value = spec
+ self.assertEqual(akg.get_data(entry, metadata), '')
+ akg.XMLMatch.assert_called_with(metadata)
+ self.assertFalse(akg.core.build_metadata.called)
+ self.assertFalse(akg.core.Bind.called)
+ self.assertFalse(mock_ClientMetadata.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
new file mode 100644
index 000000000..dd18306cb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -0,0 +1,435 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg import CfgCreationError
+from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import *
+from Bcfg2.Server.Plugin import PluginExecutionError
+try:
+ from Bcfg2.Encryption import EVPError
+ HAS_CRYPTO = True
+except:
+ HAS_CRYPTO = False
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator
+from TestServer.TestPlugin.Testhelpers import TestStructFile
+
+
+class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
+ test_obj = CfgPrivateKeyCreator
+ should_monitor = False
+
+ def get_obj(self, name=None, fam=None):
+ return TestCfgCreator.get_obj(self, name=name)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
+ @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
+ def test_handle_event(self, mock_HandleEvent, mock_handle_event):
+ pkc = self.get_obj()
+ evt = Mock()
+ pkc.handle_event(evt)
+ mock_HandleEvent.assert_called_with(pkc, evt)
+ mock_handle_event.assert_called_with(pkc, evt)
+
+ def test_category(self):
+ pkc = self.get_obj()
+ cfp = Mock()
+ cfp.has_section.return_value = False
+ cfp.has_option.return_value = False
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
+
+ self.assertIsNone(pkc.category)
+ cfp.has_section.assert_called_with("sshkeys")
+
+ cfp.reset_mock()
+ cfp.has_section.return_value = True
+ self.assertIsNone(pkc.category)
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "category")
+
+ cfp.reset_mock()
+ cfp.has_option.return_value = True
+ self.assertEqual(pkc.category, cfp.get.return_value)
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "category")
+ cfp.get.assert_called_with("sshkeys", "category")
+
+ @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
+ def test_passphrase(self):
+ @patch("Bcfg2.Encryption.get_passphrases")
+ def inner(mock_get_passphrases):
+ pkc = self.get_obj()
+ cfp = Mock()
+ cfp.has_section.return_value = False
+ cfp.has_option.return_value = False
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
+
+ self.assertIsNone(pkc.passphrase)
+ cfp.has_section.assert_called_with("sshkeys")
+
+ cfp.reset_mock()
+ cfp.has_section.return_value = True
+ self.assertIsNone(pkc.passphrase)
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "passphrase")
+
+ cfp.reset_mock()
+ cfp.get.return_value = "test"
+ mock_get_passphrases.return_value = dict(test="foo", test2="bar")
+ cfp.has_option.return_value = True
+ self.assertEqual(pkc.passphrase, "foo")
+ cfp.has_section.assert_called_with("sshkeys")
+ cfp.has_option.assert_called_with("sshkeys", "passphrase")
+ cfp.get.assert_called_with("sshkeys", "passphrase")
+ mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+
+ inner()
+
+ @patch("shutil.rmtree")
+ @patch("tempfile.mkdtemp")
+ @patch("subprocess.Popen")
+ def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree):
+ pkc = self.get_obj()
+ pkc.XMLMatch = Mock()
+ mock_mkdtemp.return_value = datastore
+ metadata = Mock()
+
+ proc = Mock()
+ proc.wait.return_value = 0
+ proc.communicate.return_value = MagicMock()
+ mock_Popen.return_value = proc
+
+ spec = lxml.etree.Element("PrivateKey")
+ pkc.XMLMatch.return_value = spec
+
+ def reset():
+ pkc.XMLMatch.reset_mock()
+ mock_Popen.reset_mock()
+ mock_mkdtemp.reset_mock()
+ mock_rmtree.reset_mock()
+
+ self.assertEqual(pkc._gen_keypair(metadata),
+ os.path.join(datastore, "privkey"))
+ pkc.XMLMatch.assert_called_with(metadata)
+ mock_mkdtemp.assert_called_with()
+ self.assertItemsEqual(mock_Popen.call_args[0][0],
+ ["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "rsa", "-N", ""])
+
+ reset()
+ lxml.etree.SubElement(spec, "Params", bits="768", type="dsa")
+ passphrase = lxml.etree.SubElement(spec, "Passphrase")
+ passphrase.text = "foo"
+
+ self.assertEqual(pkc._gen_keypair(metadata),
+ os.path.join(datastore, "privkey"))
+ pkc.XMLMatch.assert_called_with(metadata)
+ mock_mkdtemp.assert_called_with()
+ self.assertItemsEqual(mock_Popen.call_args[0][0],
+ ["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "dsa", "-b", "768", "-N", "foo"])
+
+ reset()
+ proc.wait.return_value = 1
+ self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata)
+ mock_rmtree.assert_called_with(datastore)
+
+ def test_get_specificity(self):
+ pkc = self.get_obj()
+ pkc.XMLMatch = Mock()
+
+ metadata = Mock()
+
+ def reset():
+ pkc.XMLMatch.reset_mock()
+ metadata.group_in_category.reset_mock()
+
+ category = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.category"
+ @patch(category, None)
+ def inner():
+ pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(pkc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ inner()
+
+ @patch(category, "foo")
+ def inner2():
+ pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(pkc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
+ perhost="true")
+ self.assertItemsEqual(pkc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+
+ reset()
+ pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
+ category="bar")
+ self.assertItemsEqual(pkc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("bar")
+
+ reset()
+ pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
+ prio="10")
+ self.assertItemsEqual(pkc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=10))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
+ metadata.group_in_category.return_value = ''
+ self.assertItemsEqual(pkc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ metadata.group_in_category.assert_called_with("foo")
+
+ inner2()
+
+ @patch("shutil.rmtree")
+ @patch("%s.open" % builtins)
+ def test_create_data(self, mock_open, mock_rmtree):
+ pkc = self.get_obj()
+ pkc.XMLMatch = Mock()
+ pkc.get_specificity = MagicMock()
+ pkc._gen_keypair = Mock()
+ privkey = os.path.join(datastore, "privkey")
+ pkc._gen_keypair.return_value = privkey
+ pkc.pubkey_creator = Mock()
+ pkc.pubkey_creator.get_filename.return_value = "pubkey.filename"
+ pkc.write_data = Mock()
+
+ entry = lxml.etree.Element("Path", name="/home/foo/.ssh/id_rsa")
+ metadata = Mock()
+
+ def open_read_rv():
+ mock_open.return_value.read.side_effect = lambda: "privatekey"
+ return "ssh-rsa publickey foo@bar.com"
+
+ def reset():
+ mock_open.reset_mock()
+ mock_rmtree.reset_mock()
+ pkc.XMLMatch.reset_mock()
+ pkc.get_specificity.reset_mock()
+ pkc._gen_keypair.reset_mock()
+ pkc.pubkey_creator.reset_mock()
+ pkc.write_data.reset_mock()
+ mock_open.return_value.read.side_effect = open_read_rv
+
+ reset()
+ passphrase = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.passphrase"
+
+ @patch(passphrase, None)
+ def inner():
+ self.assertEqual(pkc.create_data(entry, metadata), "privatekey")
+ pkc.XMLMatch.assert_called_with(metadata)
+ pkc.get_specificity.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ pkc._gen_keypair.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ self.assertItemsEqual(mock_open.call_args_list,
+ [call(privkey + ".pub"), call(privkey)])
+ pkc.pubkey_creator.get_filename.assert_called_with(
+ **pkc.get_specificity.return_value)
+ pkc.pubkey_creator.write_data.assert_called_with(
+ "ssh-rsa publickey pubkey.filename\n",
+ **pkc.get_specificity.return_value)
+ pkc.write_data.assert_called_with(
+ "privatekey",
+ **pkc.get_specificity.return_value)
+ mock_rmtree.assert_called_with(datastore)
+
+ reset()
+ self.assertEqual(pkc.create_data(entry, metadata, return_pair=True),
+ ("ssh-rsa publickey pubkey.filename\n",
+ "privatekey"))
+ pkc.XMLMatch.assert_called_with(metadata)
+ pkc.get_specificity.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ pkc._gen_keypair.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ self.assertItemsEqual(mock_open.call_args_list,
+ [call(privkey + ".pub"), call(privkey)])
+ pkc.pubkey_creator.get_filename.assert_called_with(
+ **pkc.get_specificity.return_value)
+ pkc.pubkey_creator.write_data.assert_called_with(
+ "ssh-rsa publickey pubkey.filename\n",
+ **pkc.get_specificity.return_value)
+ pkc.write_data.assert_called_with(
+ "privatekey",
+ **pkc.get_specificity.return_value)
+ mock_rmtree.assert_called_with(datastore)
+
+ inner()
+
+ if HAS_CRYPTO:
+ @patch(passphrase, "foo")
+ @patch("Bcfg2.Encryption.ssl_encrypt")
+ @patch("Bcfg2.Encryption.get_algorithm")
+ def inner2(mock_get_algorithm, mock_ssl_encrypt):
+ reset()
+ mock_ssl_encrypt.return_value = "encryptedprivatekey"
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True
+ self.assertEqual(pkc.create_data(entry, metadata),
+ "encryptedprivatekey")
+ pkc.XMLMatch.assert_called_with(metadata)
+ pkc.get_specificity.assert_called_with(
+ metadata,
+ pkc.XMLMatch.return_value)
+ pkc._gen_keypair.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ self.assertItemsEqual(mock_open.call_args_list,
+ [call(privkey + ".pub"), call(privkey)])
+ pkc.pubkey_creator.get_filename.assert_called_with(
+ **pkc.get_specificity.return_value)
+ pkc.pubkey_creator.write_data.assert_called_with(
+ "ssh-rsa publickey pubkey.filename\n",
+ **pkc.get_specificity.return_value)
+ pkc.write_data.assert_called_with(
+ "encryptedprivatekey",
+ **pkc.get_specificity.return_value)
+ mock_ssl_encrypt.assert_called_with(
+ "privatekey", "foo",
+ algorithm=mock_get_algorithm.return_value)
+ mock_rmtree.assert_called_with(datastore)
+
+ inner2()
+
+ def test_Index(self):
+ has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False
+ TestStructFile.test_Index(self)
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto
+
+ @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
+ def test_Index_crypto(self):
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict"
+
+ pkc = self.get_obj()
+ pkc._decrypt = Mock()
+ pkc._decrypt.return_value = 'plaintext'
+ pkc.data = '''
+<PrivateKey>
+ <Group name="test">
+ <Passphrase encrypted="foo">crypted</Passphrase>
+ </Group>
+ <Group name="test" negate="true">
+ <Passphrase>plain</Passphrase>
+ </Group>
+</PrivateKey>'''
+
+ # test successful decryption
+ pkc.Index()
+ self.assertItemsEqual(
+ pkc._decrypt.call_args_list,
+ [call(el)
+ for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
+ for el in pkc.xdata.xpath("//Crypted"):
+ self.assertEqual(el.text, pkc._decrypt.return_value)
+
+ # test failed decryption, strict
+ pkc._decrypt.reset_mock()
+ pkc._decrypt.side_effect = EVPError
+ self.assertRaises(PluginExecutionError, pkc.Index)
+
+ # test failed decryption, lax
+ Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax"
+ pkc._decrypt.reset_mock()
+ pkc.Index()
+ self.assertItemsEqual(
+ pkc._decrypt.call_args_list,
+ [call(el)
+ for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
+
+ @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
+ def test_decrypt(self):
+
+ @patch("Bcfg2.Encryption.ssl_decrypt")
+ @patch("Bcfg2.Encryption.get_algorithm")
+ @patch("Bcfg2.Encryption.get_passphrases")
+ @patch("Bcfg2.Encryption.bruteforce_decrypt")
+ def inner(mock_bruteforce, mock_get_passphrases, mock_get_algorithm,
+ mock_ssl):
+ pkc = self.get_obj()
+
+ def reset():
+ mock_bruteforce.reset_mock()
+ mock_get_algorithm.reset_mock()
+ mock_get_passphrases.reset_mock()
+ mock_ssl.reset_mock()
+
+ # test element without text contents
+ self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
+ self.assertFalse(mock_bruteforce.called)
+ self.assertFalse(mock_get_passphrases.called)
+ self.assertFalse(mock_ssl.called)
+
+ # test element with a passphrase in the config file
+ reset()
+ el = lxml.etree.Element("Test", encrypted="foo")
+ el.text = "crypted"
+ mock_get_passphrases.return_value = dict(foo="foopass",
+ bar="barpass")
+ mock_get_algorithm.return_value = "bf_cbc"
+ mock_ssl.return_value = "decrypted with ssl"
+ self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
+ mock_get_passphrases.assert_called_with(SETUP)
+ mock_get_algorithm.assert_called_with(SETUP)
+ mock_ssl.assert_called_with(el.text, "foopass",
+ algorithm="bf_cbc")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test failure to decrypt element with a passphrase in the config
+ reset()
+ mock_ssl.side_effect = EVPError
+ self.assertRaises(EVPError, pkc._decrypt, el)
+ mock_get_passphrases.assert_called_with(SETUP)
+ mock_get_algorithm.assert_called_with(SETUP)
+ mock_ssl.assert_called_with(el.text, "foopass",
+ algorithm="bf_cbc")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test element without valid passphrase
+ reset()
+ el.set("encrypted", "true")
+ mock_bruteforce.return_value = "decrypted with bruteforce"
+ self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
+ mock_get_passphrases.assert_called_with(SETUP)
+ mock_get_algorithm.assert_called_with(SETUP)
+ mock_bruteforce.assert_called_with(el.text,
+ passphrases=["foopass",
+ "barpass"],
+ algorithm="bf_cbc")
+ self.assertFalse(mock_ssl.called)
+
+ # test failure to decrypt element without valid passphrase
+ reset()
+ mock_bruteforce.side_effect = EVPError
+ self.assertRaises(EVPError, pkc._decrypt, el)
+ mock_get_passphrases.assert_called_with(SETUP)
+ mock_get_algorithm.assert_called_with(SETUP)
+ mock_bruteforce.assert_called_with(el.text,
+ passphrases=["foopass",
+ "barpass"],
+ algorithm="bf_cbc")
+ self.assertFalse(mock_ssl.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
new file mode 100644
index 000000000..2e7b6eef4
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
@@ -0,0 +1,76 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg import CfgCreationError, CfgCreator
+from Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator import *
+from Bcfg2.Server.Plugin import StructFile, PluginExecutionError
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator
+from TestServer.TestPlugin.Testhelpers import TestStructFile
+
+
+class TestCfgPublicKeyCreator(TestCfgCreator, TestStructFile):
+ test_obj = CfgPublicKeyCreator
+ should_monitor = False
+
+ def get_obj(self, name=None, fam=None):
+ return TestCfgCreator.get_obj(self, name=name)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
+ @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
+ def test_handle_event(self, mock_HandleEvent, mock_handle_event):
+ pkc = self.get_obj()
+ evt = Mock()
+ pkc.handle_event(evt)
+ mock_HandleEvent.assert_called_with(pkc, evt)
+ mock_handle_event.assert_called_with(pkc, evt)
+
+ def test_create_data(self):
+ metadata = Mock()
+ pkc = self.get_obj()
+ pkc.cfg = Mock()
+
+ privkey_entryset = Mock()
+ privkey_creator = Mock()
+ pubkey = Mock()
+ privkey = Mock()
+ privkey_creator.create_data.return_value = (pubkey, privkey)
+ privkey_entryset.best_matching.return_value = privkey_creator
+ pkc.cfg.entries = {"/home/foo/.ssh/id_rsa": privkey_entryset}
+
+ # public key doesn't end in .pub
+ entry = lxml.etree.Element("Path", name="/home/bar/.ssh/bogus")
+ self.assertRaises(CfgCreationError,
+ pkc.create_data, entry, metadata)
+
+ # private key not in cfg.entries
+ entry = lxml.etree.Element("Path", name="/home/bar/.ssh/id_rsa.pub")
+ self.assertRaises(CfgCreationError,
+ pkc.create_data, entry, metadata)
+
+ # successful operation
+ entry = lxml.etree.Element("Path", name="/home/foo/.ssh/id_rsa.pub")
+ self.assertEqual(pkc.create_data(entry, metadata), pubkey)
+ privkey_entryset.get_handlers.assert_called_with(metadata, CfgCreator)
+ privkey_entryset.best_matching.assert_called_with(metadata,
+ privkey_entryset.get_handlers.return_value)
+ self.assertXMLEqual(privkey_creator.create_data.call_args[0][0],
+ lxml.etree.Element("Path",
+ name="/home/foo/.ssh/id_rsa"))
+ self.assertEqual(privkey_creator.create_data.call_args[0][1], metadata)
+
+ # no privkey.xml
+ privkey_entryset.best_matching.side_effect = PluginExecutionError
+ self.assertRaises(CfgCreationError,
+ pkc.create_data, entry, metadata)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
index 6412480f0..55fbb7446 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -82,9 +82,10 @@ class TestCfgBaseFileMatcher(TestSpecificData):
self.assertFalse(self.test_obj.handles(evt))
mock_get_regex.assert_called_with(
[b for b in self.test_obj.__basenames__])
- self.assertItemsEqual(match.call_args_list,
- [call(evt.filename)
+ print("match calls: %s" % match.call_args_list)
+ print("expected: %s" % [call(evt.filename)
for b in self.test_obj.__basenames__])
+ match.assert_called_with(evt.filename)
mock_get_regex.reset_mock()
match.reset_mock()
@@ -186,47 +187,61 @@ class TestCfgCreator(TestCfgBaseFileMatcher):
test_obj = CfgCreator
path = "/foo/bar/test.txt"
+ def get_obj(self, name=None):
+ if name is None:
+ name = self.path
+ return self.test_obj(name)
+
def test_create_data(self):
cc = self.get_obj()
self.assertRaises(NotImplementedError,
cc.create_data, Mock(), Mock())
+ def test_get_filename(self):
+ cc = self.get_obj()
+
+ # tuples of (args to get_filename(), expected result)
+ cases = [(dict(), "/foo/bar/bar"),
+ (dict(prio=50), "/foo/bar/bar"),
+ (dict(ext=".crypt"), "/foo/bar/bar.crypt"),
+ (dict(ext="bar"), "/foo/bar/barbar"),
+ (dict(host="foo.bar.example.com"),
+ "/foo/bar/bar.H_foo.bar.example.com"),
+ (dict(host="foo.bar.example.com", prio=50, ext=".crypt"),
+ "/foo/bar/bar.H_foo.bar.example.com.crypt"),
+ (dict(group="group", prio=1), "/foo/bar/bar.G01_group"),
+ (dict(group="group", prio=50), "/foo/bar/bar.G50_group"),
+ (dict(group="group", prio=50, ext=".crypt"),
+ "/foo/bar/bar.G50_group.crypt")]
+
+ for args, expected in cases:
+ self.assertEqual(cc.get_filename(**args), expected)
+
@patch("os.makedirs")
@patch("%s.open" % builtins)
def test_write_data(self, mock_open, mock_makedirs):
cc = self.get_obj()
data = "test\ntest"
+ parent = os.path.dirname(self.path)
def reset():
mock_open.reset_mock()
mock_makedirs.reset_mock()
- # test writing non-specific file
- cc.write_data(data)
- mock_makedirs.assert_called_with("/foo/bar")
- mock_open.assert_called_with("/foo/bar/bar", "wb")
- mock_open.return_value.write.assert_called_with(data)
-
- # test writing group-specific file
- reset()
- cc.write_data(data, group="foogroup", prio=9)
- mock_makedirs.assert_called_with("/foo/bar")
- mock_open.assert_called_with("/foo/bar/bar.G09_foogroup", "wb")
- mock_open.return_value.write.assert_called_with(data)
-
- # test writing host-specific file
+ # test writing file
reset()
- cc.write_data(data, host="foo.example.com")
- mock_makedirs.assert_called_with("/foo/bar")
- mock_open.assert_called_with("/foo/bar/bar.H_foo.example.com", "wb")
+ spec = dict(group="foogroup", prio=9)
+ cc.write_data(data, **spec)
+ mock_makedirs.assert_called_with(parent)
+ mock_open.assert_called_with(cc.get_filename(**spec), "wb")
mock_open.return_value.write.assert_called_with(data)
# test already-exists error from makedirs
reset()
mock_makedirs.side_effect = OSError(errno.EEXIST, self.path)
cc.write_data(data)
- mock_makedirs.assert_called_with("/foo/bar")
- mock_open.assert_called_with("/foo/bar/bar", "wb")
+ mock_makedirs.assert_called_with(parent)
+ mock_open.assert_called_with(cc.get_filename(), "wb")
mock_open.return_value.write.assert_called_with(data)
# test error from open
@@ -391,6 +406,10 @@ class TestCfgEntrySet(TestEntrySet):
evt = Mock()
evt.filename = "test.txt"
handler = Mock()
+ handler.__basenames__ = []
+ handler.__extensions__ = []
+ handler.deprecated = False
+ handler.experimental = False
handler.__specific__ = True
# test handling an event with the parent entry_init