From d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 4 Sep 2012 09:52:57 -0400 Subject: reorganized testsuite to allow tests on stuff outside of src --- .../Testlib/TestServer/TestPlugins/TestProbes.py | 549 +++++++++++++++++++++ 1 file changed, 549 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py new file mode 100644 index 000000000..0a971c245 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -0,0 +1,549 @@ +import os +import sys +import time +import lxml.etree +import Bcfg2.Server +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore +from Bcfg2.Server.Plugins.Probes import * +from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ + TestDatabaseBacked + +# test data for JSON and YAML tests +test_data = dict(a=1, b=[1, 2, 3], c="test") + +class FakeList(list): + pass + + +class TestProbesDB(DBModelTestCase): + if has_django: + models = [ProbesGroupsModel, ProbesDataModel] + + +class TestClientProbeDataSet(Bcfg2TestCase): + def test__init(self): + ds = ClientProbeDataSet() + self.assertLessEqual(ds.timestamp, time.time()) + self.assertIsInstance(ds, dict) + self.assertNotIn("timestamp", ds) + + ds = ClientProbeDataSet(timestamp=123) + self.assertEqual(ds.timestamp, 123) + self.assertNotIn("timestamp", ds) + +class TestProbeData(Bcfg2TestCase): + def test_str(self): + # a value that is not valid XML, JSON, or YAML + val = "'test" + + # test string behavior + data = ProbeData(val) + self.assertIsInstance(data, str) + self.assertEqual(data, val) + # test 1.2.0-1.2.2 broken behavior + self.assertEqual(data.data, val) + # test that formatted data accessors return None + self.assertIsNone(data.xdata) + self.assertIsNone(data.yaml) + self.assertIsNone(data.json) + + def test_xdata(self): + xdata = lxml.etree.Element("test") + lxml.etree.SubElement(xdata, "test2") + data = ProbeData(lxml.etree.tostring(xdata, + xml_declaration=False).decode('UTF-8')) + self.assertIsNotNone(data.xdata) + self.assertIsNotNone(data.xdata.find("test2")) + + @skipUnless(has_json, "JSON libraries not found, skipping JSON tests") + def test_json(self): + jdata = json.dumps(test_data) + data = ProbeData(jdata) + self.assertIsNotNone(data.json) + self.assertItemsEqual(test_data, data.json) + + @skipUnless(has_yaml, "YAML libraries not found, skipping YAML tests") + def test_yaml(self): + jdata = yaml.dump(test_data) + data = ProbeData(jdata) + self.assertIsNotNone(data.yaml) + self.assertItemsEqual(test_data, data.yaml) + + +class TestProbeSet(TestEntrySet): + test_obj = ProbeSet + basenames = ["test", "_test", "test-test"] + ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] + bogus_names = ["test.py"] + + def get_obj(self, path=datastore, fam=None, encoding=None, + plugin_name="Probes", basename=None): + # get_obj() accepts the basename argument, accepted by the + # parent get_obj() method, and just throws it away, since + # ProbeSet uses a regex for the "basename" + if fam is None: + fam = Mock() + rv = self.test_obj(path, fam, encoding, plugin_name) + rv.entry_type = MagicMock() + return rv + + def test__init(self): + fam = Mock() + ps = self.get_obj(fam=fam) + self.assertEqual(ps.plugin_name, "Probes") + fam.AddMonitor.assert_called_with(datastore, ps) + TestEntrySet.test__init(self) + + def test_HandleEvent(self): + ps = self.get_obj() + ps.handle_event = Mock() + + # test that events on the data store itself are skipped + evt = Mock() + evt.filename = datastore + ps.HandleEvent(evt) + self.assertFalse(ps.handle_event.called) + + # test that events on probed.xml are skipped + evt.reset_mock() + evt.filename = "probed.xml" + ps.HandleEvent(evt) + self.assertFalse(ps.handle_event.called) + + # test that other events are processed appropriately + evt.reset_mock() + evt.filename = "fooprobe" + ps.HandleEvent(evt) + ps.handle_event.assert_called_with(evt) + + @patch("%s.list" % builtins, FakeList) + def test_get_probe_data(self): + ps = self.get_obj() + + # build some fairly complex test data for this. in the end, + # we want the probe data to include only the most specific + # version of a given probe, and by basename only, not full + # (specific) name. We don't fully test the specificity stuff, + # we just check to make sure sort() is called and trust that + # sort() does the right thing on Specificity objects. (I.e., + # trust that Specificity is well-tested. Hah!) We also test + # to make sure the interpreter is determined correctly. + ps.get_matching = Mock() + matching = FakeList() + matching.sort = Mock() + + p1 = Mock() + p1.specific = Bcfg2.Server.Plugin.Specificity(group=True, prio=10) + p1.name = "fooprobe.G10_foogroup" + p1.data = """#!/bin/bash +group-specific""" + matching.append(p1) + + p2 = Mock() + p2.specific = Bcfg2.Server.Plugin.Specificity(all=True) + p2.name = "fooprobe" + p2.data = "#!/bin/bash" + matching.append(p2) + + p3 = Mock() + p3.specific = Bcfg2.Server.Plugin.Specificity(all=True) + p3.name = "barprobe" + p3.data = "#! /usr/bin/env python" + matching.append(p3) + + p4 = Mock() + p4.specific = Bcfg2.Server.Plugin.Specificity(all=True) + p4.name = "bazprobe" + p4.data = "" + matching.append(p4) + + ps.get_matching.return_value = matching + + metadata = Mock() + pdata = ps.get_probe_data(metadata) + ps.get_matching.assert_called_with(metadata) + # we can't create a matching operator.attrgetter object, and I + # don't feel the need to mock that out -- this is a good + # enough check + self.assertTrue(matching.sort.called) + + self.assertEqual(len(pdata), 3, + "Found: %s" % [p.get("name") for p in pdata]) + for probe in pdata: + if probe.get("name") == "fooprobe": + self.assertIn("group-specific", probe.text) + self.assertEqual(probe.get("interpreter"), "/bin/bash") + elif probe.get("name") == "barprobe": + self.assertEqual(probe.get("interpreter"), + "/usr/bin/env python") + elif probe.get("name") == "bazprobe": + self.assertIsNotNone(probe.get("interpreter")) + else: + assert False, "Strange probe found in get_probe_data() return" + + +class TestProbes(TestProbing, TestConnector, TestDatabaseBacked): + test_obj = Probes + + def get_test_probedata(self): + test_xdata = lxml.etree.Element("test") + lxml.etree.SubElement(test_xdata, "test", foo="foo") + rv = dict() + rv["foo.example.com"] = ClientProbeDataSet(timestamp=time.time()) + rv["foo.example.com"]["xml"] = \ + ProbeData(lxml.etree.tostring(test_xdata, + xml_declaration=False).decode('UTF-8')) + rv["foo.example.com"]["text"] = ProbeData("freeform text") + rv["foo.example.com"]["multiline"] = ProbeData("""multiple +lines +of +freeform +text +""") + rv["bar.example.com"] = ClientProbeDataSet(timestamp=time.time()) + rv["bar.example.com"]["empty"] = ProbeData("") + if has_yaml: + rv["bar.example.com"]["yaml"] = ProbeData(yaml.dump(test_data)) + if has_json: + rv["bar.example.com"]["json"] = ProbeData(json.dumps(test_data)) + return rv + + def get_test_cgroups(self): + return {"foo.example.com": ["group", "group with spaces", + "group-with-dashes"], + "bar.example.com": []} + + def get_probes_object(self, use_db=False, load_data=None): + core = Mock() + core.setup.cfp.getboolean = Mock() + core.setup.cfp.getboolean.return_value = use_db + if load_data is None: + load_data = MagicMock() + # we have to patch load_data() in a funny way because + # different versions of Mock have different scopes for + # patching. in some versions, a patch applied to + # get_probes_object() would only apply to that function, while + # in others it would also apply to the calling function (e.g., + # test__init(), which relies on being able to check the calls + # of load_data(), and thus on load_data() being consistently + # mocked) + @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", new=load_data) + def inner(): + return Probes(core, datastore) + + return inner() + + def test__init(self): + mock_load_data = Mock() + probes = self.get_probes_object(load_data=mock_load_data) + probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore, + probes.name), + probes.probes) + mock_load_data.assert_any_call() + self.assertEqual(probes.probedata, ClientProbeDataSet()) + self.assertEqual(probes.cgroups, dict()) + + @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", Mock()) + def test__use_db(self): + probes = self.get_probes_object() + self.assertFalse(probes._use_db) + probes.core.setup.cfp.getboolean.assert_called_with("probes", + "use_database", + default=False) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) + def test_write_data_xml(self): + probes = self.get_probes_object(use_db=False) + probes.write_data("test") + probes._write_data_xml.assert_called_with("test") + self.assertFalse(probes._write_data_db.called) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) + def test_write_data_db(self): + probes = self.get_probes_object(use_db=True) + probes.write_data("test") + probes._write_data_db.assert_called_with("test") + self.assertFalse(probes._write_data_xml.called) + + @patch("%s.open" % builtins) + def test__write_data_xml(self, mock_open): + probes = self.get_probes_object(use_db=False) + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + probes._write_data_xml(None) + + mock_open.assert_called_with(os.path.join(datastore, probes.name, + "probed.xml"), "w") + data = lxml.etree.XML(mock_open.return_value.write.call_args[0][0]) + self.assertEqual(len(data.xpath("//Client")), 2) + + foodata = data.find("Client[@name='foo.example.com']") + self.assertIsNotNone(foodata) + self.assertIsNotNone(foodata.get("timestamp")) + self.assertEqual(len(foodata.findall("Probe")), + len(probes.probedata['foo.example.com'])) + self.assertEqual(len(foodata.findall("Group")), + len(probes.cgroups['foo.example.com'])) + xml = foodata.find("Probe[@name='xml']") + self.assertIsNotNone(xml) + self.assertIsNotNone(xml.get("value")) + xdata = lxml.etree.XML(xml.get("value")) + self.assertIsNotNone(xdata) + self.assertIsNotNone(xdata.find("test")) + self.assertEqual(xdata.find("test").get("foo"), "foo") + text = foodata.find("Probe[@name='text']") + self.assertIsNotNone(text) + self.assertIsNotNone(text.get("value")) + multiline = foodata.find("Probe[@name='multiline']") + self.assertIsNotNone(multiline) + self.assertIsNotNone(multiline.get("value")) + self.assertGreater(len(multiline.get("value").splitlines()), 1) + + bardata = data.find("Client[@name='bar.example.com']") + self.assertIsNotNone(bardata) + self.assertIsNotNone(bardata.get("timestamp")) + self.assertEqual(len(bardata.findall("Probe")), + len(probes.probedata['bar.example.com'])) + self.assertEqual(len(bardata.findall("Group")), + len(probes.cgroups['bar.example.com'])) + empty = bardata.find("Probe[@name='empty']") + self.assertIsNotNone(empty) + self.assertIsNotNone(empty.get("value")) + self.assertEqual(empty.get("value"), "") + if has_yaml: + ydata = bardata.find("Probe[@name='yaml']") + self.assertIsNotNone(ydata) + self.assertIsNotNone(ydata.get("value")) + self.assertItemsEqual(test_data, yaml.load(ydata.get("value"))) + if has_json: + jdata = bardata.find("Probe[@name='json']") + self.assertIsNotNone(jdata) + self.assertIsNotNone(jdata.get("value")) + self.assertItemsEqual(test_data, json.loads(jdata.get("value"))) + + @skipUnless(has_django, "Django not found, skipping") + def test__write_data_db(self): + syncdb(TestProbesDB) + probes = self.get_probes_object(use_db=True) + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + + for cname in ["foo.example.com", "bar.example.com"]: + client = Mock() + client.hostname = cname + probes._write_data_db(client) + + pdata = ProbesDataModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pdata), len(probes.probedata[cname])) + + for probe in pdata: + self.assertEqual(probe.hostname, client.hostname) + self.assertIsNotNone(probe.data) + if probe.probe == "xml": + xdata = lxml.etree.XML(probe.data) + self.assertIsNotNone(xdata) + self.assertIsNotNone(xdata.find("test")) + self.assertEqual(xdata.find("test").get("foo"), "foo") + elif probe.probe == "text": + pass + elif probe.probe == "multiline": + self.assertGreater(len(probe.data.splitlines()), 1) + elif probe.probe == "empty": + self.assertEqual(probe.data, "") + elif probe.probe == "yaml": + self.assertItemsEqual(test_data, yaml.load(probe.data)) + elif probe.probe == "json": + self.assertItemsEqual(test_data, json.loads(probe.data)) + else: + assert False, "Strange probe found in _write_data_db data" + + pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pgroups), len(probes.cgroups[cname])) + + # test that old probe data is removed properly + cname = 'foo.example.com' + del probes.probedata[cname]['text'] + probes.cgroups[cname].pop() + client = Mock() + client.hostname = cname + probes._write_data_db(client) + + pdata = ProbesDataModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pdata), len(probes.probedata[cname])) + pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pgroups), len(probes.cgroups[cname])) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) + def test_load_data_xml(self): + probes = self.get_probes_object(use_db=False) + probes.load_data() + probes._load_data_xml.assert_any_call() + self.assertFalse(probes._load_data_db.called) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) + def test_load_data_db(self): + probes = self.get_probes_object(use_db=True) + probes.load_data() + probes._load_data_db.assert_any_call() + self.assertFalse(probes._load_data_xml.called) + + @patch("%s.open" % builtins) + @patch("lxml.etree.parse") + def test__load_data_xml(self, mock_parse, mock_open): + probes = self.get_probes_object(use_db=False) + # to get the value for lxml.etree.parse to parse, we call + # _write_data_xml, mock the open() call, and grab the data + # that gets "written" to probed.xml + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + probes._write_data_xml(None) + xdata = \ + lxml.etree.XML(str(mock_open.return_value.write.call_args[0][0])) + mock_parse.return_value = xdata.getroottree() + probes.probedata = dict() + probes.cgroups = dict() + + probes._load_data_xml() + mock_parse.assert_called_with(os.path.join(datastore, probes.name, + 'probed.xml'), + parser=Bcfg2.Server.XMLParser) + self.assertItemsEqual(probes.probedata, self.get_test_probedata()) + self.assertItemsEqual(probes.cgroups, self.get_test_cgroups()) + + @skipUnless(has_django, "Django not found, skipping") + def test__load_data_db(self): + syncdb(TestProbesDB) + probes = self.get_probes_object(use_db=True) + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + for cname in probes.probedata.keys(): + client = Mock() + client.hostname = cname + probes._write_data_db(client) + + probes.probedata = dict() + probes.cgroups = dict() + probes._load_data_db() + self.assertItemsEqual(probes.probedata, self.get_test_probedata()) + # the db backend does not store groups at all if a client has + # no groups set, so we can't just use assertItemsEqual here, + # because loading saved data may _not_ result in the original + # data if some clients had no groups set. + test_cgroups = self.get_test_cgroups() + for cname, groups in test_cgroups.items(): + if cname in probes.cgroups: + self.assertEqual(groups, probes.cgroups[cname]) + else: + self.assertEqual(groups, []) + + @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data") + def test_GetProbes(self, mock_get_probe_data): + TestProbing.test_GetProbes(self) + + probes = self.get_probes_object() + metadata = Mock() + probes.GetProbes(metadata) + mock_get_probe_data.assert_called_with(metadata) + + @patch("Bcfg2.Server.Plugins.Probes.Probes.write_data") + @patch("Bcfg2.Server.Plugins.Probes.Probes.ReceiveDataItem") + def test_ReceiveData(self, mock_ReceiveDataItem, mock_write_data): + TestProbing.test_ReceiveData(self) + + # we use a simple (read: bogus) datalist here to make this + # easy to test + datalist = ["a", "b", "c"] + + probes = self.get_probes_object() + client = Mock() + client.hostname = "foo.example.com" + probes.ReceiveData(client, datalist) + + self.assertItemsEqual(mock_ReceiveDataItem.call_args_list, + [call(client, "a"), call(client, "b"), + call(client, "c")]) + mock_write_data.assert_called_with(client) + + def test_ReceiveDataItem(self): + probes = self.get_probes_object() + for cname, cdata in self.get_test_probedata().items(): + client = Mock() + client.hostname = cname + for pname, pdata in cdata.items(): + dataitem = lxml.etree.Element("Probe", name=pname) + if pname == "text": + # add some groups to the plaintext test to test + # group parsing + data = [pdata] + for group in self.get_test_cgroups()[cname]: + data.append("group:%s" % group) + dataitem.text = "\n".join(data) + else: + dataitem.text = str(pdata) + + probes.ReceiveDataItem(client, dataitem) + + self.assertIn(client.hostname, probes.probedata) + self.assertIn(pname, probes.probedata[cname]) + self.assertEqual(pdata, probes.probedata[cname][pname]) + self.assertIn(client.hostname, probes.cgroups) + self.assertEqual(probes.cgroups[cname], + self.get_test_cgroups()[cname]) + + def test_get_additional_groups(self): + TestConnector.test_get_additional_groups(self) + + probes = self.get_probes_object() + test_cgroups = self.get_test_cgroups() + probes.cgroups = self.get_test_cgroups() + for cname in test_cgroups.keys(): + metadata = Mock() + metadata.hostname = cname + self.assertEqual(test_cgroups[cname], + probes.get_additional_groups(metadata)) + # test a non-existent client + metadata = Mock() + metadata.hostname = "nonexistent" + self.assertEqual(probes.get_additional_groups(metadata), + list()) + + def test_get_additional_data(self): + TestConnector.test_get_additional_data(self) + + probes = self.get_probes_object() + test_probedata = self.get_test_probedata() + probes.probedata = self.get_test_probedata() + for cname in test_probedata.keys(): + metadata = Mock() + metadata.hostname = cname + self.assertEqual(test_probedata[cname], + probes.get_additional_data(metadata)) + # test a non-existent client + metadata = Mock() + metadata.hostname = "nonexistent" + self.assertEqual(probes.get_additional_data(metadata), + ClientProbeDataSet()) + + -- cgit v1.2.3-1-g7c22