From 1bdb14055dd1b2395047793ee28c17bbae65c845 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 27 Sep 2012 14:33:34 -0400 Subject: wrote unit tests for Properties --- src/lib/Bcfg2/Server/Plugins/Properties.py | 7 +- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 6 +- .../TestServer/TestPlugin/Testinterfaces.py | 1 + .../TestServer/TestPlugins/TestProperties.py | 294 +++++++++++++++++++++ .../TestServer/TestPlugins/TestSEModules.py | 1 + testsuite/common.py | 8 +- 6 files changed, 305 insertions(+), 12 deletions(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py diff --git a/src/lib/Bcfg2/Server/Plugins/Properties.py b/src/lib/Bcfg2/Server/Plugins/Properties.py index 1d5bdbcfc..d9e622645 100644 --- a/src/lib/Bcfg2/Server/Plugins/Properties.py +++ b/src/lib/Bcfg2/Server/Plugins/Properties.py @@ -41,8 +41,7 @@ class PropertyFile(Bcfg2.Server.Plugin.StructFile): raise PluginExecutionError(msg) try: - open(self.name, - "wb").write( + open(self.name, "wb").write( lxml.etree.tostring(self.xdata, xml_declaration=False, pretty_print=True).decode('UTF-8')) @@ -60,7 +59,7 @@ class PropertyFile(Bcfg2.Server.Plugin.StructFile): if os.path.exists(schemafile): try: schema = lxml.etree.XMLSchema(file=schemafile) - except: + except lxml.etree.XMLSchemaParseError: err = sys.exc_info()[1] raise PluginExecutionError("Failed to process schema for %s: " "%s" % (self.name, err)) @@ -105,6 +104,8 @@ class PropertyFile(Bcfg2.Server.Plugin.StructFile): # error is raised below pass except KeyError: + # bruteforce_decrypt raises an EVPError with a sensible + # error message, so we just let it propagate up the stack return bruteforce_decrypt(element.text, passphrases=passes.values(), algorithm=get_algorithm(SETUP)) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index f555d15f1..db8ee0cca 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -155,9 +155,9 @@ class TestDirectoryBacked(Bcfg2TestCase): 6: 'xyzzy/', 7: 'xyzzy/plugh/'} testfiles = ['foo', 'bar/baz.txt', 'plugh.py'] - ignore = [] # ignore no events - badevents = [] # DirectoryBacked handles all files, so there's no - # such thing as a bad event + ignore = [] # ignore no events + badevents = [] # DirectoryBacked handles all files, so there's no + # such thing as a bad event def test_child_interface(self): # ensure that the child object has the correct interface diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py index f05795b1a..6ef40d385 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py @@ -18,6 +18,7 @@ from common import call, builtins, skip, skipIf, skipUnless, Bcfg2TestCase, \ patchIf, datastore from TestServer.TestPlugin.Testbase import TestPlugin + class TestGenerator(Bcfg2TestCase): test_obj = Generator diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py new file mode 100644 index 000000000..883e88ba1 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py @@ -0,0 +1,294 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Properties import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore +from TestPlugin import TestStructFile, TestConnector, TestPlugin, \ + TestDirectoryBacked + + +class TestPropertyFile(TestStructFile): + test_obj = PropertyFile + + @patch("%s.open" % builtins) + def test_write(self, mock_open): + Bcfg2.Server.Plugins.Properties.SETUP = Mock() + pf = self.get_obj() + pf.validate_data = Mock() + + xstr = u("\n") + pf.xdata = lxml.etree.XML(xstr) + + def reset(): + pf.validate_data.reset_mock() + Bcfg2.Server.Plugins.Properties.SETUP.reset_mock() + mock_open.reset_mock() + + # test writes disabled + Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False + self.assertRaises(PluginExecutionError, pf.write) + self.assertFalse(pf.validate_data.called) + self.assertFalse(mock_open.called) + Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", + "writes_enabled", + default=True) + + # test successful write + reset() + Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True + self.assertTrue(pf.write()) + pf.validate_data.assert_called_with() + mock_open.assert_called_with(pf.name, "wb") + mock_open.return_value.write.assert_called_with(xstr) + + # test error from write + reset() + mock_open.side_effect = IOError + self.assertRaises(PluginExecutionError, pf.write) + pf.validate_data.assert_called_with() + mock_open.assert_called_with(pf.name, "wb") + + # test error from validate_data + reset() + pf.validate_data.side_effect = PluginExecutionError + self.assertRaises(PluginExecutionError, pf.write) + pf.validate_data.assert_called_with() + + @patch("os.path.exists") + @patch("lxml.etree.XMLSchema") + def test_validate_data(self, mock_XMLSchema, mock_exists): + pf = self.get_obj() + pf.name = os.path.join(datastore, "Properties", "test.xml") + schemafile = os.path.join(datastore, "Properties", "test.xsd") + + def reset(): + mock_XMLSchema.reset_mock() + mock_exists.reset_mock() + + # test no schema file + mock_exists.return_value = False + self.assertTrue(pf.validate_data()) + mock_exists.assert_called_with(schemafile) + + # test schema file exists, valid data + reset() + mock_exists.return_value = True + mock_XMLSchema.return_value = Mock() + mock_XMLSchema.return_value.validate.return_value = True + self.assertTrue(pf.validate_data()) + mock_exists.assert_called_with(schemafile) + mock_XMLSchema.assert_called_with(file=schemafile) + mock_XMLSchema.return_value.validate.assert_called_with(pf.xdata) + + # test schema file exists, invalid data + reset() + mock_XMLSchema.return_value = Mock() + mock_XMLSchema.return_value.validate.return_value = False + self.assertRaises(PluginExecutionError, pf.validate_data) + mock_exists.assert_called_with(schemafile) + mock_XMLSchema.assert_called_with(file=schemafile) + mock_XMLSchema.return_value.validate.assert_called_with(pf.xdata) + + # test invalid schema file + reset() + mock_XMLSchema.side_effect = lxml.etree.XMLSchemaParseError(pf.xdata) + self.assertRaises(PluginExecutionError, pf.validate_data) + mock_exists.assert_called_with(schemafile) + mock_XMLSchema.assert_called_with(file=schemafile) + + def test_Index(self): + TestStructFile.test_Index(self) + + pf = self.get_obj() + pf.xdata = lxml.etree.Element("Properties", encryption="true") + pf.data = lxml.etree.tostring(pf.xdata) + # extra test: crypto is not available, but properties file is + # encrypted + has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO + Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False + self.assertRaises(PluginExecutionError, pf.Index) + Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_Index_crypto(self): + pf = self.get_obj() + pf._decrypt = Mock() + pf._decrypt.return_value = 'plaintext' + pf.data = ''' + + + crypted + plain + + crypted + plain + + crypted + +''' + + # test successful decryption + pf.Index() + self.assertItemsEqual(pf._decrypt.call_args_list, + [call(el) for el in pf.xdata.xpath("//Crypted")]) + for el in pf.xdata.xpath("//Crypted"): + self.assertEqual(el.text, pf._decrypt.return_value) + + # test failed decryption + pf._decrypt.reset_mock() + pf._decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, pf.Index) + + @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping") + def test_decrypt(self): + + @patch("Bcfg2.Encryption.ssl_decrypt") + @patch("Bcfg2.Encryption.get_algorithm") + @patch("Bcfg2.Encryption.get_passphrases") + @patch("Bcfg2.Encryption.bruteforce_decrypt") + def inner(mock_bruteforce, mock_get_passphrases, mock_get_algorithm, + mock_ssl): + pf = self.get_obj() + + def reset(): + mock_bruteforce.reset_mock() + mock_get_algorithm.reset_mock() + mock_get_passphrases.reset_mock() + mock_ssl.reset_mock() + + # test element without text contents + self.assertIsNone(pf._decrypt(lxml.etree.Element("Test"))) + self.assertFalse(mock_bruteforce.called) + self.assertFalse(mock_get_passphrases.called) + self.assertFalse(mock_ssl.called) + + # test element with a passphrase in the config file + reset() + el = lxml.etree.Element("Test", encrypted="foo") + el.text = "crypted" + mock_get_passphrases.return_value = dict(foo="foopass", + bar="barpass") + mock_get_algorithm.return_value = "bf_cbc" + mock_ssl.return_value = "decrypted with ssl" + self.assertEqual(pf._decrypt(el), mock_ssl.return_value) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_ssl.assert_called_with(el.text, "foopass", + algorithm="bf_cbc") + self.assertFalse(mock_bruteforce.called) + + # test failure to decrypt element with a passphrase in the config + reset() + mock_ssl.side_effect = EVPError + self.assertRaises(EVPError, pf._decrypt, el) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_ssl.assert_called_with(el.text, "foopass", + algorithm="bf_cbc") + self.assertFalse(mock_bruteforce.called) + + # test element without valid passphrase + reset() + el.set("encrypted", "true") + mock_bruteforce.return_value = "decrypted with bruteforce" + self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_bruteforce.assert_called_with(el.text, + passphrases=["foopass", + "barpass"], + algorithm="bf_cbc") + self.assertFalse(mock_ssl.called) + + # test failure to decrypt element without valid passphrase + reset() + mock_bruteforce.side_effect = EVPError + self.assertRaises(EVPError, pf._decrypt, el) + mock_get_passphrases.assert_called_with(SETUP) + mock_get_algorithm.assert_called_with(SETUP) + mock_bruteforce.assert_called_with(el.text, + passphrases=["foopass", + "barpass"], + algorithm="bf_cbc") + self.assertFalse(mock_ssl.called) + + +class TestPropDirectoryBacked(TestDirectoryBacked): + test_obj = PropDirectoryBacked + testfiles = ['foo.xml', 'bar.baz.xml'] + ignore = ['foo.xsd', 'bar.baz.xsd', 'quux.xml.xsd'] + badevents = ['bogus.txt'] + + +class TestProperties(TestPlugin, TestConnector): + test_obj = Properties + + def test__init(self): + TestPlugin.test__init(self) + + core = Mock() + p = self.get_obj(core=core) + self.assertIsInstance(p.store, PropDirectoryBacked) + self.assertEqual(Bcfg2.Server.Plugins.Properties.SETUP, core.setup) + + @patch("copy.copy") + def test_get_additional_data(self, mock_copy): + TestConnector.test_get_additional_data(self) + + p = self.get_obj() + automatch = Mock() + automatch.xdata = lxml.etree.Element("Properties", automatch="true") + automatch.XMLMatch.return_value = "automatch" + raw = Mock() + raw.xdata = lxml.etree.Element("Properties") + raw.XMLMatch.return_value = "raw" + nevermatch = Mock() + nevermatch.xdata = lxml.etree.Element("Properties", automatch="false") + nevermatch.XMLMatch.return_value = "nevermatch" + p.store.entries = { + "/foo/automatch.xml": automatch, + "/foo/raw.xml": raw, + "/foo/nevermatch.xml": nevermatch, + } + + # we make copy just return the object it was asked to copy so + # that we can test the return value of get_additional_data(), + # which copies every object it doesn't XMLMatch() + mock_copy.side_effect = lambda o: o + + # test with automatch default to false + p.core.setup.cfp.getboolean.return_value = False + metadata = Mock() + self.assertItemsEqual(p.get_additional_data(metadata), + { + "/foo/automatch.xml": automatch.XMLMatch.return_value, + "/foo/raw.xml": raw, + "/foo/nevermatch.xml": nevermatch}) + automatch.XMLMatch.assert_called_with(metadata) + self.assertFalse(raw.XMLMatch.called) + self.assertFalse(nevermatch.XMLMatch.called) + + # test with automatch default to true + p.core.setup.cfp.getboolean.return_value = True + self.assertItemsEqual(p.get_additional_data(metadata), + { + "/foo/automatch.xml": automatch.XMLMatch.return_value, + "/foo/raw.xml": raw.XMLMatch.return_value, + "/foo/nevermatch.xml": nevermatch}) + automatch.XMLMatch.assert_called_with(metadata) + raw.XMLMatch.assert_called_with(metadata) + self.assertFalse(nevermatch.XMLMatch.called) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py index b13a6b3fb..e18e2bfd6 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py @@ -19,6 +19,7 @@ from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ patchIf, datastore from TestPlugin import TestSpecificData, TestGroupSpool + class TestSEModuleData(TestSpecificData): test_obj = SEModuleData path = os.path.join(datastore, "SEModules", "test.pp", "test.pp") diff --git a/testsuite/common.py b/testsuite/common.py index fc79e73e7..3aeeaf5f2 100644 --- a/testsuite/common.py +++ b/testsuite/common.py @@ -3,12 +3,7 @@ import re import sys import unittest from mock import patch, MagicMock, _patch, DEFAULT - -try: - from functools import wraps -except ImportError: - def wraps(wrapped): - return lambda f: f +from Bcfg2.Compat import wraps datastore = "/" @@ -33,6 +28,7 @@ try: except ImportError: has_django = False + try: from mock import call except ImportError: -- cgit v1.2.3-1-g7c22