From 14406cc14a4d832fe83df5da27937051e41dd093 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 3 Jan 2013 13:40:24 -0600 Subject: Cfg: Added feature to provide generation of SSH keys, authorized_keys file --- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 51 ++- .../TestCfg/TestCfgAuthorizedKeysGenerator.py | 176 +++++++++ .../TestCfg/TestCfgPrivateKeyCreator.py | 435 +++++++++++++++++++++ .../TestPlugins/TestCfg/TestCfgPublicKeyCreator.py | 76 ++++ .../TestServer/TestPlugins/TestCfg/Test_init.py | 61 ++- 5 files changed, 752 insertions(+), 47 deletions(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py (limited to 'testsuite/Testsrc') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index 559742d00..6dbdc7667 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -433,9 +433,12 @@ class TestXMLFileBacked(TestFileBacked): xdata = dict() mock_parse.side_effect = lambda p: xdata[p] + base = os.path.dirname(self.path) + # basic functionality - xdata['/test/test2.xml'] = lxml.etree.Element("Test").getroottree() - xfb._follow_xincludes(xdata=xdata['/test/test2.xml']) + test2 = os.path.join(base, 'test2.xml') + xdata[test2] = lxml.etree.Element("Test").getroottree() + xfb._follow_xincludes(xdata=xdata[test2]) self.assertFalse(xfb.add_monitor.called) if (not hasattr(self.test_obj, "xdata") or @@ -443,56 +446,56 @@ class TestXMLFileBacked(TestFileBacked): # if xdata is settable, test that method of getting data # to _follow_xincludes reset() - xfb.xdata = xdata['/test/test2.xml'].getroot() + xfb.xdata = xdata[test2].getroot() xfb._follow_xincludes() self.assertFalse(xfb.add_monitor.called) xfb.xdata = None reset() - xfb._follow_xincludes(fname="/test/test2.xml") + xfb._follow_xincludes(fname=test2) self.assertFalse(xfb.add_monitor.called) # test one level of xinclude xdata[self.path] = lxml.etree.Element("Test").getroottree() lxml.etree.SubElement(xdata[self.path].getroot(), Bcfg2.Server.XI_NAMESPACE + "include", - href="/test/test2.xml") + href=test2) reset() xfb._follow_xincludes(fname=self.path) - xfb.add_monitor.assert_called_with("/test/test2.xml") + xfb.add_monitor.assert_called_with(test2) self.assertItemsEqual(mock_parse.call_args_list, [call(f) for f in xdata.keys()]) - mock_exists.assert_called_with("/test/test2.xml") + mock_exists.assert_called_with(test2) reset() xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path]) - xfb.add_monitor.assert_called_with("/test/test2.xml") + xfb.add_monitor.assert_called_with(test2) self.assertItemsEqual(mock_parse.call_args_list, [call(f) for f in xdata.keys() if f != self.path]) - mock_exists.assert_called_with("/test/test2.xml") + mock_exists.assert_called_with(test2) # test two-deep level of xinclude, with some files in another # directory - xdata["/test/test3.xml"] = \ - lxml.etree.Element("Test").getroottree() - lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(), + test3 = os.path.join(base, "test3.xml") + test4 = os.path.join(base, "test_dir", "test4.xml") + test5 = os.path.join(base, "test_dir", "test5.xml") + test6 = os.path.join(base, "test_dir", "test6.xml") + xdata[test3] = lxml.etree.Element("Test").getroottree() + lxml.etree.SubElement(xdata[test3].getroot(), Bcfg2.Server.XI_NAMESPACE + "include", - href="/test/test_dir/test4.xml") - xdata["/test/test_dir/test4.xml"] = \ - lxml.etree.Element("Test").getroottree() - lxml.etree.SubElement(xdata["/test/test_dir/test4.xml"].getroot(), + href=test4) + xdata[test4] = lxml.etree.Element("Test").getroottree() + lxml.etree.SubElement(xdata[test4].getroot(), Bcfg2.Server.XI_NAMESPACE + "include", - href="/test/test_dir/test5.xml") - xdata['/test/test_dir/test5.xml'] = \ - lxml.etree.Element("Test").getroottree() - xdata['/test/test_dir/test6.xml'] = \ - lxml.etree.Element("Test").getroottree() + href=test5) + xdata[test5] = lxml.etree.Element("Test").getroottree() + xdata[test6] = lxml.etree.Element("Test").getroottree() # relative includes lxml.etree.SubElement(xdata[self.path].getroot(), Bcfg2.Server.XI_NAMESPACE + "include", href="test3.xml") - lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(), + lxml.etree.SubElement(xdata[test3].getroot(), Bcfg2.Server.XI_NAMESPACE + "include", href="test_dir/test6.xml") @@ -526,10 +529,6 @@ class TestXMLFileBacked(TestFileBacked): xfb.extras = [] xfb.xdata = None - # syntax error - xfb.data = "<" - self.assertRaises(PluginInitError, xfb.Index) - # no xinclude reset() xdata = lxml.etree.Element("Test", name="test") diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py new file mode 100644 index 000000000..23a77d1e5 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py @@ -0,0 +1,176 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator +from TestServer.TestPlugin.Testhelpers import TestStructFile + + +class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile): + test_obj = CfgAuthorizedKeysGenerator + should_monitor = False + + def get_obj(self, name=None, core=None, fam=None): + if name is None: + name = self.path + Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG = Mock() + if core is not None: + Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG.core = core + return self.test_obj(name) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") + @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent") + def test_handle_event(self, mock_HandleEvent, mock_handle_event): + akg = self.get_obj() + evt = Mock() + akg.handle_event(evt) + mock_HandleEvent.assert_called_with(akg, evt) + mock_handle_event.assert_called_with(akg, evt) + + def test_category(self): + akg = self.get_obj() + cfp = Mock() + cfp.has_section.return_value = False + cfp.has_option.return_value = False + Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock() + Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp + + self.assertIsNone(akg.category) + cfp.has_section.assert_called_with("sshkeys") + + cfp.reset_mock() + cfp.has_section.return_value = True + self.assertIsNone(akg.category) + cfp.has_section.assert_called_with("sshkeys") + cfp.has_option.assert_called_with("sshkeys", "category") + + cfp.reset_mock() + cfp.has_option.return_value = True + self.assertEqual(akg.category, cfp.get.return_value) + cfp.has_section.assert_called_with("sshkeys") + cfp.has_option.assert_called_with("sshkeys", "category") + cfp.get.assert_called_with("sshkeys", "category") + + @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata") + @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category") + def test_get_data(self, mock_ClientMetadata): + akg = self.get_obj() + akg.XMLMatch = Mock() + + def ClientMetadata(host, profile, groups, *args): + rv = Mock() + rv.hostname = host + rv.profile = profile + rv.groups = groups + return rv + + mock_ClientMetadata.side_effect = ClientMetadata + + def build_metadata(host): + rv = Mock() + rv.hostname = host + rv.profile = host + return rv + + akg.core.build_metadata = Mock() + akg.core.build_metadata.side_effect = build_metadata + + def Bind(ent, md): + ent.text = "%s %s" % (md.profile, ent.get("name")) + return ent + + akg.core.Bind = Mock() + akg.core.Bind.side_effect = Bind + metadata = Mock() + metadata.profile = "profile" + metadata.group_in_category.return_value = "profile" + entry = lxml.etree.Element("Path", name="/root/.ssh/authorized_keys") + + def reset(): + mock_ClientMetadata.reset_mock() + akg.XMLMatch.reset_mock() + akg.core.build_metadata.reset_mock() + akg.core.Bind.reset_mock() + metadata.reset_mock() + + pubkey = "/home/foo/.ssh/id_rsa.pub" + spec = lxml.etree.Element("AuthorizedKeys") + lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey}) + akg.XMLMatch.return_value = spec + self.assertEqual(akg.get_data(entry, metadata), "profile %s" % pubkey) + akg.XMLMatch.assert_called_with(metadata) + self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey) + self.assertEqual(akg.core.Bind.call_args[0][1], metadata) + + reset() + group = "somegroup" + spec = lxml.etree.Element("AuthorizedKeys") + lxml.etree.SubElement(spec, "Allow", + attrib={"from": pubkey, "group": group}) + akg.XMLMatch.return_value = spec + self.assertEqual(akg.get_data(entry, metadata), + "%s %s" % (group, pubkey)) + akg.XMLMatch.assert_called_with(metadata) + self.assertItemsEqual(mock_ClientMetadata.call_args[0][2], [group]) + self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey) + self.assertIn(group, akg.core.Bind.call_args[0][1].groups) + + reset() + host = "baz.example.com" + spec = lxml.etree.Element("AuthorizedKeys") + lxml.etree.SubElement( + lxml.etree.SubElement(spec, + "Allow", + attrib={"from": pubkey, "host": host}), + "Params", foo="foo", bar="bar=bar") + akg.XMLMatch.return_value = spec + self.assertEqual(akg.get_data(entry, metadata), + "foo=foo,bar=bar=bar %s %s" % (host, pubkey)) + akg.XMLMatch.assert_called_with(metadata) + akg.core.build_metadata.assert_called_with(host) + self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey) + self.assertEqual(akg.core.Bind.call_args[0][1].hostname, host) + + reset() + spec = lxml.etree.Element("AuthorizedKeys") + text = lxml.etree.SubElement(spec, "Allow") + text.text = "ssh-rsa publickey /foo/bar\n" + lxml.etree.SubElement(text, "Params", foo="foo") + akg.XMLMatch.return_value = spec + self.assertEqual(akg.get_data(entry, metadata), + "foo=foo %s" % text.text.strip()) + akg.XMLMatch.assert_called_with(metadata) + self.assertFalse(akg.core.build_metadata.called) + self.assertFalse(akg.core.Bind.called) + + reset() + lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey}) + akg.XMLMatch.return_value = spec + self.assertItemsEqual(akg.get_data(entry, metadata).splitlines(), + ["foo=foo %s" % text.text.strip(), + "profile %s" % pubkey]) + akg.XMLMatch.assert_called_with(metadata) + + reset() + metadata.group_in_category.return_value = '' + spec = lxml.etree.Element("AuthorizedKeys") + lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey}) + akg.XMLMatch.return_value = spec + self.assertEqual(akg.get_data(entry, metadata), '') + akg.XMLMatch.assert_called_with(metadata) + self.assertFalse(akg.core.build_metadata.called) + self.assertFalse(akg.core.Bind.called) + self.assertFalse(mock_ClientMetadata.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py new file mode 100644 index 000000000..dd18306cb --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py @@ -0,0 +1,435 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg import CfgCreationError +from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import * +from Bcfg2.Server.Plugin import PluginExecutionError +try: + from Bcfg2.Encryption import EVPError + HAS_CRYPTO = True +except: + HAS_CRYPTO = False + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator +from TestServer.TestPlugin.Testhelpers import TestStructFile + + +class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile): + test_obj = CfgPrivateKeyCreator + should_monitor = False + + def get_obj(self, name=None, fam=None): + return TestCfgCreator.get_obj(self, name=name) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event") + @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent") + def test_handle_event(self, mock_HandleEvent, mock_handle_event): + pkc = self.get_obj() + evt = Mock() + pkc.handle_event(evt) + mock_HandleEvent.assert_called_with(pkc, evt) + mock_handle_event.assert_called_with(pkc, evt) + + def test_category(self): + pkc = self.get_obj() + cfp = Mock() + cfp.has_section.return_value = False + cfp.has_option.return_value = False + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + + self.assertIsNone(pkc.category) + cfp.has_section.assert_called_with("sshkeys") + + cfp.reset_mock() + cfp.has_section.return_value = True + self.assertIsNone(pkc.category) + cfp.has_section.assert_called_with("sshkeys") + cfp.has_option.assert_called_with("sshkeys", "category") + + cfp.reset_mock() + cfp.has_option.return_value = True + self.assertEqual(pkc.category, cfp.get.return_value) + cfp.has_section.assert_called_with("sshkeys") + cfp.has_option.assert_called_with("sshkeys", "category") + cfp.get.assert_called_with("sshkeys", "category") + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_passphrase(self): + @patch("Bcfg2.Encryption.get_passphrases") + def inner(mock_get_passphrases): + pkc = self.get_obj() + cfp = Mock() + cfp.has_section.return_value = False + cfp.has_option.return_value = False + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp + + self.assertIsNone(pkc.passphrase) + cfp.has_section.assert_called_with("sshkeys") + + cfp.reset_mock() + cfp.has_section.return_value = True + self.assertIsNone(pkc.passphrase) + cfp.has_section.assert_called_with("sshkeys") + cfp.has_option.assert_called_with("sshkeys", "passphrase") + + cfp.reset_mock() + cfp.get.return_value = "test" + mock_get_passphrases.return_value = dict(test="foo", test2="bar") + cfp.has_option.return_value = True + self.assertEqual(pkc.passphrase, "foo") + cfp.has_section.assert_called_with("sshkeys") + cfp.has_option.assert_called_with("sshkeys", "passphrase") + cfp.get.assert_called_with("sshkeys", "passphrase") + mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP) + + inner() + + @patch("shutil.rmtree") + @patch("tempfile.mkdtemp") + @patch("subprocess.Popen") + def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree): + pkc = self.get_obj() + pkc.XMLMatch = Mock() + mock_mkdtemp.return_value = datastore + metadata = Mock() + + proc = Mock() + proc.wait.return_value = 0 + proc.communicate.return_value = MagicMock() + mock_Popen.return_value = proc + + spec = lxml.etree.Element("PrivateKey") + pkc.XMLMatch.return_value = spec + + def reset(): + pkc.XMLMatch.reset_mock() + mock_Popen.reset_mock() + mock_mkdtemp.reset_mock() + mock_rmtree.reset_mock() + + self.assertEqual(pkc._gen_keypair(metadata), + os.path.join(datastore, "privkey")) + pkc.XMLMatch.assert_called_with(metadata) + mock_mkdtemp.assert_called_with() + self.assertItemsEqual(mock_Popen.call_args[0][0], + ["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "rsa", "-N", ""]) + + reset() + lxml.etree.SubElement(spec, "Params", bits="768", type="dsa") + passphrase = lxml.etree.SubElement(spec, "Passphrase") + passphrase.text = "foo" + + self.assertEqual(pkc._gen_keypair(metadata), + os.path.join(datastore, "privkey")) + pkc.XMLMatch.assert_called_with(metadata) + mock_mkdtemp.assert_called_with() + self.assertItemsEqual(mock_Popen.call_args[0][0], + ["ssh-keygen", "-f", + os.path.join(datastore, "privkey"), + "-t", "dsa", "-b", "768", "-N", "foo"]) + + reset() + proc.wait.return_value = 1 + self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata) + mock_rmtree.assert_called_with(datastore) + + def test_get_specificity(self): + pkc = self.get_obj() + pkc.XMLMatch = Mock() + + metadata = Mock() + + def reset(): + pkc.XMLMatch.reset_mock() + metadata.group_in_category.reset_mock() + + category = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.category" + @patch(category, None) + def inner(): + pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") + self.assertItemsEqual(pkc.get_specificity(metadata), + dict(host=metadata.hostname)) + inner() + + @patch(category, "foo") + def inner2(): + pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") + self.assertItemsEqual(pkc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=50)) + metadata.group_in_category.assert_called_with("foo") + + reset() + pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", + perhost="true") + self.assertItemsEqual(pkc.get_specificity(metadata), + dict(host=metadata.hostname)) + + reset() + pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", + category="bar") + self.assertItemsEqual(pkc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=50)) + metadata.group_in_category.assert_called_with("bar") + + reset() + pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey", + prio="10") + self.assertItemsEqual(pkc.get_specificity(metadata), + dict(group=metadata.group_in_category.return_value, + prio=10)) + metadata.group_in_category.assert_called_with("foo") + + reset() + pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey") + metadata.group_in_category.return_value = '' + self.assertItemsEqual(pkc.get_specificity(metadata), + dict(host=metadata.hostname)) + metadata.group_in_category.assert_called_with("foo") + + inner2() + + @patch("shutil.rmtree") + @patch("%s.open" % builtins) + def test_create_data(self, mock_open, mock_rmtree): + pkc = self.get_obj() + pkc.XMLMatch = Mock() + pkc.get_specificity = MagicMock() + pkc._gen_keypair = Mock() + privkey = os.path.join(datastore, "privkey") + pkc._gen_keypair.return_value = privkey + pkc.pubkey_creator = Mock() + pkc.pubkey_creator.get_filename.return_value = "pubkey.filename" + pkc.write_data = Mock() + + entry = lxml.etree.Element("Path", name="/home/foo/.ssh/id_rsa") + metadata = Mock() + + def open_read_rv(): + mock_open.return_value.read.side_effect = lambda: "privatekey" + return "ssh-rsa publickey foo@bar.com" + + def reset(): + mock_open.reset_mock() + mock_rmtree.reset_mock() + pkc.XMLMatch.reset_mock() + pkc.get_specificity.reset_mock() + pkc._gen_keypair.reset_mock() + pkc.pubkey_creator.reset_mock() + pkc.write_data.reset_mock() + mock_open.return_value.read.side_effect = open_read_rv + + reset() + passphrase = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.passphrase" + + @patch(passphrase, None) + def inner(): + self.assertEqual(pkc.create_data(entry, metadata), "privatekey") + pkc.XMLMatch.assert_called_with(metadata) + pkc.get_specificity.assert_called_with(metadata, + pkc.XMLMatch.return_value) + pkc._gen_keypair.assert_called_with(metadata, + pkc.XMLMatch.return_value) + self.assertItemsEqual(mock_open.call_args_list, + [call(privkey + ".pub"), call(privkey)]) + pkc.pubkey_creator.get_filename.assert_called_with( + **pkc.get_specificity.return_value) + pkc.pubkey_creator.write_data.assert_called_with( + "ssh-rsa publickey pubkey.filename\n", + **pkc.get_specificity.return_value) + pkc.write_data.assert_called_with( + "privatekey", + **pkc.get_specificity.return_value) + mock_rmtree.assert_called_with(datastore) + + reset() + self.assertEqual(pkc.create_data(entry, metadata, return_pair=True), + ("ssh-rsa publickey pubkey.filename\n", + "privatekey")) + pkc.XMLMatch.assert_called_with(metadata) + pkc.get_specificity.assert_called_with(metadata, + pkc.XMLMatch.return_value) + pkc._gen_keypair.assert_called_with(metadata, + pkc.XMLMatch.return_value) + self.assertItemsEqual(mock_open.call_args_list, + [call(privkey + ".pub"), call(privkey)]) + pkc.pubkey_creator.get_filename.assert_called_with( + **pkc.get_specificity.return_value) + pkc.pubkey_creator.write_data.assert_called_with( + "ssh-rsa publickey pubkey.filename\n", + **pkc.get_specificity.return_value) + pkc.write_data.assert_called_with( + "privatekey", + **pkc.get_specificity.return_value) + mock_rmtree.assert_called_with(datastore) + + inner() + + if HAS_CRYPTO: + @patch(passphrase, "foo") + @patch("Bcfg2.Encryption.ssl_encrypt") + @patch("Bcfg2.Encryption.get_algorithm") + def inner2(mock_get_algorithm, mock_ssl_encrypt): + reset() + mock_ssl_encrypt.return_value = "encryptedprivatekey" + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True + self.assertEqual(pkc.create_data(entry, metadata), + "encryptedprivatekey") + pkc.XMLMatch.assert_called_with(metadata) + pkc.get_specificity.assert_called_with( + metadata, + pkc.XMLMatch.return_value) + pkc._gen_keypair.assert_called_with(metadata, + pkc.XMLMatch.return_value) + self.assertItemsEqual(mock_open.call_args_list, + [call(privkey + ".pub"), call(privkey)]) + pkc.pubkey_creator.get_filename.assert_called_with( + **pkc.get_specificity.return_value) + pkc.pubkey_creator.write_data.assert_called_with( + "ssh-rsa publickey pubkey.filename\n", + **pkc.get_specificity.return_value) + pkc.write_data.assert_called_with( + "encryptedprivatekey", + **pkc.get_specificity.return_value) + mock_ssl_encrypt.assert_called_with( + "privatekey", "foo", + algorithm=mock_get_algorithm.return_value) + mock_rmtree.assert_called_with(datastore) + + inner2() + + def test_Index(self): + has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False + TestStructFile.test_Index(self) + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_Index_crypto(self): + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock() + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict" + + pkc = self.get_obj() + pkc._decrypt = Mock() + pkc._decrypt.return_value = 'plaintext' + pkc.data = ''' + + + crypted + + + plain + +''' + + # test successful decryption + pkc.Index() + self.assertItemsEqual( + pkc._decrypt.call_args_list, + [call(el) + for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) + for el in pkc.xdata.xpath("//Crypted"): + self.assertEqual(el.text, pkc._decrypt.return_value) + + # test failed decryption, strict + pkc._decrypt.reset_mock() + pkc._decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, pkc.Index) + + # test failed decryption, lax + Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax" + pkc._decrypt.reset_mock() + pkc.Index() + self.assertItemsEqual( + pkc._decrypt.call_args_list, + [call(el) + for el in pkc.xdata.xpath("//Passphrase[@encrypted]")]) + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_decrypt(self): + + @patch("Bcfg2.Encryption.ssl_decrypt") + @patch("Bcfg2.Encryption.get_algorithm") + @patch("Bcfg2.Encryption.get_passphrases") + @patch("Bcfg2.Encryption.bruteforce_decrypt") + def inner(mock_bruteforce, mock_get_passphrases, mock_get_algorithm, + mock_ssl): + pkc = self.get_obj() + + def reset(): + mock_bruteforce.reset_mock() + mock_get_algorithm.reset_mock() + mock_get_passphrases.reset_mock() + mock_ssl.reset_mock() + + # test element without text contents + self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test"))) + self.assertFalse(mock_bruteforce.called) + self.assertFalse(mock_get_passphrases.called) + self.assertFalse(mock_ssl.called) + + # test element with a passphrase in the config file + reset() + el = lxml.etree.Element("Test", encrypted="foo") + el.text = "crypted" + mock_get_passphrases.return_value = dict(foo="foopass", + bar="barpass") + mock_get_algorithm.return_value = "bf_cbc" + mock_ssl.return_value = "decrypted with ssl" + self.assertEqual(pkc._decrypt(el), mock_ssl.return_value) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_ssl.assert_called_with(el.text, "foopass", + algorithm="bf_cbc") + self.assertFalse(mock_bruteforce.called) + + # test failure to decrypt element with a passphrase in the config + reset() + mock_ssl.side_effect = EVPError + self.assertRaises(EVPError, pkc._decrypt, el) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_ssl.assert_called_with(el.text, "foopass", + algorithm="bf_cbc") + self.assertFalse(mock_bruteforce.called) + + # test element without valid passphrase + reset() + el.set("encrypted", "true") + mock_bruteforce.return_value = "decrypted with bruteforce" + self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_bruteforce.assert_called_with(el.text, + passphrases=["foopass", + "barpass"], + algorithm="bf_cbc") + self.assertFalse(mock_ssl.called) + + # test failure to decrypt element without valid passphrase + reset() + mock_bruteforce.side_effect = EVPError + self.assertRaises(EVPError, pkc._decrypt, el) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_bruteforce.assert_called_with(el.text, + passphrases=["foopass", + "barpass"], + algorithm="bf_cbc") + self.assertFalse(mock_ssl.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py new file mode 100644 index 000000000..2e7b6eef4 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py @@ -0,0 +1,76 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg import CfgCreationError, CfgCreator +from Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator import * +from Bcfg2.Server.Plugin import StructFile, PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator +from TestServer.TestPlugin.Testhelpers import TestStructFile + + +class TestCfgPublicKeyCreator(TestCfgCreator, TestStructFile): + test_obj = CfgPublicKeyCreator + should_monitor = False + + def get_obj(self, name=None, fam=None): + return TestCfgCreator.get_obj(self, name=name) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event") + @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent") + def test_handle_event(self, mock_HandleEvent, mock_handle_event): + pkc = self.get_obj() + evt = Mock() + pkc.handle_event(evt) + mock_HandleEvent.assert_called_with(pkc, evt) + mock_handle_event.assert_called_with(pkc, evt) + + def test_create_data(self): + metadata = Mock() + pkc = self.get_obj() + pkc.cfg = Mock() + + privkey_entryset = Mock() + privkey_creator = Mock() + pubkey = Mock() + privkey = Mock() + privkey_creator.create_data.return_value = (pubkey, privkey) + privkey_entryset.best_matching.return_value = privkey_creator + pkc.cfg.entries = {"/home/foo/.ssh/id_rsa": privkey_entryset} + + # public key doesn't end in .pub + entry = lxml.etree.Element("Path", name="/home/bar/.ssh/bogus") + self.assertRaises(CfgCreationError, + pkc.create_data, entry, metadata) + + # private key not in cfg.entries + entry = lxml.etree.Element("Path", name="/home/bar/.ssh/id_rsa.pub") + self.assertRaises(CfgCreationError, + pkc.create_data, entry, metadata) + + # successful operation + entry = lxml.etree.Element("Path", name="/home/foo/.ssh/id_rsa.pub") + self.assertEqual(pkc.create_data(entry, metadata), pubkey) + privkey_entryset.get_handlers.assert_called_with(metadata, CfgCreator) + privkey_entryset.best_matching.assert_called_with(metadata, + privkey_entryset.get_handlers.return_value) + self.assertXMLEqual(privkey_creator.create_data.call_args[0][0], + lxml.etree.Element("Path", + name="/home/foo/.ssh/id_rsa")) + self.assertEqual(privkey_creator.create_data.call_args[0][1], metadata) + + # no privkey.xml + privkey_entryset.best_matching.side_effect = PluginExecutionError + self.assertRaises(CfgCreationError, + pkc.create_data, entry, metadata) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 6412480f0..55fbb7446 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -82,9 +82,10 @@ class TestCfgBaseFileMatcher(TestSpecificData): self.assertFalse(self.test_obj.handles(evt)) mock_get_regex.assert_called_with( [b for b in self.test_obj.__basenames__]) - self.assertItemsEqual(match.call_args_list, - [call(evt.filename) + print("match calls: %s" % match.call_args_list) + print("expected: %s" % [call(evt.filename) for b in self.test_obj.__basenames__]) + match.assert_called_with(evt.filename) mock_get_regex.reset_mock() match.reset_mock() @@ -186,47 +187,61 @@ class TestCfgCreator(TestCfgBaseFileMatcher): test_obj = CfgCreator path = "/foo/bar/test.txt" + def get_obj(self, name=None): + if name is None: + name = self.path + return self.test_obj(name) + def test_create_data(self): cc = self.get_obj() self.assertRaises(NotImplementedError, cc.create_data, Mock(), Mock()) + def test_get_filename(self): + cc = self.get_obj() + + # tuples of (args to get_filename(), expected result) + cases = [(dict(), "/foo/bar/bar"), + (dict(prio=50), "/foo/bar/bar"), + (dict(ext=".crypt"), "/foo/bar/bar.crypt"), + (dict(ext="bar"), "/foo/bar/barbar"), + (dict(host="foo.bar.example.com"), + "/foo/bar/bar.H_foo.bar.example.com"), + (dict(host="foo.bar.example.com", prio=50, ext=".crypt"), + "/foo/bar/bar.H_foo.bar.example.com.crypt"), + (dict(group="group", prio=1), "/foo/bar/bar.G01_group"), + (dict(group="group", prio=50), "/foo/bar/bar.G50_group"), + (dict(group="group", prio=50, ext=".crypt"), + "/foo/bar/bar.G50_group.crypt")] + + for args, expected in cases: + self.assertEqual(cc.get_filename(**args), expected) + @patch("os.makedirs") @patch("%s.open" % builtins) def test_write_data(self, mock_open, mock_makedirs): cc = self.get_obj() data = "test\ntest" + parent = os.path.dirname(self.path) def reset(): mock_open.reset_mock() mock_makedirs.reset_mock() - # test writing non-specific file - cc.write_data(data) - mock_makedirs.assert_called_with("/foo/bar") - mock_open.assert_called_with("/foo/bar/bar", "wb") - mock_open.return_value.write.assert_called_with(data) - - # test writing group-specific file - reset() - cc.write_data(data, group="foogroup", prio=9) - mock_makedirs.assert_called_with("/foo/bar") - mock_open.assert_called_with("/foo/bar/bar.G09_foogroup", "wb") - mock_open.return_value.write.assert_called_with(data) - - # test writing host-specific file + # test writing file reset() - cc.write_data(data, host="foo.example.com") - mock_makedirs.assert_called_with("/foo/bar") - mock_open.assert_called_with("/foo/bar/bar.H_foo.example.com", "wb") + spec = dict(group="foogroup", prio=9) + cc.write_data(data, **spec) + mock_makedirs.assert_called_with(parent) + mock_open.assert_called_with(cc.get_filename(**spec), "wb") mock_open.return_value.write.assert_called_with(data) # test already-exists error from makedirs reset() mock_makedirs.side_effect = OSError(errno.EEXIST, self.path) cc.write_data(data) - mock_makedirs.assert_called_with("/foo/bar") - mock_open.assert_called_with("/foo/bar/bar", "wb") + mock_makedirs.assert_called_with(parent) + mock_open.assert_called_with(cc.get_filename(), "wb") mock_open.return_value.write.assert_called_with(data) # test error from open @@ -391,6 +406,10 @@ class TestCfgEntrySet(TestEntrySet): evt = Mock() evt.filename = "test.txt" handler = Mock() + handler.__basenames__ = [] + handler.__extensions__ = [] + handler.deprecated = False + handler.experimental = False handler.__specific__ = True # test handling an event with the parent entry_init -- cgit v1.2.3-1-g7c22