import os
import sys
import lxml.etree
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Properties import *
from Bcfg2.Server.Plugin import PluginExecutionError

# add all parent testsuite directories to sys.path to allow (most)
# relative imports in python 2.4
path = os.path.dirname(__file__)
while path != "/":
    if os.path.basename(path).lower().startswith("test"):
        sys.path.append(path)
    if os.path.basename(path) == "testsuite":
        break
    _parent = os.path.dirname(path)
    if _parent == path:
        # dirname() reached a fixed point that is not "/" (e.g. "" for a
        # relative __file__); without this guard the loop never terminates
        break
    path = _parent
from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
    skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
    patchIf, datastore
from TestPlugin import TestStructFile, TestConnector, TestPlugin, \
    TestDirectoryBacked


class TestPropertyFile(TestStructFile):
    """ Tests for PropertyFile: write(), validate_data(), Index() and
    _decrypt(), on top of the tests inherited from TestStructFile """
    test_obj = PropertyFile

    @patch("%s.open" % builtins)
    def test_write(self, mock_open):
        # write() is exercised with the builtin open() patched out, so
        # nothing is written to disk
        Bcfg2.Server.Plugins.Properties.SETUP = Mock()
        pf = self.get_obj()
        pf.validate_data = Mock()
        xstr = u("<Properties/>\n")
        pf.xdata = lxml.etree.XML(xstr)

        def reset():
            # note that reset_mock() clears recorded calls only; any
            # side_effect set by an earlier scenario stays in place
            pf.validate_data.reset_mock()
            Bcfg2.Server.Plugins.Properties.SETUP.reset_mock()
            mock_open.reset_mock()

        # test writes disabled
        Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = \
            False
        self.assertRaises(PluginExecutionError, pf.write)
        self.assertFalse(pf.validate_data.called)
        self.assertFalse(mock_open.called)
        Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with(
            "properties", "writes_enabled", default=True)

        # test successful write
        reset()
        Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = \
            True
        self.assertTrue(pf.write())
        pf.validate_data.assert_called_with()
        mock_open.assert_called_with(pf.name, "wb")
        mock_open.return_value.write.assert_called_with(xstr)

        # test error from write
        reset()
        mock_open.side_effect = IOError
        self.assertRaises(PluginExecutionError, pf.write)
        pf.validate_data.assert_called_with()
        mock_open.assert_called_with(pf.name, "wb")

        # test error from validate_data
        reset()
        pf.validate_data.side_effect = PluginExecutionError
        self.assertRaises(PluginExecutionError, pf.write)
        pf.validate_data.assert_called_with()

    @patch("os.path.exists")
    @patch("lxml.etree.XMLSchema")
    def test_validate_data(self, mock_XMLSchema, mock_exists):
        # validate_data() is expected to look for an .xsd file next to
        # the properties file and validate pf.xdata against it
        pf = self.get_obj()
        pf.name = os.path.join(datastore, "Properties", "test.xml")
        schemafile = os.path.join(datastore, "Properties", "test.xsd")

        def reset():
            mock_XMLSchema.reset_mock()
            mock_exists.reset_mock()

        # test no schema file
        mock_exists.return_value = False
        self.assertTrue(pf.validate_data())
        mock_exists.assert_called_with(schemafile)

        # test schema file exists, valid data
        reset()
        mock_exists.return_value = True
        mock_XMLSchema.return_value = Mock()
        mock_XMLSchema.return_value.validate.return_value = True
        self.assertTrue(pf.validate_data())
        mock_exists.assert_called_with(schemafile)
        mock_XMLSchema.assert_called_with(file=schemafile)
        mock_XMLSchema.return_value.validate.assert_called_with(pf.xdata)

        # test schema file exists, invalid data
        reset()
        mock_XMLSchema.return_value = Mock()
        mock_XMLSchema.return_value.validate.return_value = False
        self.assertRaises(PluginExecutionError, pf.validate_data)
        mock_exists.assert_called_with(schemafile)
        mock_XMLSchema.assert_called_with(file=schemafile)
        mock_XMLSchema.return_value.validate.assert_called_with(pf.xdata)

        # test invalid schema file
        reset()
        mock_XMLSchema.side_effect = lxml.etree.XMLSchemaParseError(pf.xdata)
        self.assertRaises(PluginExecutionError, pf.validate_data)
        mock_exists.assert_called_with(schemafile)
        mock_XMLSchema.assert_called_with(file=schemafile)

    def test_Index(self):
        TestStructFile.test_Index(self)

        pf = self.get_obj()
        pf.xdata = lxml.etree.Element("Properties", encryption="true")
        pf.data = lxml.etree.tostring(pf.xdata)
        # extra test: crypto is not available, but properties file is
        # encrypted
        has_crypto = Bcfg2.Server.Plugins.Properties.HAS_CRYPTO
        Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = False
        try:
            self.assertRaises(PluginExecutionError, pf.Index)
        finally:
            # always restore the module global, so a failure here cannot
            # leak HAS_CRYPTO = False into other tests
            Bcfg2.Server.Plugins.Properties.HAS_CRYPTO = has_crypto

    @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
    def test_Index_crypto(self):
        # Index() is expected to call _decrypt() once per Crypted element
        # and store the result as that element's text
        pf = self.get_obj()
        pf._decrypt = Mock()
        pf._decrypt.return_value = 'plaintext'
        # NOTE(review): the XML markup of this fixture was reconstructed
        # around its text nodes; the element nesting matches the text
        # order, but the attribute values are a best guess -- confirm
        # against the upstream test data
        pf.data = '''
<Properties encryption="true">
  <Crypted encrypted="foo">
    crypted
    <Plain foo="bar">plain</Plain>
  </Crypted>
  <Crypted encrypted="bar">crypted</Crypted>
  <Plain bar="baz">plain</Plain>
  <Plain>
    <Crypted encrypted="foo">crypted</Crypted>
  </Plain>
</Properties>'''

        # test successful decryption
        pf.Index()
        self.assertItemsEqual(pf._decrypt.call_args_list,
                              [call(el) for el in pf.xdata.xpath("//Crypted")])
        for el in pf.xdata.xpath("//Crypted"):
            self.assertEqual(el.text, pf._decrypt.return_value)

        # test failed decryption
        pf._decrypt.reset_mock()
        pf._decrypt.side_effect = EVPError
        self.assertRaises(PluginExecutionError, pf.Index)

    @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
    def test_decrypt(self):
        # the patches are applied to an inner function rather than to the
        # test method itself, so that the patch targets are only resolved
        # when the test actually runs (i.e. not when it is skipped)

        @patch("Bcfg2.Encryption.ssl_decrypt")
        @patch("Bcfg2.Encryption.get_algorithm")
        @patch("Bcfg2.Encryption.get_passphrases")
        @patch("Bcfg2.Encryption.bruteforce_decrypt")
        def inner(mock_bruteforce, mock_get_passphrases,
                  mock_get_algorithm, mock_ssl):
            pf = self.get_obj()

            def reset():
                mock_bruteforce.reset_mock()
                mock_get_algorithm.reset_mock()
                mock_get_passphrases.reset_mock()
                mock_ssl.reset_mock()

            # test element without text contents
            self.assertIsNone(pf._decrypt(lxml.etree.Element("Test")))
            self.assertFalse(mock_bruteforce.called)
            self.assertFalse(mock_get_passphrases.called)
            self.assertFalse(mock_ssl.called)

            # test element with a passphrase in the config file
            reset()
            el = lxml.etree.Element("Test", encrypted="foo")
            el.text = "crypted"
            mock_get_passphrases.return_value = dict(foo="foopass",
                                                     bar="barpass")
            mock_get_algorithm.return_value = "bf_cbc"
            mock_ssl.return_value = "decrypted with ssl"
            self.assertEqual(pf._decrypt(el), mock_ssl.return_value)
            mock_get_passphrases.assert_called_with(SETUP)
            mock_get_algorithm.assert_called_with(SETUP)
            mock_ssl.assert_called_with(el.text, "foopass",
                                        algorithm="bf_cbc")
            self.assertFalse(mock_bruteforce.called)

            # test failure to decrypt element with a passphrase in the config
            reset()
            mock_ssl.side_effect = EVPError
            self.assertRaises(EVPError, pf._decrypt, el)
            mock_get_passphrases.assert_called_with(SETUP)
            mock_get_algorithm.assert_called_with(SETUP)
            mock_ssl.assert_called_with(el.text, "foopass",
                                        algorithm="bf_cbc")
            self.assertFalse(mock_bruteforce.called)

            # test element without valid passphrase
            reset()
            el.set("encrypted", "true")
            mock_bruteforce.return_value = "decrypted with bruteforce"
            self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value)
            mock_get_passphrases.assert_called_with(SETUP)
            mock_get_algorithm.assert_called_with(SETUP)
            mock_bruteforce.assert_called_with(el.text,
                                               passphrases=["foopass",
                                                            "barpass"],
                                               algorithm="bf_cbc")
            self.assertFalse(mock_ssl.called)

            # test failure to decrypt element without valid passphrase
            reset()
            mock_bruteforce.side_effect = EVPError
            self.assertRaises(EVPError, pf._decrypt, el)
            mock_get_passphrases.assert_called_with(SETUP)
            mock_get_algorithm.assert_called_with(SETUP)
            mock_bruteforce.assert_called_with(el.text,
                                               passphrases=["foopass",
                                                            "barpass"],
                                               algorithm="bf_cbc")
            self.assertFalse(mock_ssl.called)

        # actually run the patched scenarios; without this call the test
        # passes vacuously without executing a single assertion
        inner()


