summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-13 14:27:27 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-13 14:27:27 -0400
commitffde9c4783902d6904e41c3d5aa620d3f47b1117 (patch)
treee308f25e48537400d0e7cea7ead3526647356241
parenta9a36c9c6802ce70403bfb8b20b076a17d02f99a (diff)
downloadbcfg2-ffde9c4783902d6904e41c3d5aa620d3f47b1117.tar.gz
bcfg2-ffde9c4783902d6904e41c3d5aa620d3f47b1117.tar.bz2
bcfg2-ffde9c4783902d6904e41c3d5aa620d3f47b1117.zip
added unit tests for INode, InfoNode
-rw-r--r--src/lib/Bcfg2/Server/Plugin.py30
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Deps.py21
-rw-r--r--testsuite/Testlib/TestServer/TestPlugin.py343
3 files changed, 325 insertions, 69 deletions
diff --git a/src/lib/Bcfg2/Server/Plugin.py b/src/lib/Bcfg2/Server/Plugin.py
index 8ba19632d..cfde25035 100644
--- a/src/lib/Bcfg2/Server/Plugin.py
+++ b/src/lib/Bcfg2/Server/Plugin.py
@@ -762,26 +762,28 @@ class StructFile(XMLFileBacked):
return rv
-class INode:
+class INode(object):
"""
LNodes provide lists of things available at a particular
group intersection.
"""
- raw = {'Client': "lambda m, e:'%(name)s' == m.hostname and predicate(m, e)",
- 'Group': "lambda m, e:'%(name)s' in m.groups and predicate(m, e)"}
- nraw = {'Client': "lambda m, e:'%(name)s' != m.hostname and predicate(m, e)",
- 'Group': "lambda m, e:'%(name)s' not in m.groups and predicate(m, e)"}
+ raw = dict(
+ Client="lambda m, e:'%(name)s' == m.hostname and predicate(m, e)",
+ Group="lambda m, e:'%(name)s' in m.groups and predicate(m, e)")
+ nraw = dict(
+ Client="lambda m, e:'%(name)s' != m.hostname and predicate(m, e)",
+ Group="lambda m, e:'%(name)s' not in m.groups and predicate(m, e)")
containers = ['Group', 'Client']
ignore = []
def __init__(self, data, idict, parent=None):
self.data = data
self.contents = {}
- if parent == None:
- self.predicate = lambda m, d: True
+ if parent is None:
+ self.predicate = lambda m, e: True
else:
predicate = parent.predicate
- if data.get('negate', 'false') in ['true', 'True']:
+ if data.get('negate', 'false').lower() == 'true':
psrc = self.nraw
else:
psrc = self.raw
@@ -790,20 +792,23 @@ class INode:
{'name': data.get('name')},
{'predicate': predicate})
else:
- raise Exception
- mytype = self.__class__
+ raise PluginExecutionError("Unknown tag: %s" % data.tag)
self.children = []
+ self._load_children(data, idict)
+
+ def _load_children(self, data, idict):
for item in data.getchildren():
if item.tag in self.ignore:
continue
elif item.tag in self.containers:
- self.children.append(mytype(item, idict, self))
+ self.children.append(self.__class__(item, idict, self))
else:
try:
self.contents[item.tag][item.get('name')] = \
dict(item.attrib)
except KeyError:
- self.contents[item.tag] = {item.get('name'): dict(item.attrib)}
+ self.contents[item.tag] = \
+ {item.get('name'): dict(item.attrib)}
if item.text:
self.contents[item.tag][item.get('name')]['__text__'] = \
item.text
@@ -984,7 +989,6 @@ class SpecificityError(Exception):
class Specificity:
-
def __init__(self, all=False, group=False, hostname=False, prio=0, delta=False):
self.hostname = hostname
self.all = all
diff --git a/src/lib/Bcfg2/Server/Plugins/Deps.py b/src/lib/Bcfg2/Server/Plugins/Deps.py
index 9b848baae..d3a1ee871 100644
--- a/src/lib/Bcfg2/Server/Plugins/Deps.py
+++ b/src/lib/Bcfg2/Server/Plugins/Deps.py
@@ -7,27 +7,10 @@ import Bcfg2.Server.Plugin
class DNode(Bcfg2.Server.Plugin.INode):
"""DNode provides supports for single predicate types for dependencies."""
- raw = {'Group': "lambda m, e:'%(name)s' in m.groups and predicate(m, e)"}
- containers = ['Group']
-
- def __init__(self, data, idict, parent=None):
- self.data = data
- self.contents = {}
- if parent == None:
- self.predicate = lambda x, d: True
- else:
- predicate = parent.predicate
- if data.tag in list(self.raw.keys()):
- self.predicate = eval(self.raw[data.tag] %
- {'name': data.get('name')},
- {'predicate': predicate})
- else:
- raise Exception
- mytype = self.__class__
- self.children = []
+ def _load_children(self, data, idict):
for item in data.getchildren():
if item.tag in self.containers:
- self.children.append(mytype(item, idict, self))
+ self.children.append(self.__class__(item, idict, self))
else:
data = [(child.tag, child.get('name'))
for child in item.getchildren()]
diff --git a/testsuite/Testlib/TestServer/TestPlugin.py b/testsuite/Testlib/TestServer/TestPlugin.py
index a23ef2473..0ce32eb91 100644
--- a/testsuite/Testlib/TestServer/TestPlugin.py
+++ b/testsuite/Testlib/TestServer/TestPlugin.py
@@ -15,11 +15,29 @@ def call(*args, **kwargs):
calls """
return (args, kwargs)
+class Bcfg2TestCase(unittest.TestCase):
+ def assertXMLEqual(self, el1, el2, msg=None):
+ self.assertEqual(el1.tag, el2.tag, msg=msg)
+ self.assertEqual(el1.text, el2.text, msg=msg)
+ self.assertItemsEqual(el1.attrib, el2.attrib, msg=msg)
+ self.assertEqual(len(el1.getchildren()),
+ len(el2.getchildren()))
+ for child1 in el1.getchildren():
+ cname = child1.get("name")
+ self.assertIsNotNone(cname,
+ msg="Element %s has no 'name' attribute" %
+ child1.tag)
+ children2 = el2.xpath("*[@name='%s']" % cname)
+ self.assertEqual(len(children2), 1,
+ msg="More than one element named %s" % cname)
+ self.assertXMLEqual(child1, children2[0], msg=msg)
+
+
class FakeElementTree(lxml.etree._ElementTree):
xinclude = Mock()
-class TestFunctions(unittest.TestCase):
+class TestFunctions(Bcfg2TestCase):
def test_bind_info(self):
entry = lxml.etree.Element("Path", name="/test")
metadata = Mock()
@@ -62,17 +80,17 @@ class TestFunctions(unittest.TestCase):
name="/test"))
-class TestPluginInitError(unittest.TestCase):
+class TestPluginInitError(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class TestPluginExecutionError(unittest.TestCase):
+class TestPluginExecutionError(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class TestDebuggable(unittest.TestCase):
+class TestDebuggable(Bcfg2TestCase):
def test__init(self):
d = Debuggable()
self.assertIsInstance(d.logger, logging.Logger)
@@ -150,26 +168,26 @@ class TestDatabaseBacked(TestPlugin):
Bcfg2.Server.Plugin.has_django = True
-class TestPluginDatabaseModel(unittest.TestCase):
+class TestPluginDatabaseModel(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class TestGenerator(unittest.TestCase):
+class TestGenerator(Bcfg2TestCase):
def test_HandleEntry(self):
g = Generator()
self.assertRaises(NotImplementedError,
g.HandleEntry, None, None)
-class TestStructure(unittest.TestCase):
+class TestStructure(Bcfg2TestCase):
def test_BuildStructures(self):
s = Structure()
self.assertRaises(NotImplementedError,
s.BuildStructures, None)
-class TestMetadata(unittest.TestCase):
+class TestMetadata(Bcfg2TestCase):
def test_get_initial_metadata(self):
m = Metadata()
self.assertRaises(NotImplementedError,
@@ -186,12 +204,12 @@ class TestMetadata(unittest.TestCase):
m.merge_additional_groups, None, None)
-class TestConnector(unittest.TestCase):
+class TestConnector(Bcfg2TestCase):
""" placeholder """
pass
-class TestProbing(unittest.TestCase):
+class TestProbing(Bcfg2TestCase):
""" placeholder """
pass
@@ -368,14 +386,14 @@ class TestThreadedStatistics(TestStatistics):
ts.process_statistics(*self.data[0])
-class TestPullSource(unittest.TestCase):
+class TestPullSource(Bcfg2TestCase):
def test_GetCurrentEntry(self):
ps = PullSource()
self.assertRaises(NotImplementedError,
ps.GetCurrentEntry, None, None, None)
-class TestPullTarget(unittest.TestCase):
+class TestPullTarget(Bcfg2TestCase):
def test_AcceptChoices(self):
pt = PullTarget()
self.assertRaises(NotImplementedError,
@@ -387,41 +405,41 @@ class TestPullTarget(unittest.TestCase):
pt.AcceptPullData, None, None, None)
-class TestDecision(unittest.TestCase):
+class TestDecision(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class TestValidationError(unittest.TestCase):
+class TestValidationError(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class TestStructureValidator(unittest.TestCase):
+class TestStructureValidator(Bcfg2TestCase):
def test_validate_structures(self):
sv = StructureValidator()
self.assertRaises(NotImplementedError,
sv.validate_structures, None, None)
-class TestGoalValidator(unittest.TestCase):
+class TestGoalValidator(Bcfg2TestCase):
def test_validate_goals(self):
gv = GoalValidator()
self.assertRaises(NotImplementedError,
gv.validate_goals, None, None)
-class TestVersion(unittest.TestCase):
+class TestVersion(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class TestClientRunHooks(unittest.TestCase):
+class TestClientRunHooks(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class TestFileBacked(unittest.TestCase):
+class TestFileBacked(Bcfg2TestCase):
@patch("__builtin__.open")
@patch("Bcfg2.Server.Plugin.FileBacked.Index")
def test_HandleEvent(self, mock_Index, mock_open):
@@ -449,7 +467,7 @@ class TestFileBacked(unittest.TestCase):
self.assertFalse(mock_Index.called)
-class TestDirectoryBacked(unittest.TestCase):
+class TestDirectoryBacked(Bcfg2TestCase):
testpaths = {1: '',
2: '/foo',
3: '/foo/bar',
@@ -823,6 +841,7 @@ class TestXMLFileBacked(TestFileBacked):
fam.AddMonitor.assert_called_with("/test/test4.xml", xfb)
self.assertIn("/test/test4.xml", xfb.extras)
+
class TestStructFile(TestXMLFileBacked):
def _get_test_data(self):
""" build a very complex set of test data """
@@ -893,22 +912,6 @@ class TestStructFile(TestXMLFileBacked):
children[4] = standalone
return (xdata, groups, subgroups, children, subchildren, standalone)
- def assertXMLEqual(self, el1, el2, msg=None):
- self.assertEqual(el1.tag, el2.tag, msg=msg)
- self.assertEqual(el1.text, el2.text, msg=msg)
- self.assertItemsEqual(el1.attrib, el2.attrib, msg=msg)
- self.assertEqual(len(el1.getchildren()),
- len(el2.getchildren()))
- for child1 in el1.getchildren():
- cname = child1.get("name")
- self.assertIsNotNone(cname,
- msg="Element %s has no 'name' attribute" %
- child1.tag)
- children2 = el2.xpath("*[@name='%s']" % cname)
- self.assertEqual(len(children2), 1,
- msg="More than one element named %s" % cname)
- self.assertXMLEqual(child1, children2[0], msg=msg)
-
def test_include_element(self):
sf = StructFile("/test/test.xml")
metadata = Mock()
@@ -1045,6 +1048,272 @@ class TestStructFile(TestXMLFileBacked):
self.assertXMLEqual(xactual, xexpected)
+# INode.__init__ and INode._load_children() call each other
+# recursively, which makes this class kind of a nightmare to test. we
+# have to first patch INode._load_children so that we can create an
+# INode object with no children loaded, then we unpatch
+# INode._load_children and patch INode.__init__ so that child objects
+# aren't actually created. but in order to test things atomically, we
+# do this umpteen times in order to test with different data. we
+# write our own context manager to make this a little easier. fun fun
+# fun.
+class patch_inode(object):
+ def __init__(self, test_obj, data, idict):
+ self.test_obj = test_obj
+ self.data = data
+ self.idict = idict
+ self.patch_init = None
+ self.inode = None
+
+ def __enter__(self):
+ with patch("Bcfg2.Server.Plugin.%s._load_children" %
+ self.test_obj.__name__):
+ self.inode = self.test_obj(self.data, self.idict)
+ self.patch_init = patch("Bcfg2.Server.Plugin.%s.__init__" %
+ self.inode.__class__.__name__,
+ new=Mock(return_value=None))
+ self.patch_init.start()
+ self.inode._load_children(self.data, self.idict)
+ return (self.inode, self.patch_init.new)
+
+ def __exit__(self, type, value, traceback):
+ self.patch_init.stop()
+ del self.patch_init
+ del self.inode
+
+
+class TestINode(Bcfg2TestCase):
+ test_obj = INode
+
+ def test_raw_predicates(self):
+ metadata = Mock()
+ metadata.groups = ["group1", "group2"]
+ metadata.hostname = "foo.example.com"
+ entry = None
+
+ parent_predicate = lambda m, e: True
+ pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(self.test_obj.raw['Client'] % dict(name="bar.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(self.test_obj.raw['Group'] % dict(name="group3"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ pred = eval(self.test_obj.nraw['Client'] % dict(name="foo.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+
+ pred = eval(self.test_obj.nraw['Group'] % dict(name="group1"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+
+ parent_predicate = lambda m, e: False
+ pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ self.assertItemsEqual(self.test_obj.containers,
+ self.test_obj.raw.keys())
+ self.assertItemsEqual(self.test_obj.containers,
+ self.test_obj.nraw.keys())
+
+ @patch("Bcfg2.Server.Plugin.INode._load_children")
+ def test__init(self, mock_load_children):
+ data = lxml.etree.Element("Bogus")
+ # called with no parent, should not raise an exception; it's a
+ # top-level tag in an XML file and so is not expected to be a
+ # proper predicate
+ INode(data, dict())
+ self.assertRaises(PluginExecutionError,
+ INode, data, dict(), Mock())
+
+ data = lxml.etree.Element("Client", name="foo.example.com")
+ idict = dict()
+ inode = INode(data, idict)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertTrue(inode.predicate(Mock(), Mock()))
+
+ parent = Mock()
+ parent.predicate = lambda m, e: True
+ metadata = Mock()
+ metadata.groups = ["group1", "group2"]
+ metadata.hostname = "foo.example.com"
+ entry = None
+
+ # test setting predicate with parent object
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertTrue(inode.predicate(metadata, entry))
+
+ # test negation
+ data = lxml.etree.Element("Client", name="foo.example.com",
+ negate="true")
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertFalse(inode.predicate(metadata, entry))
+
+ # test failure of a matching predicate (client names do not match)
+ data = lxml.etree.Element("Client", name="foo.example.com")
+ metadata.hostname = "bar.example.com"
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertFalse(inode.predicate(metadata, entry))
+
+ # test that parent predicate is AND'ed in correctly
+ parent.predicate = lambda m, e: False
+ metadata.hostname = "foo.example.com"
+ mock_load_children.reset_mock()
+ inode = INode(data, idict, parent=parent)
+ mock_load_children.assert_called_with(data, idict)
+ self.assertFalse(inode.predicate(metadata, entry))
+
+ def test_load_children(self):
+ data = lxml.etree.Element("Parent")
+ child1 = lxml.etree.SubElement(data, "Client", name="foo.example.com")
+ child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
+ idict = dict()
+ with patch_inode(self.test_obj, data, idict) as (inode, mock_init):
+ self.assertItemsEqual(mock_init.call_args_list,
+ [call(child1, idict, inode),
+ call(child2, idict, inode)])
+ self.assertEqual(idict, dict())
+ self.assertItemsEqual(inode.contents, dict())
+
+ data = lxml.etree.Element("Parent")
+ child1 = lxml.etree.SubElement(data, "Data", name="child1",
+ attr="some attr")
+ child1.text = "text"
+ subchild1 = lxml.etree.SubElement(child1, "SubChild", name="subchild")
+ child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
+ idict = dict()
+ with patch_inode(self.test_obj, data, idict) as (inode, mock_init):
+ mock_init.assert_called_with(child2, idict, inode)
+ tag = child1.tag
+ name = child1.get("name")
+ self.assertEqual(idict, dict(Data=[name]))
+ self.assertIn(tag, inode.contents)
+ self.assertIn(name, inode.contents[tag])
+ self.assertItemsEqual(inode.contents[tag][name],
+ dict(name=name,
+ attr=child1.get('attr'),
+ __text__=child1.text,
+ __children__=[subchild1]))
+
+ # test ignore. no ignore is set on INode by default, so we
+ # have to set one
+ old_ignore = copy.copy(self.test_obj.ignore)
+ self.test_obj.ignore.append("Data")
+ idict = dict()
+ with patch_inode(self.test_obj, data, idict) as (inode, mock_init):
+ mock_init.assert_called_with(child2, idict, inode)
+ self.assertEqual(idict, dict())
+ self.assertItemsEqual(inode.contents, dict())
+ self.test_obj.ignore = old_ignore
+
+ def test_Match(self):
+ idata = lxml.etree.Element("Parent")
+ contents = lxml.etree.SubElement(idata, "Data", name="contents",
+ attr="some attr")
+ child = lxml.etree.SubElement(idata, "Group", name="bar", negate="true")
+
+ inode = INode(idata, dict())
+ inode.predicate = Mock()
+ inode.predicate.return_value = False
+
+ metadata = Mock()
+ metadata.groups = ['foo']
+ data = dict()
+ entry = child
+
+ inode.Match(metadata, data, entry=child)
+ self.assertEqual(data, dict())
+ inode.predicate.assert_called_with(metadata, child)
+
+ inode.predicate.reset_mock()
+ inode.Match(metadata, data)
+ self.assertEqual(data, dict())
+ # can't easily compare XML args without the original
+ # object, and we're testing that Match() works without an
+ # XML object passed in, so...
+ self.assertEqual(inode.predicate.call_args[0][0],
+ metadata)
+ self.assertXMLEqual(inode.predicate.call_args[0][1],
+ lxml.etree.Element("None"))
+
+ inode.predicate.reset_mock()
+ inode.predicate.return_value = True
+ inode.Match(metadata, data, entry=child)
+ self.assertEqual(data, inode.contents)
+ inode.predicate.assert_called_with(metadata, child)
+
+
+class TestInfoNode(TestINode):
+ __test__ = True
+ test_obj = InfoNode
+
+ def test_raw_predicates(self):
+ TestINode.test_raw_predicates(self)
+ metadata = Mock()
+ entry = lxml.etree.Element("Path", name="/tmp/foo",
+ realname="/tmp/bar")
+
+ parent_predicate = lambda m, d: True
+ pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+ pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
+ pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
+ dict(predicate=parent_predicate))
+ self.assertTrue(pred(metadata, entry))
+
+ parent_predicate = lambda m, d: False
+ pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+ pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
+ dict(predicate=parent_predicate))
+ self.assertFalse(pred(metadata, entry))
+
-class TestEntrySet(unittest.TestCase):
+class TestEntrySet(Bcfg2TestCase):
pass