diff options
-rw-r--r-- | src/lib/Bcfg2/Server/Plugin.py | 56 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/Plugins/Decisions.py | 6 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/Plugins/Probes.py | 5 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/Plugins/TemplateHelper.py | 7 | ||||
-rw-r--r-- | testsuite/Testlib/TestServer/TestPlugin.py | 399 | ||||
-rw-r--r-- | testsuite/Testlib/TestServer/TestPlugins/TestProbes.py | 15 |
6 files changed, 432 insertions, 56 deletions
diff --git a/src/lib/Bcfg2/Server/Plugin.py b/src/lib/Bcfg2/Server/Plugin.py index b630081d4..3fcf578d1 100644 --- a/src/lib/Bcfg2/Server/Plugin.py +++ b/src/lib/Bcfg2/Server/Plugin.py @@ -36,16 +36,15 @@ del default_file_metadata['args'] logger = logging.getLogger('Bcfg2.Server.Plugin') -info_regex = re.compile( \ - 'encoding:(\s)*(?P<encoding>\w+)|' + - 'group:(\s)*(?P<group>\S+)|' + - 'important:(\s)*(?P<important>\S+)|' + - 'mtime:(\s)*(?P<mtime>\w+)|' + - 'owner:(\s)*(?P<owner>\S+)|' + - 'paranoid:(\s)*(?P<paranoid>\S+)|' + - 'perms:(\s)*(?P<perms>\w+)|' + - 'secontext:(\s)*(?P<secontext>\S+)|' + - 'sensitive:(\s)*(?P<sensitive>\S+)|') +info_regex = re.compile('owner:(\s)*(?P<owner>\S+)|' + + 'group:(\s)*(?P<group>\S+)|' + + 'perms:(\s)*(?P<perms>\w+)|' + + 'secontext:(\s)*(?P<secontext>\S+)|' + + 'paranoid:(\s)*(?P<paranoid>\S+)|' + + 'sensitive:(\s)*(?P<sensitive>\S+)|' + + 'encoding:(\s)*(?P<encoding>\w+)|' + + 'important:(\s)*(?P<important>\S+)|' + + 'mtime:(\s)*(?P<mtime>\w+)|') def bind_info(entry, metadata, infoxml=None, default=default_file_metadata): for attr, val in list(default.items()): @@ -654,7 +653,6 @@ class XMLFileBacked(FileBacked): if fpath not in self.extras: if os.path.exists(fpath): self._follow_xincludes(fname=fpath) - print "adding monitor to %s" % fpath self.add_monitor(fpath) else: msg = "%s: %s does not exist, skipping" % (self.name, name) @@ -1068,7 +1066,7 @@ class EntrySet(Debuggable): """Entry sets deal with the host- and group-specific entries.""" ignore = re.compile("^(\.#.*|.*~|\\..*\\.(sw[px])|.*\\.genshi_include)$") - def __init__(self, basename, path, entry_type, encoding): + def __init__(self, basename, path, entry_type, encoding, is_regex=False): Debuggable.__init__(self, name=basename) self.path = path self.entry_type = entry_type @@ -1076,7 +1074,12 @@ class EntrySet(Debuggable): self.metadata = default_file_metadata.copy() self.infoxml = None self.encoding = encoding - pattern = '(.*/)?%s(\.((H_(?P<hostname>\S+))|' % basename + + if is_regex: + base_pat = basename + else: + base_pat = re.escape(basename) + pattern = '(.*/)?%s(\.((H_(?P<hostname>\S+))|' % base_pat pattern += '(G(?P<prio>\d+)_(?P<group>\S+))))?$' self.specific = re.compile(pattern) @@ -1093,20 +1096,13 @@ class EntrySet(Debuggable): if matching is None: matching = self.get_matching(metadata) - hspec = [ent for ent in matching if ent.specific.hostname] - if hspec: - return hspec[0] - - gspec = [ent for ent in matching if ent.specific.group] - if gspec: - gspec.sort() - return gspec[-1] - - aspec = [ent for ent in matching if ent.specific.all] - if aspec: - return aspec[0] - - raise PluginExecutionError + if matching: + matching.sort() + return matching[0] + else: + raise PluginExecutionError("No matching entries available for %s " + "for %s" % (self.path, + metadata.hostname)) def handle_event(self, event): """Handle FAM events for the TemplateSet.""" @@ -1195,8 +1191,7 @@ class EntrySet(Debuggable): if value: self.metadata[key] = value if len(self.metadata['perms']) == 3: - self.metadata['perms'] = "0%s" % \ - (self.metadata['perms']) + self.metadata['perms'] = "0%s" % self.metadata['perms'] def reset_metadata(self, event): """Reset metadata to defaults if info or info.xml removed.""" @@ -1209,7 +1204,8 @@ class EntrySet(Debuggable): bind_info(entry, metadata, infoxml=self.infoxml, default=self.metadata) def bind_entry(self, entry, metadata): - """Return the appropriate interpreted template from the set of available templates.""" + """Return the appropriate interpreted template from the set of + available templates.""" self.bind_info_to_entry(entry, metadata) return self.best_matching(metadata).bind_entry(entry, metadata) diff --git a/src/lib/Bcfg2/Server/Plugins/Decisions.py b/src/lib/Bcfg2/Server/Plugins/Decisions.py index b432474f2..78b549c2c 100644 --- a/src/lib/Bcfg2/Server/Plugins/Decisions.py +++ b/src/lib/Bcfg2/Server/Plugins/Decisions.py @@ -23,9 +23,9 @@ class DecisionSet(Bcfg2.Server.Plugin.EntrySet): - `encoding`: XML character encoding """ - pattern = '(white|black)list' - Bcfg2.Server.Plugin.EntrySet.__init__(self, pattern, path, \ - DecisionFile, encoding) + Bcfg2.Server.Plugin.EntrySet.__init__(self, '(white|black)list', path, + DecisionFile, encoding, + is_regex=True) try: fam.AddMonitor(path, self) except OSError: diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index 53deefebd..e08f52a28 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -115,11 +115,10 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet): bangline = re.compile('^#!\s*(?P<interpreter>.*)$') def __init__(self, path, fam, encoding, plugin_name): - fpattern = '[0-9A-Za-z_\-]+' self.plugin_name = plugin_name - Bcfg2.Server.Plugin.EntrySet.__init__(self, fpattern, path, + Bcfg2.Server.Plugin.EntrySet.__init__(self, '[0-9A-Za-z_\-]+', path, Bcfg2.Server.Plugin.SpecificData, - encoding) + encoding, is_regex=True) fam.AddMonitor(path, self) def HandleEvent(self, event): diff --git a/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py b/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py index 3712506d6..2b3aa6bc5 100644 --- a/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py +++ b/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py @@ -50,12 +50,13 @@ class HelperModule(Bcfg2.Server.Plugin.SpecificData): class HelperSet(Bcfg2.Server.Plugin.EntrySet): ignore = re.compile("^(\.#.*|.*~|\\..*\\.(sw[px])|.*\.py[co])$") + fpattern = '[0-9A-Za-z_\-]+\.py' def __init__(self, path, fam, encoding, plugin_name): - fpattern = '[0-9A-Za-z_\-]+\.py' self.plugin_name = plugin_name - Bcfg2.Server.Plugin.EntrySet.__init__(self, fpattern, path, - HelperModule, encoding) + Bcfg2.Server.Plugin.EntrySet.__init__(self, self.fpattern, path, + HelperModule, encoding, + is_regex=True) fam.AddMonitor(path, self) def HandleEvent(self, event): diff --git a/testsuite/Testlib/TestServer/TestPlugin.py b/testsuite/Testlib/TestServer/TestPlugin.py index 9ca3243a3..e5baf9a0b 100644 --- a/testsuite/Testlib/TestServer/TestPlugin.py +++ b/testsuite/Testlib/TestServer/TestPlugin.py @@ -1,4 +1,5 @@ import os +import re import copy import logging import unittest @@ -164,32 +165,39 @@ class TestPluginDatabaseModel(Bcfg2TestCase): class TestGenerator(Bcfg2TestCase): - def test_HandleEntry(self): - g = Generator() - self.assertRaises(NotImplementedError, - g.HandleEntry, None, None) + test_obj = Generator class TestStructure(Bcfg2TestCase): + test_obj = Structure + + def get_obj(self): + return self.test_obj() + def test_BuildStructures(self): - s = Structure() + s = self.get_obj() self.assertRaises(NotImplementedError, s.BuildStructures, None) class TestMetadata(Bcfg2TestCase): + test_obj = Metadata + + def get_obj(self): + return self.test_obj() + def test_get_initial_metadata(self): - m = Metadata() + m = self.get_obj() self.assertRaises(NotImplementedError, m.get_initial_metadata, None) def test_merge_additional_data(self): - m = Metadata() + m = self.get_obj() self.assertRaises(NotImplementedError, m.merge_additional_data, None, None, None) def test_merge_additional_groups(self): - m = Metadata() + m = self.get_obj() self.assertRaises(NotImplementedError, m.merge_additional_groups, None, None) @@ -1511,12 +1519,12 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): pd.get_attrs, entry, metadata) -class SpecificityError(Bcfg2TestCase): +class TestSpecificityError(Bcfg2TestCase): """ placeholder for future tests """ pass -class Specificity(Bcfg2TestCase): +class TestSpecificity(Bcfg2TestCase): test_obj = Specificity def get_obj(self, **kwargs): @@ -1560,7 +1568,6 @@ class Specificity(Bcfg2TestCase): for i in range(len(specs)): for j in range(len(specs)): - print "i=%s, j=%s" % (i, j) if i < j: self.assertGreater(specs[i], specs[j]) self.assertLess(specs[j], specs[i]) @@ -1578,7 +1585,7 @@ class Specificity(Bcfg2TestCase): self.assertGreaterEqual(specs[j], specs[i]) -class SpecificData(Bcfg2TestCase): +class TestSpecificData(Bcfg2TestCase): test_obj = SpecificData def get_obj(self, name="/test.txt", specific=None, encoding=None): @@ -1608,12 +1615,376 @@ class SpecificData(Bcfg2TestCase): class TestEntrySet(TestDebuggable): test_obj = EntrySet + # filenames that should be matched successfully by the EntrySet + # 'specific' regex. these are filenames alone -- a specificity + # will be added to these + basenames = ["test", "test.py", "test with spaces.txt", + "test.multiple.dots.py", "test_underscores.and.dots", + "really_misleading.G10_test", + "name$with*regex(special){chars}", + "misleading.H_hostname.test.com"] + # filenames that do not match any of the basenames (or the + # basename regex, if applicable) + bogus_names = ["bogus"] + # filenames that should be ignored + ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", + "test.txt.genshi_include", "test.G_foo.genshi_include"] + - def get_obj(self, basename="test", path=datastore, entry_type=SpecificData, + def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(), encoding=None): return self.test_obj(basename, path, entry_type, encoding) + def test__init(self): + for basename in self.basenames: + eset = self.get_obj(basename=basename) + self.assertIsInstance(eset.specific, re._pattern_type) + self.assertTrue(eset.specific.match(os.path.join(datastore, + basename))) + ppath = os.path.join(datastore, "Plugin", basename) + self.assertTrue(eset.specific.match(ppath)) + self.assertTrue(eset.specific.match(ppath + ".G20_foo")) + self.assertTrue(eset.specific.match(ppath + ".G1_foo")) + self.assertTrue(eset.specific.match(ppath + ".G32768_foo")) + # a group named '_' + self.assertTrue(eset.specific.match(ppath + ".G10__")) + self.assertTrue(eset.specific.match(ppath + ".H_hostname")) + self.assertTrue(eset.specific.match(ppath + ".H_fqdn.subdomain.example.com")) + self.assertTrue(eset.specific.match(ppath + ".G20_group_with_underscores")) + + self.assertFalse(eset.specific.match(ppath + ".G20_group with spaces")) + self.assertFalse(eset.specific.match(ppath + ".G_foo")) + self.assertFalse(eset.specific.match(ppath + ".G_")) + self.assertFalse(eset.specific.match(ppath + ".G20_")) + self.assertFalse(eset.specific.match(ppath + ".H_")) + + for bogus in self.bogus_names: + self.assertFalse(eset.specific.match(os.path.join(datastore, + "Plugin", + bogus))) + + for ignore in self.ignore: + self.assertTrue(eset.ignore.match(ignore)) + + self.assertFalse(eset.ignore.match(basename)) + self.assertFalse(eset.ignore.match(basename + ".G20_foo")) + self.assertFalse(eset.ignore.match(basename + ".G1_foo")) + self.assertFalse(eset.ignore.match(basename + ".G32768_foo")) + self.assertFalse(eset.ignore.match(basename + ".G10__")) + self.assertFalse(eset.ignore.match(basename + ".H_hostname")) + self.assertFalse(eset.ignore.match(basename + ".H_fqdn.subdomain.example.com")) + self.assertFalse(eset.ignore.match(basename + ".G20_group_with_underscores")) + + def test_get_matching(self): + items = {0: Mock(), 1: Mock(), 2: Mock(), 3: Mock(), 4: Mock(), + 5: Mock()} + items[0].specific.matches.return_value = False + items[1].specific.matches.return_value = True + items[2].specific.matches.return_value = False + items[3].specific.matches.return_value = False + items[4].specific.matches.return_value = True + items[5].specific.matches.return_value = True + metadata = Mock() + eset = self.get_obj() + eset.entries = items + self.assertItemsEqual(eset.get_matching(metadata), + [items[1], items[4], items[5]]) + for i in items.values(): + i.specific.matches.assert_called_with(metadata) + + @patch("Bcfg2.Server.Plugin.%s.get_matching" % test_obj.__name__) + def test_best_matching(self, mock_get_matching): + eset = self.get_obj() + metadata = Mock() + matching = [] + + def reset(): + mock_get_matching.reset_mock() + metadata.reset_mock() + for m in matching: + m.reset_mock() + + def specific(all=False, group=False, prio=None, host=False): + spec = MagicMock() + spec.all = all + spec.group = group + spec.prio = prio + spec.host = host + if prio: + spec.__cmp__ = lambda o: cmp(spec.prio, o.prio) + return spec + + self.assertRaises(PluginExecutionError, + eset.best_matching, metadata, matching=[]) + + reset() + mock_get_matching.return_value = matching + self.assertRaises(PluginExecutionError, + eset.best_matching, metadata) + mock_get_matching.assert_called_with(metadata) + + # test with a single file for all + reset() + matching.insert(0, specific(all=True)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + # test with a single group-specific file + reset() + matching.insert(0, specific(group=True, prio=10)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + # test with multiple group-specific files + reset() + matching.insert(0, specific(group=True, prio=20)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + # test with host-specific file + reset() + matching.insert(0, specific(host=True)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + @patch("Bcfg2.Server.Plugin.%s.entry_init" % test_obj.__name__) + @patch("Bcfg2.Server.Plugin.%s.reset_metadata" % test_obj.__name__) + @patch("Bcfg2.Server.Plugin.%s.update_metadata" % test_obj.__name__) + def test_handle_event(self, mock_update_md, mock_reset_md, mock_init): + def reset(): + mock_update_md.reset_mock() + mock_reset_md.reset_mock() + mock_init.reset_mock() + + eset = self.get_obj() + for fname in ["info", "info.xml", ":info"]: + for evt in ["exists", "created", "changed"]: + reset() + event = Mock() + event.code2str.return_value = evt + event.filename = fname + eset.handle_event(event) + mock_update_md.assert_called_with(event) + self.assertFalse(mock_init.called) + self.assertFalse(mock_reset_md.called) + + reset() + event = Mock() + event.code2str.return_value = "deleted" + event.filename = fname + eset.handle_event(event) + mock_reset_md.assert_called_with(event) + self.assertFalse(mock_init.called) + self.assertFalse(mock_update_md.called) + + for evt in ["exists", "created", "changed"]: + reset() + event = Mock() + event.code2str.return_value = evt + event.filename = "test.txt" + eset.handle_event(event) + mock_init.assert_called_with(event) + self.assertFalse(mock_reset_md.called) + self.assertFalse(mock_update_md.called) + + reset() + entry = Mock() + eset.entries["test.txt"] = entry + event = Mock() + event.code2str.return_value = "changed" + event.filename = "test.txt" + eset.handle_event(event) + entry.handle_event.assert_called_with(event) + self.assertFalse(mock_init.called) + self.assertFalse(mock_reset_md.called) + self.assertFalse(mock_update_md.called) + + reset() + entry = Mock() + eset.entries["test.txt"] = entry + event = Mock() + event.code2str.return_value = "deleted" + event.filename = "test.txt" + eset.handle_event(event) + self.assertNotIn("test.txt", eset.entries) + + @patch("Bcfg2.Server.Plugin.%s.specificity_from_filename" % + test_obj.__name__) + def test_entry_init(self, mock_spec): + eset = self.get_obj() + + def reset(): + eset.entry_type.reset_mock() + mock_spec.reset_mock() + + event = Mock() + event.code2str.return_value = "created" + event.filename = "test.txt" + eset.entry_init(event) + mock_spec.assert_called_with("test.txt", specific=None) + eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"), + mock_spec.return_value, None) + eset.entry_type.return_value.handle_event.assert_called_with(event) + self.assertIn("test.txt", eset.entries) + + # test duplicate add + reset() + eset.entry_init(event) + self.assertFalse(mock_spec.called) + self.assertFalse(eset.entry_type.called) + eset.entries["test.txt"].handle_event.assert_called_with(event) + + # test keyword args + etype = Mock() + specific = Mock() + event = Mock() + event.code2str.return_value = "created" + event.filename = "test2.txt" + eset.entry_init(event, entry_type=etype, specific=specific) + mock_spec.assert_called_with("test2.txt", specific=specific) + etype.assert_called_with(os.path.join(eset.path, "test2.txt"), + mock_spec.return_value, None) + etype.return_value.handle_event.assert_called_with(event) + self.assertIn("test2.txt", eset.entries) + + # test specificity error + event = Mock() + event.code2str.return_value = "created" + event.filename = "test3.txt" + mock_spec.side_effect = SpecificityError + eset.entry_init(event) + mock_spec.assert_called_with("test3.txt", specific=None) + self.assertFalse(eset.entry_type.called) + + @patch("Bcfg2.Server.Plugin.Specificity") + def test_specificity_from_filename(self, mock_spec): + def test(eset, fname, **kwargs): + mock_spec.reset_mock() + if "specific" in kwargs: + specific = kwargs['specific'] + del kwargs['specific'] + else: + specific = None + self.assertEqual(eset.specificity_from_filename(fname, + specific=specific), + mock_spec.return_value) + mock_spec.assert_called_with(**kwargs) + + def fails(eset, fname, specific=None): + mock_spec.reset_mock() + self.assertRaises(SpecificityError, + eset.specificity_from_filename, fname, + specific=specific) + + for basename in self.basenames: + eset = self.get_obj(basename=basename) + ppath = os.path.join(datastore, "Plugin", basename) + test(eset, ppath, all=True) + test(eset, ppath + ".G20_foo", group="foo", prio=20) + test(eset, ppath + ".G1_foo", group="foo", prio=1) + test(eset, ppath + ".G32768_foo", group="foo", prio=32768) + test(eset, ppath + ".G10__", group="_", prio=10) + test(eset, ppath + ".H_hostname", hostname="hostname") + test(eset, ppath + ".H_fqdn.subdomain.example.com", + hostname="fqdn.subdomain.example.com") + test(eset, ppath + ".G20_group_with_underscores", + group="group_with_underscores", prio=20) + + for bogus in self.bogus_names: + fails(eset, bogus) + fails(eset, ppath + ".G_group with spaces") + fails(eset, ppath + ".G_foo") + fails(eset, ppath + ".G_") + fails(eset, ppath + ".G20_") + fails(eset, ppath + ".H_") + + @patch("__builtin__.open") + @patch("Bcfg2.Server.Plugin.InfoXML") + def test_update_metadata(self, mock_InfoXML, mock_open): + eset = self.get_obj() + + # add info.xml + event = Mock() + event.filename = "info.xml" + eset.update_metadata(event) + mock_InfoXML.assert_called_with(os.path.join(eset.path, "info.xml"), + True) + mock_InfoXML.return_value.HandleEvent.assert_called_with(event) + self.assertEqual(eset.infoxml, mock_InfoXML.return_value) + + # modify info.xml + mock_InfoXML.reset_mock() + eset.update_metadata(event) + self.assertFalse(mock_InfoXML.called) + eset.infoxml.HandleEvent.assert_called_with(event) + + for fname in [':info', 'info']: + event = Mock() + event.filename = fname + + idata = ["owner:owner", + "group: GROUP", + "perms: 775", + "important: true", + "bogus: line"] + mock_open.return_value.readlines.return_value = idata + eset.update_metadata(event) + expected = default_file_metadata.copy() + expected['owner'] = 'owner' + expected['group'] = 'GROUP' + expected['perms'] = '0775' + expected['important'] = 'true' + self.assertItemsEqual(eset.metadata, + expected) + + def test_reset_metadata(self): + eset = self.get_obj() + + # test info.xml + event = Mock() + event.filename = "info.xml" + eset.infoxml = Mock() + eset.reset_metadata(event) + self.assertIsNone(eset.infoxml) + + for fname in [':info', 'info']: + event = Mock() + event.filename = fname + eset.metadata = Mock() + eset.reset_metadata(event) + self.assertItemsEqual(eset.metadata, default_file_metadata) + + @patch("Bcfg2.Server.Plugin.bind_info") + def test_bind_info_to_entry(self, mock_bind_info): + eset = self.get_obj() + entry = Mock() + metadata = Mock() + eset.bind_info_to_entry(entry, metadata) + mock_bind_info.assert_called_with(entry, metadata, + infoxml=eset.infoxml, + default=eset.metadata) + + @patch("Bcfg2.Server.Plugin.%s.best_matching" % test_obj.__name__) + @patch("Bcfg2.Server.Plugin.%s.bind_info_to_entry" % test_obj.__name__) + def test_bind_entry(self, mock_bind_info, mock_best_matching): + eset = self.get_obj() + entry = Mock() + metadata = Mock() + eset.bind_entry(entry, metadata) + mock_bind_info.assert_called_with(entry, metadata) + mock_best_matching.assert_called_with(metadata) + mock_best_matching.return_value.bind_entry.assert_called_with(entry, + metadata) + -class GroupSpool(TestPlugin, TestGenerator): +class TestGroupSpool(TestPlugin, TestGenerator): test_obj = GroupSpool pass diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py index f90096210..0bcb65dc4 100644 --- a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py @@ -3,7 +3,7 @@ import sys import time import unittest import lxml.etree -from mock import Mock, patch +from mock import Mock, MagicMock, patch from ....common import * import Bcfg2.Server import Bcfg2.Server.Plugin @@ -75,18 +75,27 @@ class TestProbeData(Bcfg2TestCase): class TestProbeSet(TestEntrySet): test_obj = ProbeSet + basenames = ["test", "_test", "test-test"] + ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] + bogus_names = ["test.py"] def get_obj(self, path=datastore, fam=None, encoding=None, - plugin_name="Probes"): + plugin_name="Probes", basename=None): + # get_obj() accepts the basename argument, accepted by the + # parent get_obj() method, and just throws it away, since + # ProbeSet uses a regex for the "basename" if fam is None: fam = Mock() - return self.test_obj(path, fam, encoding, plugin_name) + rv = self.test_obj(path, fam, encoding, plugin_name) + rv.entry_type = MagicMock() + return rv def test__init(self): fam = Mock() ps = self.get_obj(fam=fam) self.assertEqual(ps.plugin_name, "Probes") fam.AddMonitor.assert_called_with(datastore, ps) + TestEntrySet.test__init(self) def test_HandleEvent(self): ps = self.get_obj() |