class TestPropDirectoryBacked(TestDirectoryBacked):
    """ Runs the inherited TestDirectoryBacked tests against
    PropDirectoryBacked, using the file name fixtures below """
    test_obj = PropDirectoryBacked
    testfiles = ['foo.xml', 'bar.baz.xml']
    ignore = ['foo.xsd', 'bar.baz.xsd', 'quux.xml.xsd']
    badevents = ['bogus.txt']


class TestProperties(TestPlugin, TestConnector):
    """ Tests for the Properties plugin object itself """
    test_obj = Properties

    def test__init(self):
        TestPlugin.test__init(self)

        core = Mock()
        p = self.get_obj(core=core)
        self.assertIsInstance(p.store, PropDirectoryBacked)
        self.assertEqual(Bcfg2.Server.Plugins.Properties.SETUP, core.setup)

    @patch("copy.copy")
    def test_get_additional_data(self, mock_copy):
        TestConnector.test_get_additional_data(self)

        p = self.get_obj()
        automatch = Mock()
        automatch.xdata = lxml.etree.Element("Properties", automatch="true")
        automatch.XMLMatch.return_value = "automatch"
        raw = Mock()
        raw.xdata = lxml.etree.Element("Properties")
        raw.XMLMatch.return_value = "raw"
        nevermatch = Mock()
        nevermatch.xdata = lxml.etree.Element("Properties", automatch="false")
        nevermatch.XMLMatch.return_value = "nevermatch"
        p.store.entries = {
            "/foo/automatch.xml": automatch,
            "/foo/raw.xml": raw,
            "/foo/nevermatch.xml": nevermatch,
            }

        # we make copy just return the object it was asked to copy so
        # that we can test the return value of get_additional_data(),
        # which copies every object it doesn't XMLMatch()
        mock_copy.side_effect = lambda o: o

        # NOTE(review): both arguments to assertItemsEqual below are
        # dicts, so presumably only their keys are compared; consider
        # assertEqual if the values are meant to be checked too

        # test with automatch default to false
        p.core.setup.cfp.getboolean.return_value = False
        metadata = Mock()
        self.assertItemsEqual(
            p.get_additional_data(metadata),
            {"/foo/automatch.xml": automatch.XMLMatch.return_value,
             "/foo/raw.xml": raw,
             "/foo/nevermatch.xml": nevermatch})
        automatch.XMLMatch.assert_called_with(metadata)
        self.assertFalse(raw.XMLMatch.called)
        self.assertFalse(nevermatch.XMLMatch.called)

        # test with automatch default to true
        p.core.setup.cfp.getboolean.return_value = True
        self.assertItemsEqual(
            p.get_additional_data(metadata),
            {"/foo/automatch.xml": automatch.XMLMatch.return_value,
             "/foo/raw.xml": raw.XMLMatch.return_value,
             "/foo/nevermatch.xml": nevermatch})
        automatch.XMLMatch.assert_called_with(metadata)
        raw.XMLMatch.assert_called_with(metadata)
        self.assertFalse(nevermatch.XMLMatch.called)