summaryrefslogtreecommitdiffstats
path: root/testsuite
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py178
1 files changed, 118 insertions, 60 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
index 2163aa037..1022bdc5a 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -1,5 +1,6 @@
import os
import sys
+import copy
import time
import lxml.etree
import Bcfg2.Server
@@ -25,6 +26,47 @@ test_data = dict(a=1, b=[1, 2, 3], c="test",
d=dict(a=1, b=dict(a=1), c=(1, "2", 3)))
+class FakeElement(lxml.etree._Element):
+ getroottree = Mock()
+
+ def __init__(self, el):
+ self._element = el
+
+ def __getattribute__(self, attr):
+ el = lxml.etree._Element.__getattribute__(self,
+ '__dict__')['_element']
+ if attr == "getroottree":
+ return FakeElement.getroottree
+ elif attr == "_element":
+ return el
+ else:
+ return getattr(el, attr)
+
+
+class StoringElement(object):
+ OriginalElement = copy.copy(lxml.etree.Element)
+
+ def __init__(self):
+ self.element = None
+ self.return_value = None
+
+ def __call__(self, *args, **kwargs):
+ self.element = self.OriginalElement(*args, **kwargs)
+ self.return_value = FakeElement(self.element)
+ return self.return_value
+
+
+class StoringSubElement(object):
+ OriginalSubElement = copy.copy(lxml.etree.SubElement)
+
+ def __call__(self, parent, tag, **kwargs):
+ try:
+ return self.OriginalSubElement(parent._element, tag,
+ **kwargs)
+ except AttributeError:
+ return self.OriginalSubElement(parent, tag, **kwargs)
+
+
class FakeList(list):
pass
@@ -288,61 +330,71 @@ text
probes._write_data_db.assert_called_with("test")
self.assertFalse(probes._write_data_xml.called)
- @patch("%s.open" % builtins)
- def test__write_data_xml(self, mock_open):
+ def test__write_data_xml(self):
probes = self.get_probes_object(use_db=False)
probes.probedata = self.get_test_probedata()
probes.cgroups = self.get_test_cgroups()
- probes._write_data_xml(None)
-
- mock_open.assert_called_with(os.path.join(datastore, probes.name,
- "probed.xml"), "w")
- data = lxml.etree.XML(mock_open.return_value.write.call_args[0][0])
- self.assertEqual(len(data.xpath("//Client")), 2)
-
- foodata = data.find("Client[@name='foo.example.com']")
- self.assertIsNotNone(foodata)
- self.assertIsNotNone(foodata.get("timestamp"))
- self.assertEqual(len(foodata.findall("Probe")),
- len(probes.probedata['foo.example.com']))
- self.assertEqual(len(foodata.findall("Group")),
- len(probes.cgroups['foo.example.com']))
- xml = foodata.find("Probe[@name='xml']")
- self.assertIsNotNone(xml)
- self.assertIsNotNone(xml.get("value"))
- xdata = lxml.etree.XML(xml.get("value"))
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- text = foodata.find("Probe[@name='text']")
- self.assertIsNotNone(text)
- self.assertIsNotNone(text.get("value"))
- multiline = foodata.find("Probe[@name='multiline']")
- self.assertIsNotNone(multiline)
- self.assertIsNotNone(multiline.get("value"))
- self.assertGreater(len(multiline.get("value").splitlines()), 1)
-
- bardata = data.find("Client[@name='bar.example.com']")
- self.assertIsNotNone(bardata)
- self.assertIsNotNone(bardata.get("timestamp"))
- self.assertEqual(len(bardata.findall("Probe")),
- len(probes.probedata['bar.example.com']))
- self.assertEqual(len(bardata.findall("Group")),
- len(probes.cgroups['bar.example.com']))
- empty = bardata.find("Probe[@name='empty']")
- self.assertIsNotNone(empty)
- self.assertIsNotNone(empty.get("value"))
- self.assertEqual(empty.get("value"), "")
- if HAS_JSON:
- jdata = bardata.find("Probe[@name='json']")
- self.assertIsNotNone(jdata)
- self.assertIsNotNone(jdata.get("value"))
- self.assertItemsEqual(test_data, json.loads(jdata.get("value")))
- if HAS_YAML:
- ydata = bardata.find("Probe[@name='yaml']")
- self.assertIsNotNone(ydata)
- self.assertIsNotNone(ydata.get("value"))
- self.assertItemsEqual(test_data, yaml.load(ydata.get("value")))
+
+ @patch("lxml.etree.Element")
+ @patch("lxml.etree.SubElement", StoringSubElement())
+ def inner(mock_Element):
+ mock_Element.side_effect = StoringElement()
+ probes._write_data_xml(None)
+
+ top = mock_Element.side_effect.return_value
+ write = top.getroottree.return_value.write
+ self.assertEqual(write.call_args[0][0],
+ os.path.join(datastore, probes.name,
+ "probed.xml"))
+
+ data = top._element
+ foodata = data.find("Client[@name='foo.example.com']")
+ self.assertIsNotNone(foodata)
+ self.assertIsNotNone(foodata.get("timestamp"))
+ self.assertEqual(len(foodata.findall("Probe")),
+ len(probes.probedata['foo.example.com']))
+ self.assertEqual(len(foodata.findall("Group")),
+ len(probes.cgroups['foo.example.com']))
+ xml = foodata.find("Probe[@name='xml']")
+ self.assertIsNotNone(xml)
+ self.assertIsNotNone(xml.get("value"))
+ xdata = lxml.etree.XML(xml.get("value"))
+ self.assertIsNotNone(xdata)
+ self.assertIsNotNone(xdata.find("test"))
+ self.assertEqual(xdata.find("test").get("foo"), "foo")
+ text = foodata.find("Probe[@name='text']")
+ self.assertIsNotNone(text)
+ self.assertIsNotNone(text.get("value"))
+ multiline = foodata.find("Probe[@name='multiline']")
+ self.assertIsNotNone(multiline)
+ self.assertIsNotNone(multiline.get("value"))
+ self.assertGreater(len(multiline.get("value").splitlines()), 1)
+
+ bardata = data.find("Client[@name='bar.example.com']")
+ self.assertIsNotNone(bardata)
+ self.assertIsNotNone(bardata.get("timestamp"))
+ self.assertEqual(len(bardata.findall("Probe")),
+ len(probes.probedata['bar.example.com']))
+ self.assertEqual(len(bardata.findall("Group")),
+ len(probes.cgroups['bar.example.com']))
+ empty = bardata.find("Probe[@name='empty']")
+ self.assertIsNotNone(empty)
+ self.assertIsNotNone(empty.get("value"))
+ self.assertEqual(empty.get("value"), "")
+ if HAS_JSON:
+ jdata = bardata.find("Probe[@name='json']")
+ self.assertIsNotNone(jdata)
+ self.assertIsNotNone(jdata.get("value"))
+ self.assertItemsEqual(test_data,
+ json.loads(jdata.get("value")))
+ if HAS_YAML:
+ ydata = bardata.find("Probe[@name='yaml']")
+ self.assertIsNotNone(ydata)
+ self.assertIsNotNone(ydata.get("value"))
+ self.assertItemsEqual(test_data,
+ yaml.load(ydata.get("value")))
+
+ inner()
@skipUnless(HAS_DJANGO, "Django not found, skipping")
def test__write_data_db(self):
@@ -414,18 +466,24 @@ text
probes._load_data_db.assert_any_call()
self.assertFalse(probes._load_data_xml.called)
- @patch("%s.open" % builtins)
@patch("lxml.etree.parse")
- def test__load_data_xml(self, mock_parse, mock_open):
+ def test__load_data_xml(self, mock_parse):
probes = self.get_probes_object(use_db=False)
- # to get the value for lxml.etree.parse to parse, we call
- # _write_data_xml, mock the open() call, and grab the data
- # that gets "written" to probed.xml
probes.probedata = self.get_test_probedata()
probes.cgroups = self.get_test_cgroups()
- probes._write_data_xml(None)
- xdata = \
- lxml.etree.XML(str(mock_open.return_value.write.call_args[0][0]))
+
+ # to get the value for lxml.etree.parse to parse, we call
+ # _write_data_xml, mock the lxml.etree._ElementTree.write()
+ # call, and grab the data that gets "written" to probed.xml
+ @patch("lxml.etree.Element")
+ @patch("lxml.etree.SubElement", StoringSubElement())
+ def inner(mock_Element):
+ mock_Element.side_effect = StoringElement()
+ probes._write_data_xml(None)
+ top = mock_Element.side_effect.return_value
+ return top._element
+
+ xdata = inner()
mock_parse.return_value = xdata.getroottree()
probes.probedata = dict()
probes.cgroups = dict()