From 26c906478fc8cbb74929e929c822cd7e24de4e35 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 2 Oct 2012 11:55:49 -0400 Subject: testsuite: unit tests for Cfg plugin and base handlers --- src/lib/Bcfg2/Server/Plugin/helpers.py | 3 +- src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py | 224 ++++--- .../Testlib/TestServer/TestPlugin/Testhelpers.py | 14 +- .../TestServer/TestPlugins/TestCfg/Test_init.py | 720 +++++++++++++++++++++ testsuite/common.py | 7 + 5 files changed, 859 insertions(+), 109 deletions(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py index 96d661b57..74f4dad8b 100644 --- a/src/lib/Bcfg2/Server/Plugin/helpers.py +++ b/src/lib/Bcfg2/Server/Plugin/helpers.py @@ -1292,7 +1292,8 @@ class EntrySet(Debuggable): """ Shortcut to call :func:`bind_info` with the base info/info.xml for this EntrySet. - :param entry: The abstract entry to bind the info to + :param entry: The abstract entry to bind the info to. This + will be modified in place :type entry: lxml.etree._Element :param metadata: The client metadata to get info for :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py index c34cad30e..952230080 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py @@ -10,7 +10,7 @@ import Bcfg2.Options import Bcfg2.Server.Plugin import Bcfg2.Server.Lint # pylint: disable=W0622 -from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages +from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages, any # pylint: enable=W0622 LOGGER = logging.getLogger(__name__) @@ -63,7 +63,7 @@ class CfgBaseFileMatcher(Bcfg2.Server.Plugin.SpecificData): Bcfg2.Server.Plugin.SpecificData.__init__(self, name, specific, encoding) self.encoding = encoding - self.regex = self.__class__.get_regex(name) + self.regex = self.__class__.get_regex(basename=name) __init__.__doc__ = Bcfg2.Server.Plugin.SpecificData.__init__.__doc__ + \ """ .. ----- @@ -126,15 +126,10 @@ class CfgBaseFileMatcher(Bcfg2.Server.Plugin.SpecificData): else: basenames = [basename] - # do simple non-regex matching first - match = False - for bname in basenames: - if event.filename.startswith(os.path.basename(bname)): - match = True - break - return (match and - cls.get_regex( - basename=os.path.basename(basename)).match(event.filename)) + return any( + cls.get_regex( + basename=os.path.basename(bname)).match(event.filename) + for bname in basenames) @classmethod def ignore(cls, event, basename=None): @@ -162,15 +157,10 @@ class CfgBaseFileMatcher(Bcfg2.Server.Plugin.SpecificData): else: basenames = [basename] - # do simple non-regex matching first - match = False - for bname in basenames: - if event.filename.startswith(os.path.basename(bname)): - match = True - break - return (match and - cls.get_regex(basename=os.path.basename(basename), - extensions=cls.__ignore__).match(event.filename)) + return any( + cls.get_regex(basename=os.path.basename(bname), + extensions=cls.__ignore__).match(event.filename) + for bname in basenames) def __str__(self): return "%s(%s)" % (self.__class__.__name__, self.name) @@ -282,7 +272,7 @@ class CfgInfo(CfgBaseFileMatcher): """ for key, value in list(info.items()): if not key.startswith("__"): - entry.attrib.__setitem__(key, value) + entry.attrib[key] = value class CfgVerifier(CfgBaseFileMatcher): @@ -413,9 +403,7 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet): (action, event.filename)) def get_matching(self, metadata): - return [item for item in list(self.entries.values()) - if (isinstance(item, CfgGenerator) and - item.specific.matches(metadata))] + return self.get_handlers(metadata, CfgGenerator) get_matching.__doc__ = Bcfg2.Server.Plugin.EntrySet.get_matching.__doc__ def entry_init(self, event, hdlr): # pylint: disable=W0221 @@ -444,76 +432,22 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet): self.entries[event.filename] = hdlr(fpath) self.entries[event.filename].handle_event(event) - def bind_entry(self, entry, metadata): # pylint: disable=R0912,R0915 - info_handlers = [] - generators = [] - filters = [] - verifiers = [] - for ent in self.entries.values(): - if ent.__specific__ and not ent.specific.matches(metadata): - continue - if isinstance(ent, CfgInfo): - info_handlers.append(ent) - if isinstance(ent, CfgGenerator): - generators.append(ent) - if isinstance(ent, CfgFilter): - filters.append(ent) - if isinstance(ent, CfgVerifier): - verifiers.append(ent) - if ent.deprecated: - if ent.__basenames__: - fdesc = "/".join(ent.__basenames__) - elif ent.__extensions__: - fdesc = "." + "/.".join(ent.__extensions__) - LOGGER.warning("Cfg: %s: Use of %s files is deprecated" % - (ent.name, fdesc)) - - DEFAULT_INFO.bind_info_to_entry(entry, metadata) - if len(info_handlers) > 1: - LOGGER.error("More than one info supplier found for %s: %s" % - (entry.get("name"), info_handlers)) - if len(info_handlers): - info_handlers[0].bind_info_to_entry(entry, metadata) - if entry.tag == 'Path': - entry.set('type', 'file') - - generator = self.best_matching(metadata, generators) - if entry.get('perms').lower() == 'inherit': - # use on-disk permissions - fname = os.path.join(self.path, generator.name) - entry.set('perms', - str(oct(stat.S_IMODE(os.stat(fname).st_mode)))) - try: - data = generator.get_data(entry, metadata) - except: - msg = "Cfg: exception rendering %s with %s: %s" % \ - (entry.get("name"), generator, sys.exc_info()[1]) - LOGGER.error(msg) - raise Bcfg2.Server.Plugin.PluginExecutionError(msg) + def bind_entry(self, entry, metadata): + self.bind_info_to_entry(entry, metadata) + data = self._generate_data(entry, metadata) - for fltr in filters: + for fltr in self.get_handlers(metadata, CfgFilter): data = fltr.modify_data(entry, metadata, data) if SETUP['validate']: - # we can have multiple verifiers, but we only want to use the - # best matching verifier of each class - verifiers_by_class = dict() - for verifier in verifiers: - cls = verifier.__class__.__name__ - if cls not in verifiers_by_class: - verifiers_by_class[cls] = [verifier] - else: - verifiers_by_class[cls].append(verifier) - for verifiers in verifiers_by_class.values(): - verifier = self.best_matching(metadata, verifiers) - try: - verifier.verify_entry(entry, metadata, data) - except CfgVerificationError: - msg = "Data for %s for %s failed to verify: %s" % \ - (entry.get('name'), metadata.hostname, - sys.exc_info()[1]) - LOGGER.error(msg) - raise Bcfg2.Server.Plugin.PluginExecutionError(msg) + try: + self._validate_data(entry, metadata, data) + except CfgVerificationError: + msg = "Data for %s for %s failed to verify: %s" % \ + (entry.get('name'), metadata.hostname, + sys.exc_info()[1]) + LOGGER.error(msg) + raise Bcfg2.Server.Plugin.PluginExecutionError(msg) if entry.get('encoding') == 'base64': data = b64encode(data) @@ -531,7 +465,7 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet): msg = "Error in specification for %s: %s" % (entry.get('name'), sys.exc_info()[1]) LOGGER.error(msg) - LOGGER.error("You need to specify base64 encoding for %s." % + LOGGER.error("You need to specify base64 encoding for %s" % entry.get('name')) raise Bcfg2.Server.Plugin.PluginExecutionError(msg) except TypeError: @@ -546,6 +480,100 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet): return entry bind_entry.__doc__ = Bcfg2.Server.Plugin.EntrySet.bind_entry.__doc__ + def get_handlers(self, metadata, handler_type): + """ Get all handlers of the given type for the given metadata. + + :param metadata: The metadata to get all handlers for. + :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata + :param handler_type: The type of Cfg handler to get + :type handler_type: type + :returns: list of Cfg handler classes + """ + rv = [] + for ent in self.entries.values(): + if (isinstance(ent, handler_type) and + (not ent.__specific__ or ent.specific.matches(metadata))): + rv.append(ent) + if ent.deprecated: + if ent.__basenames__: + fdesc = "/".join(ent.__basenames__) + elif ent.__extensions__: + fdesc = "." + "/.".join(ent.__extensions__) + LOGGER.warning("Cfg: %s: Use of %s files is deprecated" % + (ent.name, fdesc)) + return rv + + def bind_info_to_entry(self, entry, metadata): + """ Bind entry metadata to the entry with the best CfgInfo + handler + + :param entry: The abstract entry to bind the info to. This + will be modified in place + :type entry: lxml.etree._Element + :param metadata: The client metadata to get info for + :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata + :returns: None + """ + info_handlers = self.get_handlers(metadata, CfgInfo) + DEFAULT_INFO.bind_info_to_entry(entry, metadata) + if len(info_handlers) > 1: + LOGGER.error("More than one info supplier found for %s: %s" % + (entry.get("name"), info_handlers)) + if len(info_handlers): + info_handlers[0].bind_info_to_entry(entry, metadata) + if entry.tag == 'Path': + entry.set('type', 'file') + + def _generate_data(self, entry, metadata): + """ Generate data for the given entry on the given client + + :param entry: The abstract entry to generate data for. This + will not be modified + :type entry: lxml.etree._Element + :param metadata: The client metadata generate data for + :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata + :returns: string - the data for the entry + """ + generator = self.best_matching(metadata, + self.get_handlers(metadata, + CfgGenerator)) + if entry.get('perms').lower() == 'inherit': + # use on-disk permissions + fname = os.path.join(self.path, generator.name) + entry.set('perms', + str(oct(stat.S_IMODE(os.stat(fname).st_mode)))) + try: + return generator.get_data(entry, metadata) + except: + msg = "Cfg: exception rendering %s with %s: %s" % \ + (entry.get("name"), generator, sys.exc_info()[1]) + LOGGER.error(msg) + raise Bcfg2.Server.Plugin.PluginExecutionError(msg) + + def _validate_data(self, entry, metadata, data): + """ Validate data for the given entry on the given client + + :param entry: The abstract entry to validate data for + :type entry: lxml.etree._Element + :param metadata: The client metadata validate data for + :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata + :returns: None + :raises: :exc:`Bcfg2.Server.Plugins.Cfg.CfgVerificationError` + """ + verifiers = self.get_handlers(metadata, CfgVerifier) + # we can have multiple verifiers, but we only want to use the + # best matching verifier of each class + verifiers_by_class = dict() + for verifier in verifiers: + cls = verifier.__class__.__name__ + if cls not in verifiers_by_class: + verifiers_by_class[cls] = [verifier] + else: + verifiers_by_class[cls].append(verifier) + for verifiers in verifiers_by_class.values(): + verifier = self.best_matching(metadata, verifiers) + verifier.verify_entry(entry, metadata, data) + def list_accept_choices(self, entry, metadata): '''return a list of candidate pull locations''' generators = [ent for ent in list(self.entries.values()) @@ -643,6 +671,7 @@ class Cfg(Bcfg2.Server.Plugin.GroupSpool, Bcfg2.Server.Plugin.PullTarget.__init__(self) SETUP = core.setup + print "SETUP=%s" % SETUP if 'validate' not in SETUP: SETUP.add_option('validate', Bcfg2.Options.CFG_VALIDATION) SETUP.reparse() @@ -664,12 +693,8 @@ class Cfg(Bcfg2.Server.Plugin.GroupSpool, if entry.get('name') not in self.entries: return False - for ent in self.entries[entry.get('name')].entries.values(): - if ent.__specific__ and not ent.specific.matches(metadata): - continue - if isinstance(ent, CfgGenerator): - return True - return False + return bool(self.entries[entry.get('name')].get_handlers(metadata, + CfgGenerator)) def AcceptChoices(self, entry, metadata): return self.entries[entry.get('name')].list_accept_choices(entry, @@ -692,11 +717,10 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin): for basename, entry in list(self.core.plugins['Cfg'].entries.items()): self.check_entry(basename, entry) - @classmethod def Errors(cls): - return {"cat-file-used":"warning", - "diff-file-used":"warning"} + return {"cat-file-used": "warning", + "diff-file-used": "warning"} def check_entry(self, basename, entry): """ check that no .cat or .diff files are in use """ diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py index db8ee0cca..f03ac2fd7 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py @@ -17,16 +17,11 @@ while path != '/': if os.path.basename(path) == "testsuite": break path = os.path.dirname(path) -from common import XI_NAMESPACE, XI, call, builtins, skip, skipIf, skipUnless, \ - Bcfg2TestCase, patchIf, datastore, inPy3k, can_skip +from common import XI_NAMESPACE, XI, call, builtins, skip, skipIf, \ + skipUnless, Bcfg2TestCase, patchIf, datastore, inPy3k, can_skip, re_type from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable from TestServer.TestPlugin.Testinterfaces import TestGenerator -try: - re_type = re._pattern_type -except AttributeError: - re_type = type(re.compile("")) - def tostring(el): return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8') @@ -1349,6 +1344,9 @@ class TestSpecificData(Bcfg2TestCase): specific = Mock() return self.test_obj(name, specific, encoding) + def test__init(self): + pass + @patch("%s.open" % builtins) def test_handle_event(self, mock_open): event = Mock() @@ -1783,7 +1781,7 @@ class TestGroupSpool(TestPlugin, TestGenerator): self.assertItemsEqual(gs.Entries, {gs.entry_type: {}}) inner() - + @patch("os.path.isdir") @patch("os.path.isfile") def test_add_entry(self, mock_isfile, mock_isdir): diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py new file mode 100644 index 000000000..32db90304 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py @@ -0,0 +1,720 @@ +import os +import sys +import copy +import lxml.etree +import Bcfg2.Options +from Bcfg2.Compat import walk_packages +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Cfg import * +from Bcfg2.Server.Plugin import PluginExecutionError + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore, re_type +from TestPlugin import TestSpecificData, TestEntrySet, TestGroupSpool, \ + TestPullTarget + + +class TestCfgBaseFileMatcher(TestSpecificData): + test_obj = CfgBaseFileMatcher + + def test__init(self): + TestSpecificData.test__init(self) + bfm = self.get_obj() + self.assertIsInstance(bfm.regex, re_type) + + def test_get_regex(self): + if self.test_obj.__basenames__: + basenames = self.test_obj.__basenames__ + else: + basenames = [os.path.basename(self.path)] + if self.test_obj.__extensions__: + extensions = self.test_obj.__extensions__ + else: + extensions = [''] + for basename in basenames: + for extension in extensions: + def test_match(spec): + return regex.match(basename + "." + spec + extension) + + regex = self.test_obj.get_regex(basename=basename) + self.assertTrue(regex.match(basename)) + self.assertFalse(regex.match("bogus")) + if self.test_obj.__specific__: + self.assertTrue(test_match("G20_foo")) + self.assertTrue(test_match("G1_foo")) + self.assertTrue(test_match("G32768_foo")) + # a group named '_' + self.assertTrue(test_match("G10__")) + self.assertTrue(test_match("H_hostname")) + self.assertTrue(test_match("H_fqdn.subdomain.example.com")) + self.assertTrue(test_match("G20_group_with_underscores")) + + self.assertFalse(test_match("G20_group with spaces")) + self.assertFalse(test_match("G_foo")) + self.assertFalse(test_match("G_")) + self.assertFalse(test_match("G20_")) + self.assertFalse(test_match("H_")) + else: + self.assertFalse(test_match("G20_foo")) + self.assertFalse(test_match("H_hostname")) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.get_regex") + def test_handles(self, mock_get_regex): + match = Mock() + mock_get_regex.return_value = Mock() + mock_get_regex.return_value.match = match + + evt = Mock() + evt.filename = "event.txt" + + if self.test_obj.__basenames__: + match.return_value = False + self.assertFalse(self.test_obj.handles(evt)) + self.assertItemsEqual(mock_get_regex.call_args_list, + [call(basename=b) + for b in self.test_obj.__basenames__]) + self.assertItemsEqual(match.call_args_list, + [call(evt.filename) + for b in self.test_obj.__basenames__]) + + mock_get_regex.reset_mock() + match.reset_mock() + match.return_value = True + self.assertTrue(self.test_obj.handles(evt)) + match.assert_called_with(evt.filename) + else: + match.return_value = False + self.assertFalse(self.test_obj.handles(evt, + basename=os.path.basename(self.path))) + mock_get_regex.assert_called_with( + basename=os.path.basename(self.path)) + match.assert_called_with(evt.filename) + + mock_get_regex.reset_mock() + match.reset_mock() + match.return_value = True + self.assertTrue(self.test_obj.handles(evt, + basename=os.path.basename(self.path))) + mock_get_regex.assert_called_with( + basename=os.path.basename(self.path)) + match.assert_called_with(evt.filename) + + @patch("%s.%s.get_regex" % (test_obj.__module__, test_obj.__name__)) + def test_ignore(self, mock_get_regex): + match = Mock() + mock_get_regex.return_value = Mock() + mock_get_regex.return_value.match = match + + evt = Mock() + evt.filename = "event.txt" + + if not self.test_obj.__ignore__: + self.assertFalse(self.test_obj.ignore(evt)) + self.assertFalse(mock_get_regex.called) + self.assertFalse(self.test_obj.ignore(evt, basename=os.path.basename(self.path))) + self.assertFalse(mock_get_regex.called) + elif self.test_obj.__basenames__: + match.return_value = False + self.assertFalse(self.test_obj.handles(evt)) + self.assertItemsEqual(mock_get_regex.call_args_list, + [call(basename=b, + extensions=self.test_obj.__ignore__) + for b in self.test_obj.__basenames__]) + self.assertItemsEqual(match.call_args_list, + [call(evt.filename) + for b in self.test_obj.__basenames__]) + + mock_get_regex.reset_mock() + match.reset_mock() + match.return_value = True + self.assertTrue(self.test_obj.handles(evt)) + match.assert_called_with(evt.filename) + else: + match.return_value = False + self.assertFalse(self.test_obj.handles(evt, + basename=os.path.basename(self.path))) + mock_get_regex.assert_called_with( + basename=os.path.basename(self.path), + extensions=self.test_obj.__ignore__) + match.assert_called_with(evt.filename) + + mock_get_regex.reset_mock() + match.reset_mock() + match.return_value = True + self.assertTrue(self.test_obj.handles(evt, + basename=os.path.basename(self.path), + extensions=self.test_obj.__ignore__)) + mock_get_regex.assert_called_with( + basename=os.path.basename(self.path)) + match.assert_called_with(evt.filename) + + +class TestCfgGenerator(TestCfgBaseFileMatcher): + test_obj = CfgGenerator + + def test_get_data(self): + cg = self.get_obj() + cg.data = "foo bar baz" + self.assertEqual(cg.data, cg.get_data(Mock(), Mock())) + + +class TestCfgFilter(TestCfgBaseFileMatcher): + test_obj = CfgFilter + + def test_modify_data(self): + cf = self.get_obj() + self.assertRaises(NotImplementedError, + cf.modify_data, Mock(), Mock(), Mock()) + + +class TestCfgInfo(TestCfgBaseFileMatcher): + test_obj = CfgInfo + + def get_obj(self, name=None): + if name is None: + name = self.path + return self.test_obj(name) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.__init__") + def test__init(self, mock__init): + ci = self.get_obj("test.txt") + mock__init.assert_called_with(ci, "test.txt", None, None) + + def test_bind_info_to_entry(self): + ci = self.get_obj() + self.assertRaises(NotImplementedError, + ci.bind_info_to_entry, Mock(), Mock()) + + def test__set_info(self): + ci = self.get_obj() + entry = Mock() + entry.attrib = dict() + + info = {"foo": "foo", + "_bar": "bar", + "bar:baz=quux": "quux", + "baz__": "baz", + "__quux": "quux"} + ci._set_info(entry, info) + self.assertItemsEqual(entry.attrib, + dict([(k, v) for k, v in info.items() + if not k.startswith("__")])) + + +class TestCfgVerifier(TestCfgBaseFileMatcher): + test_obj = CfgVerifier + + def test_verify_entry(self): + cf = self.get_obj() + self.assertRaises(NotImplementedError, + cf.verify_entry, Mock(), Mock(), Mock()) + + +class TestCfgDefaultInfo(TestCfgInfo): + test_obj = CfgDefaultInfo + + def get_obj(self, defaults=None): + if defaults is None: + defaults = dict() + return self.test_obj(defaults) + + @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__") + def test__init(self, mock__init): + defaults = Mock() + cdi = self.get_obj(defaults=defaults) + mock__init.assert_called_with(cdi, '') + self.assertEqual(defaults, cdi.defaults) + + def test_handle_event(self): + # this CfgInfo handler doesn't handle any events -- it's not + # file-driven, but based on the built-in defaults + pass + + def test_bind_info_to_entry(self): + cdi = self.get_obj() + cdi._set_info = Mock() + entry = Mock() + cdi.bind_info_to_entry(entry, Mock()) + cdi._set_info.assert_called_with(entry, cdi.defaults) + + +class TestCfgEntrySet(TestEntrySet): + test_obj = CfgEntrySet + + def test__init(self): + pass + + def test_handlers(self): + # this is really really difficult to mock out, so we just get + # a list of handlers and make sure that it roughly matches + # what's on the filesystem + expected = [] + for submodule in walk_packages(path=Bcfg2.Server.Plugins.Cfg.__path__, + prefix="Bcfg2.Server.Plugins.Cfg."): + expected.append(submodule[1].rsplit('.', 1)[-1]) + eset = self.get_obj() + self.assertItemsEqual(expected, + [h.__name__ for h in eset.handlers]) + + def test_handle_event(self): + eset = self.get_obj() + eset.entry_init = Mock() + eset._handlers = [Mock(), Mock(), Mock()] + for hdlr in eset.handlers: + hdlr.__name__ = "handler" + eset.entries = dict() + + def reset(): + eset.entry_init.reset_mock() + for hdlr in eset.handlers: + hdlr.reset_mock() + + # test that a bogus deleted event is discarded + evt = Mock() + evt.code2str.return_value = "deleted" + evt.filename = os.path.join(datastore, "test.txt") + eset.handle_event(evt) + self.assertFalse(eset.entry_init.called) + self.assertItemsEqual(eset.entries, dict()) + for hdlr in eset.handlers: + self.assertFalse(hdlr.handles.called) + self.assertFalse(hdlr.ignore.called) + + # test creation of a new file + for action in ["exists", "created", "changed"]: + evt = Mock() + evt.code2str.return_value = action + evt.filename = os.path.join(datastore, "test.txt") + + # test with no handler that handles + for hdlr in eset.handlers: + hdlr.handles.return_value = False + hdlr.ignore.return_value = False + + reset() + eset.handle_event(evt) + self.assertFalse(eset.entry_init.called) + self.assertItemsEqual(eset.entries, dict()) + for hdlr in eset.handlers: + hdlr.handles.assert_called_with(evt, basename=eset.path) + hdlr.ignore.assert_called_with(evt, basename=eset.path) + + # test with a handler that handles the entry + reset() + eset.handlers[-1].handles.return_value = True + eset.handle_event(evt) + eset.entry_init.assert_called_with(evt, eset.handlers[-1]) + for hdlr in eset.handlers: + hdlr.handles.assert_called_with(evt, basename=eset.path) + if not hdlr.return_value: + hdlr.ignore.assert_called_with(evt, basename=eset.path) + + # test with a handler that ignores the entry before one + # that handles it + reset() + eset.handlers[0].ignore.return_value = True + eset.handle_event(evt) + self.assertFalse(eset.entry_init.called) + eset.handlers[0].handles.assert_called_with(evt, + basename=eset.path) + eset.handlers[0].ignore.assert_called_with(evt, + basename=eset.path) + for hdlr in eset.handlers[1:]: + self.assertFalse(hdlr.handles.called) + self.assertFalse(hdlr.ignore.called) + + # test changed event with an entry that already exists + reset() + evt = Mock() + evt.code2str.return_value = "changed" + evt.filename = os.path.join(datastore, "test.txt") + eset.entries[evt.filename] = Mock() + eset.handle_event(evt) + self.assertFalse(eset.entry_init.called) + for hdlr in eset.handlers: + self.assertFalse(hdlr.handles.called) + self.assertFalse(hdlr.ignore.called) + eset.entries[evt.filename].handle_event.assert_called_with(evt) + + # test deleted event with an entry that already exists + reset() + evt.code2str.return_value = "deleted" + eset.handle_event(evt) + self.assertFalse(eset.entry_init.called) + for hdlr in eset.handlers: + self.assertFalse(hdlr.handles.called) + self.assertFalse(hdlr.ignore.called) + self.assertItemsEqual(eset.entries, dict()) + + def test_get_matching(self): + eset = self.get_obj() + eset.get_handlers = Mock() + metadata = Mock() + self.assertEqual(eset.get_matching(metadata), + eset.get_handlers.return_value) + eset.get_handlers.assert_called_with(metadata, CfgGenerator) + + @patch("Bcfg2.Server.Plugin.EntrySet.entry_init") + def test_entry_init(self, mock_entry_init): + eset = self.get_obj() + eset.entries = dict() + evt = Mock() + evt.filename = "test.txt" + handler = Mock() + handler.__specific__ = True + + # test handling an event with the parent entry_init + eset.entry_init(evt, handler) + mock_entry_init.assert_called_with(eset, evt, entry_type=handler, + specific=handler.get_regex.return_value) + self.assertItemsEqual(eset.entries, dict()) + + # test handling the event with a Cfg handler + handler.__specific__ = False + eset.entry_init(evt, handler) + handler.assert_called_with(os.path.join(eset.path, evt.filename)) + self.assertItemsEqual(eset.entries, + {evt.filename: handler.return_value}) + handler.return_value.handle_event.assert_called_with(evt) + + # test handling an event for an entry that already exists with + # a Cfg handler + handler.reset_mock() + eset.entry_init(evt, handler) + self.assertFalse(handler.called) + self.assertItemsEqual(eset.entries, + {evt.filename: handler.return_value}) + eset.entries[evt.filename].handle_event.assert_called_with(evt) + + @patch("Bcfg2.Server.Plugins.Cfg.u_str") + @patch("Bcfg2.Server.Plugins.Cfg.b64encode") + def test_bind_entry(self, mock_b64encode, mock_u_str): + Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False) + + mock_u_str.side_effect = lambda x: x + + eset = self.get_obj() + eset.bind_info_to_entry = Mock() + eset._generate_data = Mock() + eset.get_handlers = Mock() + eset._validate_data = Mock() + + def reset(): + mock_b64encode.reset_mock() + mock_u_str.reset_mock() + eset.bind_info_to_entry.reset_mock() + eset._generate_data.reset_mock() + eset.get_handlers.reset_mock() + eset._validate_data.reset_mock() + return lxml.etree.Element("Path", name="/test.txt") + + entry = reset() + metadata = Mock() + + # test basic entry, no validation, no filters, etc. + eset._generate_data.return_value = "data" + eset.get_handlers.return_value = [] + bound = eset.bind_entry(entry, metadata) + eset.bind_info_to_entry.assert_called_with(entry, metadata) + eset._generate_data.assert_called_with(entry, metadata) + self.assertFalse(eset._validate_data.called) + expected = lxml.etree.Element("Path", name="/text.txt") + expected.text = "data" + self.assertXMLEqual(bound, expected) + self.assertEqual(bound, entry) + + # test empty entry + entry = reset() + eset._generate_data.return_value = "" + bound = eset.bind_entry(entry, metadata) + eset.bind_info_to_entry.assert_called_with(entry, metadata) + eset._generate_data.assert_called_with(entry, metadata) + self.assertFalse(eset._validate_data.called) + expected = lxml.etree.Element("Path", name="/text.txt", empty="true") + self.assertXMLEqual(bound, expected) + self.assertEqual(bound, entry) + + # test filters + entry = reset() + eset._generate_data.return_value = "initial data" + filters = [Mock(), Mock()] + filters[0].modify_data.return_value = "modified data" + filters[1].modify_data.return_value = "final data" + eset.get_handlers.return_value = filters + bound = eset.bind_entry(entry, metadata) + eset.bind_info_to_entry.assert_called_with(entry, metadata) + eset._generate_data.assert_called_with(entry, metadata) + filters[0].modify_data.assert_called_with(entry, metadata, + "initial data") + filters[1].modify_data.assert_called_with(entry, metadata, + "modified data") + self.assertFalse(eset._validate_data.called) + expected = lxml.etree.Element("Path", name="/text.txt") + expected.text = "final data" + self.assertXMLEqual(bound, expected) + + # test base64 encoding + entry = reset() + entry.set("encoding", "base64") + mock_b64encode.return_value = "base64 data" + eset.get_handlers.return_value = [] + eset._generate_data.return_value = "data" + bound = eset.bind_entry(entry, metadata) + eset.bind_info_to_entry.assert_called_with(entry, metadata) + eset._generate_data.assert_called_with(entry, metadata) + self.assertFalse(eset._validate_data.called) + mock_b64encode.assert_called_with("data") + self.assertFalse(mock_u_str.called) + expected = lxml.etree.Element("Path", name="/text.txt", + encoding="base64") + expected.text = "base64 data" + self.assertXMLEqual(bound, expected) + self.assertEqual(bound, entry) + + # test successful validation + entry = reset() + Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True + bound = eset.bind_entry(entry, metadata) + eset.bind_info_to_entry.assert_called_with(entry, metadata) + eset._generate_data.assert_called_with(entry, metadata) + eset._validate_data.assert_called_with(entry, metadata, "data") + expected = lxml.etree.Element("Path", name="/text.txt") + expected.text = "data" + self.assertXMLEqual(bound, expected) + self.assertEqual(bound, entry) + + # test failed validation + entry = reset() + eset._validate_data.side_effect = CfgVerificationError + self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, + eset.bind_entry, entry, metadata) + eset.bind_info_to_entry.assert_called_with(entry, metadata) + eset._generate_data.assert_called_with(entry, metadata) + eset._validate_data.assert_called_with(entry, metadata, "data") + + def test_get_handlers(self): + eset = self.get_obj() + eset.entries['test1.txt'] = CfgInfo("test1.txt") + eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock(), None) + eset.entries['test2.txt'].specific.matches.return_value = True + eset.entries['test3.txt'] = CfgInfo("test3.txt") + eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock(), None) + eset.entries['test4.txt'].specific.matches.return_value = False + eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock(), None) + eset.entries['test5.txt'].specific.matches.return_value = True + eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock(), None) + eset.entries['test6.txt'].specific.matches.return_value = True + eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock(), None) + eset.entries['test7.txt'].specific.matches.return_value = False + + def reset(): + for e in eset.entries.values(): + if e.specific is not None: + e.specific.reset_mock() + + metadata = Mock() + self.assertItemsEqual(eset.get_handlers(metadata, CfgGenerator), + [eset.entries['test2.txt'], + eset.entries['test5.txt']]) + for ename in ['test2.txt', 'test4.txt', 'test5.txt']: + eset.entries[ename].specific.matches.assert_called_with(metadata) + for ename in ['test6.txt', 'test7.txt']: + self.assertFalse(eset.entries[ename].specific.matches.called) + + reset() + self.assertItemsEqual(eset.get_handlers(metadata, CfgInfo), + [eset.entries['test1.txt'], + eset.entries['test3.txt']]) + for entry in eset.entries.values(): + if entry.specific is not None: + self.assertFalse(entry.specific.matches.called) + + reset() + self.assertItemsEqual(eset.get_handlers(metadata, CfgVerifier), + [eset.entries['test6.txt']]) + eset.entries['test6.txt'].specific.matches.assert_called_with(metadata) + for ename, entry in eset.entries.items(): + if ename != 'test6.txt' and entry.specific is not None: + self.assertFalse(entry.specific.matches.called) + + reset() + self.assertItemsEqual(eset.get_handlers(metadata, CfgFilter), []) + eset.entries['test7.txt'].specific.matches.assert_called_with(metadata) + for ename, entry in eset.entries.items(): + if ename != 'test7.txt' and entry.specific is not None: + self.assertFalse(entry.specific.matches.called) + + reset() + self.assertItemsEqual(eset.get_handlers(metadata, Mock), []) + for ename, entry in eset.entries.items(): + if entry.specific is not None: + self.assertFalse(entry.specific.matches.called) + + def test_bind_info_to_entry(self): + default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO + eset = self.get_obj() + eset.get_handlers = Mock() + eset.get_handlers.return_value = [] + Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock() + metadata = Mock() + + def reset(): + eset.get_handlers.reset_mock() + Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock() + return lxml.etree.Element("Path", name="/test.txt") + + # test with no info handlers + entry = reset() + eset.bind_info_to_entry(entry, metadata) + eset.get_handlers.assert_called_with(metadata, CfgInfo) + Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + self.assertEqual(entry.get("type"), "file") + + # test with one info handler + entry = reset() + handler = Mock() + eset.get_handlers.return_value = [handler] + eset.bind_info_to_entry(entry, metadata) + eset.get_handlers.assert_called_with(metadata, CfgInfo) + Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + handler.bind_info_to_entry.assert_called_with(entry, metadata) + self.assertEqual(entry.get("type"), "file") + + # test with more than one info handler + entry = reset() + handlers = [Mock(), Mock()] + eset.get_handlers.return_value = handlers + eset.bind_info_to_entry(entry, metadata) + eset.get_handlers.assert_called_with(metadata, CfgInfo) + Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata) + # we don't care which handler gets called as long as exactly + # one of them does + called = 0 + for handler in handlers: + if handler.bind_info_to_entry.called: + handler.bind_info_to_entry.assert_called_with(entry, metadata) + called += 1 + self.assertEqual(called, 1) + self.assertEqual(entry.get("type"), "file") + + Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info + + def test_generate_data(self): + eset = self.get_obj() + eset.best_matching = Mock() + generator = Mock() + generator.get_data.return_value = "data" + eset.best_matching.return_value = generator + eset.get_handlers = Mock() + entry = lxml.etree.Element("Path", name="/test.txt", perms="0640") + metadata = Mock() + + def reset(): + eset.best_matching.reset_mock() + eset.get_handlers.reset_mock() + + + # test success + self.assertEqual(eset._generate_data(entry, metadata), + "data") + eset.get_handlers.assert_called_with(metadata, CfgGenerator) + eset.best_matching.assert_called_with(metadata, + eset.get_handlers.return_value) + + # test failure to generate data + reset() + generator.get_data.side_effect = OSError + self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, + eset._generate_data, entry, metadata) + + def test_validate_data(self): + class MockChild1(Mock): + pass + + class MockChild2(Mock): + pass + + eset = self.get_obj() + eset.get_handlers = Mock() + handlers1 = [MockChild1(), MockChild1()] + handlers2 = [MockChild2()] + eset.get_handlers.return_value = [handlers1[0], handlers2[0], + handlers1[1]] + eset.best_matching = Mock() + eset.best_matching.side_effect = lambda m, v: v[0] + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + data = "data" + + eset._validate_data(entry, metadata, data) + eset.get_handlers.assert_called_with(metadata, CfgVerifier) + self.assertItemsEqual(eset.best_matching.call_args_list, + [call(metadata, handlers1), + call(metadata, handlers2)]) + handlers1[0].verify_entry.assert_called_with(entry, metadata, data) + handlers2[0].verify_entry.assert_called_with(entry, metadata, data) + + def test_specificity_from_filename(self): + pass + + +class TestCfg(TestGroupSpool, TestPullTarget): + test_obj = Cfg + + def get_obj(self, core=None): + if core is None: + core = Mock() + core.setup = MagicMock() + return TestGroupSpool.get_obj(self, core=core) + + @patch("Bcfg2.Server.Plugin.GroupSpool.__init__") + @patch("Bcfg2.Server.Plugin.PullTarget.__init__") + def test__init(self, mock_pulltarget_init, mock_groupspool_init): + core = Mock() + core.setup = MagicMock() + cfg = self.test_obj(core, datastore) + mock_pulltarget_init.assert_called_with(cfg) + mock_groupspool_init.assert_called_with(cfg, core, datastore) + core.setup.add_option.assert_called_with("validate", + Bcfg2.Options.CFG_VALIDATION) + core.setup.reparse.assert_called_with() + + core.reset_mock() + core.setup.reset_mock() + mock_pulltarget_init.reset_mock() + mock_groupspool_init.reset_mock() + core.setup.__contains__.return_value = True + cfg = self.test_obj(core, datastore) + mock_pulltarget_init.assert_called_with(cfg) + mock_groupspool_init.assert_called_with(cfg, core, datastore) + self.assertFalse(core.setup.add_option.called) + self.assertFalse(core.setup.reparse.called) + + def test_has_generator(self): + cfg = self.get_obj() + cfg.entries = dict() + entry = lxml.etree.Element("Path", name="/test.txt") + metadata = Mock() + + self.assertFalse(cfg.has_generator(entry, metadata)) + + eset = Mock() + eset.get_handlers.return_value = [] + cfg.entries[entry.get("name")] = eset + self.assertFalse(cfg.has_generator(entry, metadata)) + eset.get_handlers.assert_called_with(metadata, CfgGenerator) + + eset.get_handlers.reset_mock() + eset.get_handlers.return_value = [Mock()] + self.assertTrue(cfg.has_generator(entry, metadata)) + eset.get_handlers.assert_called_with(metadata, CfgGenerator) diff --git a/testsuite/common.py b/testsuite/common.py index 3aeeaf5f2..5d8157e55 100644 --- a/testsuite/common.py +++ b/testsuite/common.py @@ -284,3 +284,10 @@ class patchIf(object): # 10 args -- mocksignature has been removed args.pop(5) return _noop_patch(*args)(func) + + +try: + re_type = re._pattern_type +except AttributeError: + re_type = type(re.compile("")) + -- cgit v1.2.3-1-g7c22