From adf037aa31031be164e68b1a4817a7cada936c90 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 2 Oct 2012 15:00:03 -0400 Subject: testsuite: added unit tests for Cfg handlers --- src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py | 1 + .../Server/Plugins/Cfg/CfgEncryptedGenerator.py | 12 +- .../Plugins/Cfg/CfgExternalCommandVerifier.py | 24 ++-- .../Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py | 6 +- src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py | 6 +- .../Server/Plugins/Cfg/CfgPlaintextGenerator.py | 1 + src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py | 1 - .../Testlib/TestServer/TestPlugin/Testhelpers.py | 10 +- .../TestPlugins/TestCfg/TestCfgCheetahGenerator.py | 41 +++++++ .../TestCfg/TestCfgEncryptedCheetahGenerator.py | 50 ++++++++ .../TestCfg/TestCfgEncryptedGenerator.py | 76 ++++++++++++ .../TestCfg/TestCfgEncryptedGenshiGenerator.py | 35 ++++++ .../TestCfg/TestCfgExternalCommandVerifier.py | 89 ++++++++++++++ .../TestPlugins/TestCfg/TestCfgGenshiGenerator.py | 133 +++++++++++++++++++++ .../TestPlugins/TestCfg/TestCfgInfoXML.py | 75 ++++++++++++ .../TestCfg/TestCfgPlaintextGenerator.py | 18 +++ .../TestServer/TestPlugins/TestCfg/Test_init.py | 76 ++++++------ .../TestServer/TestPlugins/TestCfg/__init__.py | 0 18 files changed, 591 insertions(+), 63 deletions(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/__init__.py diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py index a2e86b3db..49a5a85b3 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py @@ -3,6 +3,7 @@ plaintext files """ from Bcfg2.Server.Plugins.Cfg import CfgFilter + class CfgCatFilter(CfgFilter): """ CfgCatFilter appends lines to and remove lines from plaintext :ref:`server-plugins-generators-Cfg` files""" diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py index f8d08b394..dc4bab9f6 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py @@ -30,18 +30,12 @@ class CfgEncryptedGenerator(CfgGenerator): __init__.__doc__ = CfgGenerator.__init__.__doc__ def handle_event(self, event): - if event.code2str() == 'deleted': - return - try: - crypted = open(self.name).read() - except UnicodeDecodeError: - crypted = open(self.name, mode='rb').read() - except: - LOGGER.error("Failed to read %s" % self.name) + CfgGenerator.handle_event(self, event) + if self.data is None: return # todo: let the user specify a passphrase by name try: - self.data = bruteforce_decrypt(crypted, setup=SETUP, + self.data = bruteforce_decrypt(self.data, setup=SETUP, algorithm=get_algorithm(SETUP)) except EVPError: msg = "Failed to decrypt %s" % self.name diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py index fb66ca8bf..023af7d4e 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py @@ -1,6 +1,7 @@ """ Invoke an external command to verify file contents """ import os +import sys import shlex import logging import Bcfg2.Server.Plugin @@ -23,23 +24,30 @@ class CfgExternalCommandVerifier(CfgVerifier): __init__.__doc__ = CfgVerifier.__init__.__doc__ def verify_entry(self, entry, metadata, data): - proc = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) - err = proc.communicate(input=data)[1] - rv = proc.wait() - if rv != 0: - raise CfgVerificationError(err) + try: + proc = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + err = proc.communicate(input=data)[1] + rv = proc.wait() + if rv != 0: + raise CfgVerificationError(err) + except: + err = sys.exc_info()[1] + raise CfgVerificationError("Error running external command " + "verifier: %s" % err) verify_entry.__doc__ = CfgVerifier.verify_entry.__doc__ def handle_event(self, event): - if event.code2str() == 'deleted': + CfgVerifier.handle_event(self, event) + if not self.data: return self.cmd = [] if not os.access(self.name, os.X_OK): - bangpath = open(self.name).readline().strip() + bangpath = self.data.splitlines()[0].strip() if bangpath.startswith("#!"): self.cmd.extend(shlex.split(bangpath[2:].strip())) else: - msg = "Cannot execute %s" % self.name + msg = "%s: Cannot execute %s" % (self.__class__.__name__, + self.name) LOGGER.error(msg) raise Bcfg2.Server.Plugin.PluginExecutionError(msg) self.cmd.append(self.name) diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py index 21662a984..2f59d74f2 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py @@ -142,13 +142,13 @@ class CfgGenshiGenerator(CfgGenerator): raise def handle_event(self, event): - if event.code2str() == 'deleted': - return CfgGenerator.handle_event(self, event) + if self.data is None: + return try: self.template = self.loader.load(self.name, cls=NewTextTemplate, encoding=self.encoding) - except Exception: + except: msg = "Cfg: Could not load template %s: %s" % (self.name, sys.exc_info()[1]) LOGGER.error(msg) diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py index 2396d6eb6..e5ba0a51b 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py @@ -23,9 +23,9 @@ class CfgInfoXML(CfgInfo): mdata = dict() self.infoxml.pnode.Match(metadata, mdata, entry=entry) if 'Info' not in mdata: - LOGGER.error("Failed to set metadata for file %s" % - entry.get('name')) - raise Bcfg2.Server.Plugin.PluginExecutionError + msg = "Failed to set metadata for file %s" % entry.get('name') + LOGGER.error(msg) + raise Bcfg2.Server.Plugin.PluginExecutionError(msg) self._set_info(entry, mdata['Info'][None]) bind_info_to_entry.__doc__ = CfgInfo.bind_info_to_entry.__doc__ diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py index 333e2f670..92fb06ee9 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py @@ -4,6 +4,7 @@ from Bcfg2.Server.Plugins.Cfg import CfgGenerator + class CfgPlaintextGenerator(CfgGenerator): """ CfgPlaintextGenerator is a :class:`Bcfg2.Server.Plugins.Cfg.CfgGenerator` that handles plain diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py index c4958ceef..0ab7fd98e 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py @@ -673,7 +673,6 @@ class Cfg(Bcfg2.Server.Plugin.GroupSpool, Bcfg2.Server.Plugin.PullTarget.__init__(self) SETUP = core.setup - print "SETUP=%s" % SETUP if 'validate' not in SETUP: SETUP.add_option('validate', Bcfg2.Options.CFG_VALIDATION) SETUP.reparse() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index f03ac2fd7..27d726fbb 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -49,7 +49,7 @@ class TestFunctions(Bcfg2TestCase): bind_info, entry, metadata, infoxml=infoxml) infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry) - + # test with valid infoxml entry = lxml.etree.Element("Path", name="/test") infoxml.reset_mock() @@ -1343,7 +1343,7 @@ class TestSpecificData(Bcfg2TestCase): if specific is None: specific = Mock() return self.test_obj(name, specific, encoding) - + def test__init(self): pass @@ -1358,7 +1358,7 @@ class TestSpecificData(Bcfg2TestCase): self.assertIsNone(sd.data) else: self.assertFalse(hasattr(sd, 'data')) - + event = Mock() mock_open.return_value.read.return_value = "test" sd.handle_event(event) @@ -1761,14 +1761,14 @@ class TestEntrySet(TestDebuggable): class TestGroupSpool(TestPlugin, TestGenerator): test_obj = GroupSpool - + def get_obj(self, core=None): @patch("%s.%s.AddDirectoryMonitor" % (self.test_obj.__module__, self.test_obj.__name__), Mock()) def inner(): return TestPlugin.get_obj(self, core=core) - + return inner() def test__init(self): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py new file mode 100644 index 000000000..2577b29df --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py @@ -0,0 +1,41 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator + + +if HAS_CHEETAH or can_skip: + class TestCfgCheetahGenerator(TestCfgGenerator): + test_obj = CfgCheetahGenerator + + @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") + def setUp(self): + pass + + @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template") + def test_get_data(self, mock_Template): + ccg = self.get_obj(encoding='UTF-8') + ccg.data = "data" + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + self.assertEqual(ccg.get_data(entry, metadata), + mock_Template.return_value.respond.return_value) + mock_Template.assert_called_with("data".decode(ccg.encoding), + compilerSettings=ccg.settings) + mock_Template.return_value.respond.assert_called_with() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py new file mode 100644 index 000000000..0522b9206 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py @@ -0,0 +1,50 @@ +import os +import sys +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgEncryptedCheetahGenerator import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type + +try: + from TestServer.TestPlugins.TestCfg.TestCfgCheetahGenerator import \ + TestCfgCheetahGenerator + HAS_CHEETAH = True +except ImportError: + TestCfgCheetahGenerator = object + HAS_CHEETAH = False + +try: + from TestServer.TestPlugins.TestCfg.TestCfgEncryptedGenerator import \ + TestCfgEncryptedGenerator + HAS_CRYPTO = True +except ImportError: + TestCfgEncryptedGenerator = object + HAS_CRYPTO = False + + +if can_skip or (HAS_CRYPTO and HAS_CHEETAH): + class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator, + TestCfgEncryptedGenerator): + test_obj = CfgEncryptedCheetahGenerator + + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping") + def setUp(self): + pass + + def test_handle_event(self): + TestCfgEncryptedGenerator.test_handle_event(self) + + def test_get_data(self): + TestCfgCheetahGenerator.test_get_data(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py new file mode 100644 index 000000000..d8819b11c --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py @@ -0,0 +1,76 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator + + +if can_skip or HAS_CRYPTO: + class TestCfgEncryptedGenerator(TestCfgGenerator): + test_obj = CfgEncryptedGenerator + + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + def setUp(self): + pass + + @patch("Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm") + @patch("Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt") + def test_handle_event(self, mock_decrypt, mock_get_algorithm): + @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") + def inner(mock_handle_event): + def reset(): + mock_decrypt.reset_mock() + mock_get_algorithm.reset_mock() + mock_handle_event.reset_mock() + + def get_event_data(obj, event): + obj.data = "encrypted" + + mock_handle_event.side_effect = get_event_data + mock_decrypt.side_effect = lambda d, **kw: "plaintext" + event = Mock() + ceg = self.get_obj() + ceg.handle_event(event) + mock_handle_event.assert_called_with(ceg, event) + mock_decrypt.assert_called_with("encrypted", + setup=SETUP, + algorithm=mock_get_algorithm.return_value) + self.assertEqual(ceg.data, "plaintext") + + reset() + mock_decrypt.side_effect = EVPError + self.assertRaises(PluginExecutionError, + ceg.handle_event, event) + inner() + + # to perform the tests from the parent test object, we + # make bruteforce_decrypt just return whatever data was + # given to it + mock_decrypt.side_effect = lambda d, **kw: d + TestCfgGenerator.test_handle_event(self) + + def test_get_data(self): + ceg = self.get_obj() + ceg.data = None + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + self.assertRaises(PluginExecutionError, + ceg.get_data, entry, metadata) + + TestCfgGenerator.test_get_data(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py new file mode 100644 index 000000000..0375b8928 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py @@ -0,0 +1,35 @@ +import os +import sys +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenshiGenerator import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type + +try: + from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \ + TestCfgGenshiGenerator + HAS_GENSHI = True +except ImportError: + TestCfgGenshiGenerator = object + HAS_GENSHI = False + + +if can_skip or (HAS_CRYPTO and HAS_GENSHI): + class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator): + test_obj = CfgEncryptedGenshiGenerator + + @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping") + @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") + def setUp(self): + pass diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py new file mode 100644 index 000000000..fde7ed722 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py @@ -0,0 +1,89 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier + + +class TestCfgExternalCommandVerifier(TestCfgVerifier): + test_obj = CfgExternalCommandVerifier + + @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen") + def test_verify_entry(self, mock_Popen): + proc = Mock() + mock_Popen.return_value = proc + proc.wait.return_value = 0 + proc.communicate.return_value = MagicMock() + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + ecv = self.get_obj() + ecv.cmd = ["/bin/bash", "-x", "foo"] + ecv.verify_entry(entry, metadata, "data") + self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + proc.communicate.assert_called_with(input="data") + proc.wait.assert_called_with() + + mock_Popen.reset_mock() + proc.wait.return_value = 13 + self.assertRaises(CfgVerificationError, + ecv.verify_entry, entry, metadata, "data") + self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + proc.communicate.assert_called_with(input="data") + proc.wait.assert_called_with() + + mock_Popen.reset_mock() + mock_Popen.side_effect = OSError + self.assertRaises(CfgVerificationError, + ecv.verify_entry, entry, metadata, "data") + self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,)) + + @patch("os.access") + def test_handle_event(self, mock_access): + @patch("Bcfg2.Server.Plugins.Cfg.CfgVerifier.handle_event") + def inner(mock_handle_event): + ecv = self.get_obj() + event = Mock() + mock_access.return_value = False + ecv.data = "data" + self.assertRaises(PluginExecutionError, + ecv.handle_event, event) + mock_handle_event.assert_called_with(ecv, event) + mock_access.assert_called_with(ecv.name, os.X_OK) + self.assertItemsEqual(ecv.cmd, []) + + mock_access.reset_mock() + mock_handle_event.reset_mock() + ecv.data = "#! /bin/bash -x\ntrue" + ecv.handle_event(event) + mock_handle_event.assert_called_with(ecv, event) + mock_access.assert_called_with(ecv.name, os.X_OK) + self.assertEqual(ecv.cmd, ["/bin/bash", "-x", ecv.name]) + + mock_access.reset_mock() + mock_handle_event.reset_mock() + mock_access.return_value = True + ecv.data = "true" + ecv.handle_event(event) + mock_handle_event.assert_called_with(ecv, event) + mock_access.assert_called_with(ecv.name, os.X_OK) + self.assertItemsEqual(ecv.cmd, [ecv.name]) + + inner() + mock_access.return_value = True + TestCfgVerifier.test_handle_event(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py new file mode 100644 index 000000000..2dd647352 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py @@ -0,0 +1,133 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator + + +if can_skip or HAS_GENSHI: + class TestCfgGenshiGenerator(TestCfgGenerator): + test_obj = CfgGenshiGenerator + + @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping") + def setUp(self): + pass + + def test_removecomment(self): + data = [(None, "test", 1), + (None, "test2", 2)] + stream = [(genshi.core.COMMENT, "test", 0), + data[0], + (genshi.core.COMMENT, "test3", 0), + data[1]] + self.assertItemsEqual(list(removecomment(stream)), data) + + def test__init(self): + TestCfgGenerator.test__init(self) + cgg = self.get_obj() + self.assertIsInstance(cgg.loader, cgg.__loader_cls__) + + def test_get_data(self): + cgg = self.get_obj() + cgg._handle_genshi_exception = Mock() + cgg.template = Mock() + fltr = Mock() + cgg.template.generate.return_value = fltr + stream = Mock() + fltr.filter.return_value = stream + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + def reset(): + cgg.template.reset_mock() + cgg._handle_genshi_exception.reset_mock() + + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(name=entry.get("name"), + metadata=metadata, + path=cgg.name) + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + def render(fmt, **kwargs): + stream.render.side_effect = None + raise TypeError + stream.render.side_effect = render + self.assertEqual(cgg.get_data(entry, metadata), + stream.render.return_value) + cgg.template.generate.assert_called_with(name=entry.get("name"), + metadata=metadata, + path=cgg.name) + fltr.filter.assert_called_with(removecomment) + self.assertEqual(stream.render.call_args_list, + [call("text", encoding=cgg.encoding, + strip_whitespace=False), + call("text", encoding=cgg.encoding)]) + + reset() + stream.render.side_effect = UndefinedError("test") + self.assertRaises(UndefinedError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(name=entry.get("name"), + metadata=metadata, + path=cgg.name) + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + + reset() + stream.render.side_effect = ValueError + cgg._handle_genshi_exception.side_effect = ValueError + self.assertRaises(ValueError, + cgg.get_data, entry, metadata) + cgg.template.generate.assert_called_with(name=entry.get("name"), + metadata=metadata, + path=cgg.name) + fltr.filter.assert_called_with(removecomment) + stream.render.assert_called_with("text", encoding=cgg.encoding, + strip_whitespace=False) + self.assertTrue(cgg._handle_genshi_exception.called) + + def test_handle_event(self): + @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event") + def inner(mock_handle_event): + cgg = self.get_obj() + cgg.loader = Mock() + cgg.data = "template data" + event = Mock() + cgg.handle_event(event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) + + cgg.loader.reset_mock() + cgg.loader.load.side_effect = OSError + self.assertRaises(PluginExecutionError, + cgg.handle_event, event) + cgg.loader.load.assert_called_with(cgg.name, + cls=NewTextTemplate, + encoding=cgg.encoding) + + inner() + loader_cls = self.test_obj.__loader_cls__ + self.test_obj.__loader_cls__ = Mock + TestCfgGenerator.test_handle_event(self) + self.test_obj.__loader_cls__ = loader_cls diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py new file mode 100644 index 000000000..88dd1f18f --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py @@ -0,0 +1,75 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg.CfgInfoXML import * +from Bcfg2.Server.Plugin import InfoXML, PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgInfo + + +class TestCfgInfoXML(TestCfgInfo): + test_obj = CfgInfoXML + + def test__init(self): + TestCfgInfo.test__init(self) + ci = self.get_obj() + self.assertIsInstance(ci.infoxml, InfoXML) + + def test_bind_info_to_entry(self): + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + ci = self.get_obj() + ci.infoxml = Mock() + ci._set_info = Mock() + + self.assertRaises(PluginExecutionError, + ci.bind_info_to_entry, entry, metadata) + ci.infoxml.pnode.Match.assert_called_with(metadata, dict(), + entry=entry) + self.assertFalse(ci._set_info.called) + + ci.infoxml.reset_mock() + ci._set_info.reset_mock() + mdata_value = Mock() + def set_mdata(metadata, mdata, entry=None): + mdata['Info'] = {None: mdata_value} + + ci.infoxml.pnode.Match.side_effect = set_mdata + ci.bind_info_to_entry(entry, metadata) + ci.infoxml.pnode.Match.assert_called_with(metadata, + dict(Info={None: mdata_value}), + entry=entry) + ci._set_info.assert_called_with(entry, mdata_value) + + def test_handle_event(self): + ci = self.get_obj() + ci.infoxml = Mock() + ci.handle_event(Mock) + ci.infoxml.HandleEvent.assert_called_with() + + def test__set_info(self): + @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info") + def inner(mock_set_info): + ci = self.get_obj() + entry = Mock() + info = {"foo": "foo", + "__children__": ["one", "two"]} + ci._set_info(entry, info) + self.assertItemsEqual(entry.append.call_args_list, + [call(c) for c in info['__children__']]) + + inner() + TestCfgInfo.test__set_info(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py new file mode 100644 index 000000000..5c6767a59 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py @@ -0,0 +1,18 @@ +import os +import sys +from Bcfg2.Server.Plugins.Cfg.CfgPlaintextGenerator import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator + + +class TestCfgPlaintextGenerator(TestCfgGenerator): + test_obj = CfgPlaintextGenerator diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py index 32db90304..fb1a28bca 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -1,6 +1,5 @@ import os import sys -import copy import lxml.etree import Bcfg2.Options from Bcfg2.Compat import walk_packages @@ -44,29 +43,36 @@ class TestCfgBaseFileMatcher(TestSpecificData): for basename in basenames: for extension in extensions: def test_match(spec): - return regex.match(basename + "." + spec + extension) - - regex = self.test_obj.get_regex(basename=basename) - self.assertTrue(regex.match(basename)) - self.assertFalse(regex.match("bogus")) - if self.test_obj.__specific__: - self.assertTrue(test_match("G20_foo")) - self.assertTrue(test_match("G1_foo")) - self.assertTrue(test_match("G32768_foo")) - # a group named '_' - self.assertTrue(test_match("G10__")) - self.assertTrue(test_match("H_hostname")) - self.assertTrue(test_match("H_fqdn.subdomain.example.com")) - self.assertTrue(test_match("G20_group_with_underscores")) - - self.assertFalse(test_match("G20_group with spaces")) - self.assertFalse(test_match("G_foo")) - self.assertFalse(test_match("G_")) - self.assertFalse(test_match("G20_")) - self.assertFalse(test_match("H_")) - else: - self.assertFalse(test_match("G20_foo")) - self.assertFalse(test_match("H_hostname")) + mstr = basename + if spec: + mstr += "." + spec + if extension: + mstr += "." + extension + return regex.match(mstr) + + regex = self.test_obj.get_regex(basename=basename) + self.assertTrue(test_match('')) + self.assertFalse(regex.match("bogus")) + if self.test_obj.__specific__: + if extension: + self.assertFalse(regex.match("bogus." + extension)) + self.assertTrue(test_match("G20_foo")) + self.assertTrue(test_match("G1_foo")) + self.assertTrue(test_match("G32768_foo")) + # a group named '_' + self.assertTrue(test_match("G10__")) + self.assertTrue(test_match("H_hostname")) + self.assertTrue(test_match("H_fqdn.subdomain.example.com")) + self.assertTrue(test_match("G20_group_with_underscores")) + + self.assertFalse(test_match("G20_group with spaces")) + self.assertFalse(test_match("G_foo")) + self.assertFalse(test_match("G_")) + self.assertFalse(test_match("G20_")) + self.assertFalse(test_match("H_")) + else: + self.assertFalse(test_match("G20_foo")) + self.assertFalse(test_match("H_hostname")) @patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.get_regex") def test_handles(self, mock_get_regex): @@ -125,7 +131,7 @@ class TestCfgBaseFileMatcher(TestSpecificData): self.assertFalse(mock_get_regex.called) elif self.test_obj.__basenames__: match.return_value = False - self.assertFalse(self.test_obj.handles(evt)) + self.assertFalse(self.test_obj.ignore(evt)) self.assertItemsEqual(mock_get_regex.call_args_list, [call(basename=b, extensions=self.test_obj.__ignore__) @@ -137,12 +143,13 @@ class TestCfgBaseFileMatcher(TestSpecificData): mock_get_regex.reset_mock() match.reset_mock() match.return_value = True - self.assertTrue(self.test_obj.handles(evt)) + self.assertTrue(self.test_obj.ignore(evt)) match.assert_called_with(evt.filename) else: match.return_value = False - self.assertFalse(self.test_obj.handles(evt, - basename=os.path.basename(self.path))) + self.assertFalse(self.test_obj.ignore( + evt, + basename=os.path.basename(self.path))) mock_get_regex.assert_called_with( basename=os.path.basename(self.path), extensions=self.test_obj.__ignore__) @@ -151,11 +158,12 @@ class TestCfgBaseFileMatcher(TestSpecificData): mock_get_regex.reset_mock() match.reset_mock() match.return_value = True - self.assertTrue(self.test_obj.handles(evt, - basename=os.path.basename(self.path), - extensions=self.test_obj.__ignore__)) + self.assertTrue(self.test_obj.ignore( + evt, + basename=os.path.basename(self.path))) mock_get_regex.assert_called_with( - basename=os.path.basename(self.path)) + basename=os.path.basename(self.path), + extensions=self.test_obj.__ignore__) match.assert_called_with(evt.filename) @@ -496,7 +504,7 @@ class TestCfgEntrySet(TestEntrySet): # test failed validation entry = reset() eset._validate_data.side_effect = CfgVerificationError - self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, + self.assertRaises(PluginExecutionError, eset.bind_entry, entry, metadata) eset.bind_info_to_entry.assert_called_with(entry, metadata) eset._generate_data.assert_called_with(entry, metadata) @@ -634,7 +642,7 @@ class TestCfgEntrySet(TestEntrySet): # test failure to generate data reset() generator.get_data.side_effect = OSError - self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, + self.assertRaises(PluginExecutionError, eset._generate_data, entry, metadata) def test_validate_data(self): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/__init__.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/__init__.py new file mode 100644 index 000000000..e69de29bb -- cgit v1.2.3-1-g7c22