summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lib/Bcfg2/Server/Plugin.py56
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Decisions.py6
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Probes.py5
-rw-r--r--src/lib/Bcfg2/Server/Plugins/TemplateHelper.py7
-rw-r--r--testsuite/Testlib/TestServer/TestPlugin.py399
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestProbes.py15
6 files changed, 432 insertions, 56 deletions
diff --git a/src/lib/Bcfg2/Server/Plugin.py b/src/lib/Bcfg2/Server/Plugin.py
index b630081d4..3fcf578d1 100644
--- a/src/lib/Bcfg2/Server/Plugin.py
+++ b/src/lib/Bcfg2/Server/Plugin.py
@@ -36,16 +36,15 @@ del default_file_metadata['args']
logger = logging.getLogger('Bcfg2.Server.Plugin')
-info_regex = re.compile( \
- 'encoding:(\s)*(?P<encoding>\w+)|' +
- 'group:(\s)*(?P<group>\S+)|' +
- 'important:(\s)*(?P<important>\S+)|' +
- 'mtime:(\s)*(?P<mtime>\w+)|' +
- 'owner:(\s)*(?P<owner>\S+)|' +
- 'paranoid:(\s)*(?P<paranoid>\S+)|' +
- 'perms:(\s)*(?P<perms>\w+)|' +
- 'secontext:(\s)*(?P<secontext>\S+)|' +
- 'sensitive:(\s)*(?P<sensitive>\S+)|')
+info_regex = re.compile('owner:(\s)*(?P<owner>\S+)|' +
+ 'group:(\s)*(?P<group>\S+)|' +
+ 'perms:(\s)*(?P<perms>\w+)|' +
+ 'secontext:(\s)*(?P<secontext>\S+)|' +
+ 'paranoid:(\s)*(?P<paranoid>\S+)|' +
+ 'sensitive:(\s)*(?P<sensitive>\S+)|' +
+ 'encoding:(\s)*(?P<encoding>\w+)|' +
+ 'important:(\s)*(?P<important>\S+)|' +
+ 'mtime:(\s)*(?P<mtime>\w+)|')
def bind_info(entry, metadata, infoxml=None, default=default_file_metadata):
for attr, val in list(default.items()):
@@ -654,7 +653,6 @@ class XMLFileBacked(FileBacked):
if fpath not in self.extras:
if os.path.exists(fpath):
self._follow_xincludes(fname=fpath)
- print "adding monitor to %s" % fpath
self.add_monitor(fpath)
else:
msg = "%s: %s does not exist, skipping" % (self.name, name)
@@ -1068,7 +1066,7 @@ class EntrySet(Debuggable):
"""Entry sets deal with the host- and group-specific entries."""
ignore = re.compile("^(\.#.*|.*~|\\..*\\.(sw[px])|.*\\.genshi_include)$")
- def __init__(self, basename, path, entry_type, encoding):
+ def __init__(self, basename, path, entry_type, encoding, is_regex=False):
Debuggable.__init__(self, name=basename)
self.path = path
self.entry_type = entry_type
@@ -1076,7 +1074,12 @@ class EntrySet(Debuggable):
self.metadata = default_file_metadata.copy()
self.infoxml = None
self.encoding = encoding
- pattern = '(.*/)?%s(\.((H_(?P<hostname>\S+))|' % basename
+
+ if is_regex:
+ base_pat = basename
+ else:
+ base_pat = re.escape(basename)
+ pattern = '(.*/)?%s(\.((H_(?P<hostname>\S+))|' % base_pat
pattern += '(G(?P<prio>\d+)_(?P<group>\S+))))?$'
self.specific = re.compile(pattern)
@@ -1093,20 +1096,13 @@ class EntrySet(Debuggable):
if matching is None:
matching = self.get_matching(metadata)
- hspec = [ent for ent in matching if ent.specific.hostname]
- if hspec:
- return hspec[0]
-
- gspec = [ent for ent in matching if ent.specific.group]
- if gspec:
- gspec.sort()
- return gspec[-1]
-
- aspec = [ent for ent in matching if ent.specific.all]
- if aspec:
- return aspec[0]
-
- raise PluginExecutionError
+ if matching:
+ matching.sort()
+ return matching[0]
+ else:
+ raise PluginExecutionError("No matching entries available for %s "
+ "for %s" % (self.path,
+ metadata.hostname))
def handle_event(self, event):
"""Handle FAM events for the TemplateSet."""
@@ -1195,8 +1191,7 @@ class EntrySet(Debuggable):
if value:
self.metadata[key] = value
if len(self.metadata['perms']) == 3:
- self.metadata['perms'] = "0%s" % \
- (self.metadata['perms'])
+ self.metadata['perms'] = "0%s" % self.metadata['perms']
def reset_metadata(self, event):
"""Reset metadata to defaults if info or info.xml removed."""
@@ -1209,7 +1204,8 @@ class EntrySet(Debuggable):
bind_info(entry, metadata, infoxml=self.infoxml, default=self.metadata)
def bind_entry(self, entry, metadata):
- """Return the appropriate interpreted template from the set of available templates."""
+ """Return the appropriate interpreted template from the set of
+ available templates."""
self.bind_info_to_entry(entry, metadata)
return self.best_matching(metadata).bind_entry(entry, metadata)
diff --git a/src/lib/Bcfg2/Server/Plugins/Decisions.py b/src/lib/Bcfg2/Server/Plugins/Decisions.py
index b432474f2..78b549c2c 100644
--- a/src/lib/Bcfg2/Server/Plugins/Decisions.py
+++ b/src/lib/Bcfg2/Server/Plugins/Decisions.py
@@ -23,9 +23,9 @@ class DecisionSet(Bcfg2.Server.Plugin.EntrySet):
- `encoding`: XML character encoding
"""
- pattern = '(white|black)list'
- Bcfg2.Server.Plugin.EntrySet.__init__(self, pattern, path, \
- DecisionFile, encoding)
+ Bcfg2.Server.Plugin.EntrySet.__init__(self, '(white|black)list', path,
+ DecisionFile, encoding,
+ is_regex=True)
try:
fam.AddMonitor(path, self)
except OSError:
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index 53deefebd..e08f52a28 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -115,11 +115,10 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
bangline = re.compile('^#!\s*(?P<interpreter>.*)$')
def __init__(self, path, fam, encoding, plugin_name):
- fpattern = '[0-9A-Za-z_\-]+'
self.plugin_name = plugin_name
- Bcfg2.Server.Plugin.EntrySet.__init__(self, fpattern, path,
+ Bcfg2.Server.Plugin.EntrySet.__init__(self, '[0-9A-Za-z_\-]+', path,
Bcfg2.Server.Plugin.SpecificData,
- encoding)
+ encoding, is_regex=True)
fam.AddMonitor(path, self)
def HandleEvent(self, event):
diff --git a/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py b/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py
index 3712506d6..2b3aa6bc5 100644
--- a/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py
+++ b/src/lib/Bcfg2/Server/Plugins/TemplateHelper.py
@@ -50,12 +50,13 @@ class HelperModule(Bcfg2.Server.Plugin.SpecificData):
class HelperSet(Bcfg2.Server.Plugin.EntrySet):
ignore = re.compile("^(\.#.*|.*~|\\..*\\.(sw[px])|.*\.py[co])$")
+ fpattern = '[0-9A-Za-z_\-]+\.py'
def __init__(self, path, fam, encoding, plugin_name):
- fpattern = '[0-9A-Za-z_\-]+\.py'
self.plugin_name = plugin_name
- Bcfg2.Server.Plugin.EntrySet.__init__(self, fpattern, path,
- HelperModule, encoding)
+ Bcfg2.Server.Plugin.EntrySet.__init__(self, self.fpattern, path,
+ HelperModule, encoding,
+ is_regex=True)
fam.AddMonitor(path, self)
def HandleEvent(self, event):
diff --git a/testsuite/Testlib/TestServer/TestPlugin.py b/testsuite/Testlib/TestServer/TestPlugin.py
index 9ca3243a3..e5baf9a0b 100644
--- a/testsuite/Testlib/TestServer/TestPlugin.py
+++ b/testsuite/Testlib/TestServer/TestPlugin.py
@@ -1,4 +1,5 @@
import os
+import re
import copy
import logging
import unittest
@@ -164,32 +165,39 @@ class TestPluginDatabaseModel(Bcfg2TestCase):
class TestGenerator(Bcfg2TestCase):
- def test_HandleEntry(self):
- g = Generator()
- self.assertRaises(NotImplementedError,
- g.HandleEntry, None, None)
+ test_obj = Generator
class TestStructure(Bcfg2TestCase):
+ test_obj = Structure
+
+ def get_obj(self):
+ return self.test_obj()
+
def test_BuildStructures(self):
- s = Structure()
+ s = self.get_obj()
self.assertRaises(NotImplementedError,
s.BuildStructures, None)
class TestMetadata(Bcfg2TestCase):
+ test_obj = Metadata
+
+ def get_obj(self):
+ return self.test_obj()
+
def test_get_initial_metadata(self):
- m = Metadata()
+ m = self.get_obj()
self.assertRaises(NotImplementedError,
m.get_initial_metadata, None)
def test_merge_additional_data(self):
- m = Metadata()
+ m = self.get_obj()
self.assertRaises(NotImplementedError,
m.merge_additional_data, None, None, None)
def test_merge_additional_groups(self):
- m = Metadata()
+ m = self.get_obj()
self.assertRaises(NotImplementedError,
m.merge_additional_groups, None, None)
@@ -1511,12 +1519,12 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
pd.get_attrs, entry, metadata)
-class SpecificityError(Bcfg2TestCase):
+class TestSpecificityError(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class Specificity(Bcfg2TestCase):
+class TestSpecificity(Bcfg2TestCase):
test_obj = Specificity
def get_obj(self, **kwargs):
@@ -1560,7 +1568,6 @@ class Specificity(Bcfg2TestCase):
for i in range(len(specs)):
for j in range(len(specs)):
- print "i=%s, j=%s" % (i, j)
if i < j:
self.assertGreater(specs[i], specs[j])
self.assertLess(specs[j], specs[i])
@@ -1578,7 +1585,7 @@ class Specificity(Bcfg2TestCase):
self.assertGreaterEqual(specs[j], specs[i])
-class SpecificData(Bcfg2TestCase):
+class TestSpecificData(Bcfg2TestCase):
test_obj = SpecificData
def get_obj(self, name="/test.txt", specific=None, encoding=None):
@@ -1608,12 +1615,376 @@ class SpecificData(Bcfg2TestCase):
class TestEntrySet(TestDebuggable):
test_obj = EntrySet
+ # filenames that should be matched successfully by the EntrySet
+ # 'specific' regex. these are filenames alone -- a specificity
+ # will be added to these
+ basenames = ["test", "test.py", "test with spaces.txt",
+ "test.multiple.dots.py", "test_underscores.and.dots",
+ "really_misleading.G10_test",
+ "name$with*regex(special){chars}",
+ "misleading.H_hostname.test.com"]
+ # filenames that do not match any of the basenames (or the
+ # basename regex, if applicable)
+ bogus_names = ["bogus"]
+ # filenames that should be ignored
+ ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx",
+ "test.txt.genshi_include", "test.G_foo.genshi_include"]
+
- def get_obj(self, basename="test", path=datastore, entry_type=SpecificData,
+ def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(),
encoding=None):
return self.test_obj(basename, path, entry_type, encoding)
+ def test__init(self):
+ for basename in self.basenames:
+ eset = self.get_obj(basename=basename)
+ self.assertIsInstance(eset.specific, re._pattern_type)
+ self.assertTrue(eset.specific.match(os.path.join(datastore,
+ basename)))
+ ppath = os.path.join(datastore, "Plugin", basename)
+ self.assertTrue(eset.specific.match(ppath))
+ self.assertTrue(eset.specific.match(ppath + ".G20_foo"))
+ self.assertTrue(eset.specific.match(ppath + ".G1_foo"))
+ self.assertTrue(eset.specific.match(ppath + ".G32768_foo"))
+ # a group named '_'
+ self.assertTrue(eset.specific.match(ppath + ".G10__"))
+ self.assertTrue(eset.specific.match(ppath + ".H_hostname"))
+ self.assertTrue(eset.specific.match(ppath + ".H_fqdn.subdomain.example.com"))
+ self.assertTrue(eset.specific.match(ppath + ".G20_group_with_underscores"))
+
+ self.assertFalse(eset.specific.match(ppath + ".G20_group with spaces"))
+ self.assertFalse(eset.specific.match(ppath + ".G_foo"))
+ self.assertFalse(eset.specific.match(ppath + ".G_"))
+ self.assertFalse(eset.specific.match(ppath + ".G20_"))
+ self.assertFalse(eset.specific.match(ppath + ".H_"))
+
+ for bogus in self.bogus_names:
+ self.assertFalse(eset.specific.match(os.path.join(datastore,
+ "Plugin",
+ bogus)))
+
+ for ignore in self.ignore:
+ self.assertTrue(eset.ignore.match(ignore))
+
+ self.assertFalse(eset.ignore.match(basename))
+ self.assertFalse(eset.ignore.match(basename + ".G20_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G1_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G32768_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G10__"))
+ self.assertFalse(eset.ignore.match(basename + ".H_hostname"))
+ self.assertFalse(eset.ignore.match(basename + ".H_fqdn.subdomain.example.com"))
+ self.assertFalse(eset.ignore.match(basename + ".G20_group_with_underscores"))
+
+ def test_get_matching(self):
+ items = {0: Mock(), 1: Mock(), 2: Mock(), 3: Mock(), 4: Mock(),
+ 5: Mock()}
+ items[0].specific.matches.return_value = False
+ items[1].specific.matches.return_value = True
+ items[2].specific.matches.return_value = False
+ items[3].specific.matches.return_value = False
+ items[4].specific.matches.return_value = True
+ items[5].specific.matches.return_value = True
+ metadata = Mock()
+ eset = self.get_obj()
+ eset.entries = items
+ self.assertItemsEqual(eset.get_matching(metadata),
+ [items[1], items[4], items[5]])
+ for i in items.values():
+ i.specific.matches.assert_called_with(metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.get_matching" % test_obj.__name__)
+ def test_best_matching(self, mock_get_matching):
+ eset = self.get_obj()
+ metadata = Mock()
+ matching = []
+
+ def reset():
+ mock_get_matching.reset_mock()
+ metadata.reset_mock()
+ for m in matching:
+ m.reset_mock()
+
+ def specific(all=False, group=False, prio=None, host=False):
+ spec = MagicMock()
+ spec.all = all
+ spec.group = group
+ spec.prio = prio
+ spec.host = host
+ if prio:
+ spec.__cmp__ = lambda o: cmp(spec.prio, o.prio)
+ return spec
+
+ self.assertRaises(PluginExecutionError,
+ eset.best_matching, metadata, matching=[])
+
+ reset()
+ mock_get_matching.return_value = matching
+ self.assertRaises(PluginExecutionError,
+ eset.best_matching, metadata)
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with a single file for all
+ reset()
+ matching.insert(0, specific(all=True))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with a single group-specific file
+ reset()
+ matching.insert(0, specific(group=True, prio=10))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with multiple group-specific files
+ reset()
+ matching.insert(0, specific(group=True, prio=20))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with host-specific file
+ reset()
+ matching.insert(0, specific(host=True))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.entry_init" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.reset_metadata" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.update_metadata" % test_obj.__name__)
+ def test_handle_event(self, mock_update_md, mock_reset_md, mock_init):
+ def reset():
+ mock_update_md.reset_mock()
+ mock_reset_md.reset_mock()
+ mock_init.reset_mock()
+
+ eset = self.get_obj()
+ for fname in ["info", "info.xml", ":info"]:
+ for evt in ["exists", "created", "changed"]:
+ reset()
+ event = Mock()
+ event.code2str.return_value = evt
+ event.filename = fname
+ eset.handle_event(event)
+ mock_update_md.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_reset_md.called)
+
+ reset()
+ event = Mock()
+ event.code2str.return_value = "deleted"
+ event.filename = fname
+ eset.handle_event(event)
+ mock_reset_md.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_update_md.called)
+
+ for evt in ["exists", "created", "changed"]:
+ reset()
+ event = Mock()
+ event.code2str.return_value = evt
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ mock_init.assert_called_with(event)
+ self.assertFalse(mock_reset_md.called)
+ self.assertFalse(mock_update_md.called)
+
+ reset()
+ entry = Mock()
+ eset.entries["test.txt"] = entry
+ event = Mock()
+ event.code2str.return_value = "changed"
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ entry.handle_event.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_reset_md.called)
+ self.assertFalse(mock_update_md.called)
+
+ reset()
+ entry = Mock()
+ eset.entries["test.txt"] = entry
+ event = Mock()
+ event.code2str.return_value = "deleted"
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ self.assertNotIn("test.txt", eset.entries)
+
+ @patch("Bcfg2.Server.Plugin.%s.specificity_from_filename" %
+ test_obj.__name__)
+ def test_entry_init(self, mock_spec):
+ eset = self.get_obj()
+
+ def reset():
+ eset.entry_type.reset_mock()
+ mock_spec.reset_mock()
+
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test.txt"
+ eset.entry_init(event)
+ mock_spec.assert_called_with("test.txt", specific=None)
+ eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"),
+ mock_spec.return_value, None)
+ eset.entry_type.return_value.handle_event.assert_called_with(event)
+ self.assertIn("test.txt", eset.entries)
+
+ # test duplicate add
+ reset()
+ eset.entry_init(event)
+ self.assertFalse(mock_spec.called)
+ self.assertFalse(eset.entry_type.called)
+ eset.entries["test.txt"].handle_event.assert_called_with(event)
+
+ # test keyword args
+ etype = Mock()
+ specific = Mock()
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test2.txt"
+ eset.entry_init(event, entry_type=etype, specific=specific)
+ mock_spec.assert_called_with("test2.txt", specific=specific)
+ etype.assert_called_with(os.path.join(eset.path, "test2.txt"),
+ mock_spec.return_value, None)
+ etype.return_value.handle_event.assert_called_with(event)
+ self.assertIn("test2.txt", eset.entries)
+
+ # test specificity error
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test3.txt"
+ mock_spec.side_effect = SpecificityError
+ eset.entry_init(event)
+ mock_spec.assert_called_with("test3.txt", specific=None)
+ self.assertFalse(eset.entry_type.called)
+
+ @patch("Bcfg2.Server.Plugin.Specificity")
+ def test_specificity_from_filename(self, mock_spec):
+ def test(eset, fname, **kwargs):
+ mock_spec.reset_mock()
+ if "specific" in kwargs:
+ specific = kwargs['specific']
+ del kwargs['specific']
+ else:
+ specific = None
+ self.assertEqual(eset.specificity_from_filename(fname,
+ specific=specific),
+ mock_spec.return_value)
+ mock_spec.assert_called_with(**kwargs)
+
+ def fails(eset, fname, specific=None):
+ mock_spec.reset_mock()
+ self.assertRaises(SpecificityError,
+ eset.specificity_from_filename, fname,
+ specific=specific)
+
+ for basename in self.basenames:
+ eset = self.get_obj(basename=basename)
+ ppath = os.path.join(datastore, "Plugin", basename)
+ test(eset, ppath, all=True)
+ test(eset, ppath + ".G20_foo", group="foo", prio=20)
+ test(eset, ppath + ".G1_foo", group="foo", prio=1)
+ test(eset, ppath + ".G32768_foo", group="foo", prio=32768)
+ test(eset, ppath + ".G10__", group="_", prio=10)
+ test(eset, ppath + ".H_hostname", hostname="hostname")
+ test(eset, ppath + ".H_fqdn.subdomain.example.com",
+ hostname="fqdn.subdomain.example.com")
+ test(eset, ppath + ".G20_group_with_underscores",
+ group="group_with_underscores", prio=20)
+
+ for bogus in self.bogus_names:
+ fails(eset, bogus)
+ fails(eset, ppath + ".G_group with spaces")
+ fails(eset, ppath + ".G_foo")
+ fails(eset, ppath + ".G_")
+ fails(eset, ppath + ".G20_")
+ fails(eset, ppath + ".H_")
+
+ @patch("__builtin__.open")
+ @patch("Bcfg2.Server.Plugin.InfoXML")
+ def test_update_metadata(self, mock_InfoXML, mock_open):
+ eset = self.get_obj()
+
+ # add info.xml
+ event = Mock()
+ event.filename = "info.xml"
+ eset.update_metadata(event)
+ mock_InfoXML.assert_called_with(os.path.join(eset.path, "info.xml"),
+ True)
+ mock_InfoXML.return_value.HandleEvent.assert_called_with(event)
+ self.assertEqual(eset.infoxml, mock_InfoXML.return_value)
+
+ # modify info.xml
+ mock_InfoXML.reset_mock()
+ eset.update_metadata(event)
+ self.assertFalse(mock_InfoXML.called)
+ eset.infoxml.HandleEvent.assert_called_with(event)
+
+ for fname in [':info', 'info']:
+ event = Mock()
+ event.filename = fname
+
+ idata = ["owner:owner",
+ "group: GROUP",
+ "perms: 775",
+ "important: true",
+ "bogus: line"]
+ mock_open.return_value.readlines.return_value = idata
+ eset.update_metadata(event)
+ expected = default_file_metadata.copy()
+ expected['owner'] = 'owner'
+ expected['group'] = 'GROUP'
+ expected['perms'] = '0775'
+ expected['important'] = 'true'
+ self.assertItemsEqual(eset.metadata,
+ expected)
+
+ def test_reset_metadata(self):
+ eset = self.get_obj()
+
+ # test info.xml
+ event = Mock()
+ event.filename = "info.xml"
+ eset.infoxml = Mock()
+ eset.reset_metadata(event)
+ self.assertIsNone(eset.infoxml)
+
+ for fname in [':info', 'info']:
+ event = Mock()
+ event.filename = fname
+ eset.metadata = Mock()
+ eset.reset_metadata(event)
+ self.assertItemsEqual(eset.metadata, default_file_metadata)
+
+ @patch("Bcfg2.Server.Plugin.bind_info")
+ def test_bind_info_to_entry(self, mock_bind_info):
+ eset = self.get_obj()
+ entry = Mock()
+ metadata = Mock()
+ eset.bind_info_to_entry(entry, metadata)
+ mock_bind_info.assert_called_with(entry, metadata,
+ infoxml=eset.infoxml,
+ default=eset.metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.best_matching" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.bind_info_to_entry" % test_obj.__name__)
+ def test_bind_entry(self, mock_bind_info, mock_best_matching):
+ eset = self.get_obj()
+ entry = Mock()
+ metadata = Mock()
+ eset.bind_entry(entry, metadata)
+ mock_bind_info.assert_called_with(entry, metadata)
+ mock_best_matching.assert_called_with(metadata)
+ mock_best_matching.return_value.bind_entry.assert_called_with(entry,
+ metadata)
+
-class GroupSpool(TestPlugin, TestGenerator):
+class TestGroupSpool(TestPlugin, TestGenerator):
test_obj = GroupSpool
pass
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
index f90096210..0bcb65dc4 100644
--- a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -3,7 +3,7 @@ import sys
import time
import unittest
import lxml.etree
-from mock import Mock, patch
+from mock import Mock, MagicMock, patch
from ....common import *
import Bcfg2.Server
import Bcfg2.Server.Plugin
@@ -75,18 +75,27 @@ class TestProbeData(Bcfg2TestCase):
class TestProbeSet(TestEntrySet):
test_obj = ProbeSet
+ basenames = ["test", "_test", "test-test"]
+ ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"]
+ bogus_names = ["test.py"]
def get_obj(self, path=datastore, fam=None, encoding=None,
- plugin_name="Probes"):
+ plugin_name="Probes", basename=None):
+ # get_obj() accepts the basename argument, accepted by the
+ # parent get_obj() method, and just throws it away, since
+ # ProbeSet uses a regex for the "basename"
if fam is None:
fam = Mock()
- return self.test_obj(path, fam, encoding, plugin_name)
+ rv = self.test_obj(path, fam, encoding, plugin_name)
+ rv.entry_type = MagicMock()
+ return rv
def test__init(self):
fam = Mock()
ps = self.get_obj(fam=fam)
self.assertEqual(ps.plugin_name, "Probes")
fam.AddMonitor.assert_called_with(datastore, ps)
+ TestEntrySet.test__init(self)
def test_HandleEvent(self):
ps = self.get_obj()