From d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 4 Sep 2012 09:52:57 -0400 Subject: reorganized testsuite to allow tests on stuff outside of src --- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 1470 ++++++++++++++++++++ .../Testlib/TestServer/TestPlugins/TestProbes.py | 549 ++++++++ .../TestServer/TestPlugins/TestSEModules.py | 109 ++ .../TestServer/TestPlugins/TestTemplateHelper.py | 120 ++ .../Testlib/TestServer/TestPlugins/__init__.py | 0 5 files changed, 2248 insertions(+) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/__init__.py (limited to 'testsuite/Testsrc/Testlib/TestServer/TestPlugins') diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py new file mode 100644 index 000000000..2ff0af78e --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -0,0 +1,1470 @@ +import os +import sys +import copy +import time +import socket +import lxml.etree +import Bcfg2.Server +import Bcfg2.Server.Plugin +from Bcfg2.Server.Plugins.Metadata import * +from mock import Mock, patch + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore +from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \ + TestStatistics, TestDatabaseBacked + +def get_clients_test_tree(): + return lxml.etree.XML(''' + + + + + + + + + + + + + + + + +''').getroottree() + +def get_groups_test_tree(): + return lxml.etree.XML(''' + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +''').getroottree() + + +def get_metadata_object(core=None, watch_clients=False, use_db=False): + if core is None: + core = Mock() + core.setup.cfp.getboolean = Mock(return_value=use_db) + return Metadata(core, datastore, watch_clients=watch_clients) + + +class TestMetadataDB(DBModelTestCase): + if has_django: + models = [MetadataClientModel] + + +if has_django or can_skip: + class TestClientVersions(Bcfg2TestCase): + test_clients = dict(client1="1.2.0", + client2="1.2.2", + client3="1.3.0pre1", + client4="1.1.0", + client5=None, + client6=None) + + @skipUnless(has_django, "Django not found") + def setUp(self): + syncdb(TestMetadataDB) + for client, version in self.test_clients.items(): + MetadataClientModel(hostname=client, version=version).save() + + def test__contains(self): + v = ClientVersions() + self.assertIn("client1", v) + self.assertIn("client5", v) + self.assertNotIn("client__contains", v) + + def test_keys(self): + v = ClientVersions() + self.assertItemsEqual(self.test_clients.keys(), v.keys()) + + def test__setitem(self): + v = ClientVersions() + + # test setting version of existing client + v["client1"] = "1.2.3" + self.assertIn("client1", v) + self.assertEqual(v['client1'], "1.2.3") + client = MetadataClientModel.objects.get(hostname="client1") + self.assertEqual(client.version, "1.2.3") + + # test adding new client + new = "client__setitem" + v[new] = "1.3.0" + self.assertIn(new, v) + self.assertEqual(v[new], "1.3.0") + client = MetadataClientModel.objects.get(hostname=new) + self.assertEqual(client.version, "1.3.0") + + # test adding new client with no version + new2 = "client__setitem_2" + v[new2] = None + self.assertIn(new2, v) + self.assertEqual(v[new2], None) + client = MetadataClientModel.objects.get(hostname=new2) + self.assertEqual(client.version, None) + + def test__getitem(self): + v = ClientVersions() + + # test getting existing client + self.assertEqual(v['client2'], "1.2.2") + self.assertIsNone(v['client5']) + + # test exception on nonexistent client + expected = KeyError + try: + v['clients__getitem'] + except expected: + pass + except: + err = sys.exc_info()[1] + self.assertFalse(True, "%s raised instead of %s" % + (err.__class__.__name__, + expected.__class__.__name__)) + else: + self.assertFalse(True, + "%s not raised" % expected.__class__.__name__) + + def test__len(self): + v = ClientVersions() + self.assertEqual(len(v), MetadataClientModel.objects.count()) + + def test__iter(self): + v = ClientVersions() + self.assertItemsEqual([h for h in iter(v)], v.keys()) + + def test__delitem(self): + v = ClientVersions() + + # test adding new client + new = "client__delitem" + v[new] = "1.3.0" + + del v[new] + self.assertIn(new, v) + self.assertIsNone(v[new]) + + +class TestXMLMetadataConfig(TestXMLFileBacked): + test_obj = XMLMetadataConfig + + def get_obj(self, basefile="clients.xml", core=None, watch_clients=False): + self.metadata = get_metadata_object(core=core, + watch_clients=watch_clients) + return XMLMetadataConfig(self.metadata, watch_clients, basefile) + + def test__init(self): + xmc = self.get_obj() + self.assertEqual(self.metadata.core.fam, xmc.fam) + self.assertFalse(xmc.fam.AddMonitor.called) + + def test_xdata(self): + config = self.get_obj() + expected = Bcfg2.Server.Plugin.MetadataRuntimeError + try: + config.xdata + except expected: + pass + except: + err = sys.exc_info()[1] + self.assertFalse(True, "%s raised instead of %s" % + (err.__class__.__name__, + expected.__class__.__name__)) + else: + self.assertFalse(True, + "%s not raised" % expected.__class__.__name__) + pass + + config.data = "" + self.assertEqual(config.xdata, "") + + def test_base_xdata(self): + config = self.get_obj() + # we can't use assertRaises here because base_xdata is a property + expected = Bcfg2.Server.Plugin.MetadataRuntimeError + try: + config.base_xdata + except expected: + pass + except: + err = sys.exc_info()[1] + self.assertFalse(True, "%s raised instead of %s" % + (err.__class__.__name__, + expected.__class__.__name__)) + else: + self.assertFalse(True, + "%s not raised" % expected.__class__.__name__) + pass + + config.basedata = "" + self.assertEqual(config.base_xdata, "") + + def test_add_monitor(self): + core = Mock() + config = self.get_obj(core=core) + + fname = "test.xml" + fpath = os.path.join(self.metadata.data, fname) + + config.extras = [] + config.add_monitor(fpath) + self.assertFalse(core.fam.AddMonitor.called) + self.assertEqual(config.extras, [fpath]) + + config = self.get_obj(core=core, watch_clients=True) + config.add_monitor(fpath) + core.fam.AddMonitor.assert_called_with(fpath, config.metadata) + self.assertItemsEqual(config.extras, [fpath]) + + def test_Index(self): + # Index() isn't used on XMLMetadataConfig objects + pass + + @patch("lxml.etree.parse") + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig._follow_xincludes") + def test_load_xml(self, mock_follow, mock_parse): + config = self.get_obj("clients.xml") + + def reset(): + mock_parse.reset_mock() + mock_follow.reset_mock() + config.data = None + config.basedata = None + + reset() + config.load_xml() + mock_follow.assert_called_with(xdata=mock_parse.return_value) + mock_parse.assert_called_with(os.path.join(config.basedir, + "clients.xml"), + parser=Bcfg2.Server.XMLParser) + self.assertFalse(mock_parse.return_value.xinclude.called) + self.assertEqual(config.data, mock_parse.return_value) + self.assertIsNotNone(config.basedata) + + reset() + mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None, + None) + config.load_xml() + mock_parse.assert_called_with(os.path.join(config.basedir, + "clients.xml"), + parser=Bcfg2.Server.XMLParser) + self.assertIsNone(config.data) + self.assertIsNone(config.basedata) + + reset() + mock_parse.side_effect = None + def follow_xincludes(xdata=None): + config.extras = [Mock(), Mock()] + mock_follow.side_effect = follow_xincludes + config.load_xml() + mock_follow.assert_called_with(xdata=mock_parse.return_value) + mock_parse.assert_called_with(os.path.join(config.basedir, + "clients.xml"), + parser=Bcfg2.Server.XMLParser) + mock_parse.return_value.xinclude.assert_any_call() + self.assertEqual(config.data, mock_parse.return_value) + self.assertIsNotNone(config.basedata) + + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml") + def test_write(self, mock_write_xml): + config = self.get_obj("clients.xml") + config.basedata = "" + config.write() + mock_write_xml.assert_called_with(os.path.join(self.metadata.data, + "clients.xml"), + "") + + @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False)) + @patch('fcntl.lockf', Mock()) + @patch('%s.open' % builtins) + @patch('os.unlink') + @patch('os.rename') + @patch('os.path.islink') + @patch('os.readlink') + def test_write_xml(self, mock_readlink, mock_islink, mock_rename, + mock_unlink, mock_open): + fname = "clients.xml" + config = self.get_obj(fname) + fpath = os.path.join(self.metadata.data, fname) + tmpfile = "%s.new" % fpath + linkdest = os.path.join(self.metadata.data, "client-link.xml") + + mock_islink.return_value = False + + config.write_xml(fpath, get_clients_test_tree()) + mock_open.assert_called_with(tmpfile, "w") + self.assertTrue(mock_open.return_value.write.called) + mock_islink.assert_called_with(fpath) + mock_rename.assert_called_with(tmpfile, fpath) + + mock_islink.return_value = True + mock_readlink.return_value = linkdest + config.write_xml(fpath, get_clients_test_tree()) + mock_rename.assert_called_with(tmpfile, linkdest) + + mock_rename.side_effect = OSError + self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, + config.write_xml, fpath, get_clients_test_tree()) + + mock_open.return_value.write.side_effect = IOError + self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, + config.write_xml, fpath, get_clients_test_tree()) + mock_unlink.assert_called_with(tmpfile) + + mock_open.side_effect = IOError + self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, + config.write_xml, fpath, get_clients_test_tree()) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch('lxml.etree.parse') + def test_find_xml_for_xpath(self, mock_parse): + config = self.get_obj("groups.xml") + config.basedata = get_groups_test_tree() + xpath = "//Group[@name='group1']" + self.assertItemsEqual(config.find_xml_for_xpath(xpath), + dict(filename=os.path.join(self.metadata.data, + "groups.xml"), + xmltree=get_groups_test_tree(), + xquery=get_groups_test_tree().xpath(xpath))) + + self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict()) + + config.extras = [os.path.join(self.metadata.data, p) + for p in ["foo.xml", "bar.xml", "clients.xml"]] + + def parse_side_effect(fname, parser=Bcfg2.Server.XMLParser): + if fname == os.path.join(self.metadata.data, "clients.xml"): + return get_clients_test_tree() + else: + return lxml.etree.XML("").getroottree() + + mock_parse.side_effect = parse_side_effect + xpath = "//Client[@secure='true']" + self.assertItemsEqual(config.find_xml_for_xpath(xpath), + dict(filename=os.path.join(self.metadata.data, + "clients.xml"), + xmltree=get_clients_test_tree(), + xquery=get_clients_test_tree().xpath(xpath))) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") + def test_HandleEvent(self, mock_load_xml): + config = self.get_obj("groups.xml") + evt = Mock() + evt.filename = os.path.join(self.metadata.data, "groups.xml") + evt.code2str = Mock(return_value="changed") + self.assertTrue(config.HandleEvent(evt)) + mock_load_xml.assert_called_with() + + +class TestClientMetadata(Bcfg2TestCase): + def test_inGroup(self): + cm = ClientMetadata("client1", "group1", ["group1", "group2"], + ["bundle1"], [], [], [], None, None, None, None) + self.assertTrue(cm.inGroup("group1")) + self.assertFalse(cm.inGroup("group3")) + + +class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): + test_obj = Metadata + use_db = False + + def get_obj(self, core=None, watch_clients=False): + return get_metadata_object(core=core, watch_clients=watch_clients, + use_db=self.use_db) + + @skipUnless(has_django, "Django not found") + def test__use_db(self): + # with the way we've set up our metadata tests, it's unweildy + # to test _use_db. however, given the way get_obj works, if + # there was a bug in _use_db it'd be almost certain to shake + # out in the rest of the testing. + pass + + def get_nonexistent_client(self, metadata, prefix="newclient"): + if metadata is None: + metadata = self.load_clients_data() + i = 0 + client_name = "%s%s" % (prefix, i) + while client_name in metadata.clients: + i += 1 + client_name = "%s%s" % (prefix, i) + return client_name + + def test__init(self): + # test with watch_clients=False + core = Mock() + metadata = self.get_obj(core=core) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics) + self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig) + self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig) + self.assertIsInstance(metadata.query, MetadataQuery) + self.assertEqual(metadata.states, dict()) + + # test with watch_clients=True + core.fam = Mock() + metadata = self.get_obj(core=core, watch_clients=True) + self.assertEqual(len(metadata.states), 2) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "groups.xml"), + metadata) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "clients.xml"), + metadata) + + core.fam.reset_mock() + core.fam.AddMonitor = Mock(side_effect=IOError) + self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, + self.get_obj, core=core, watch_clients=True) + + @patch('os.makedirs', Mock()) + @patch('%s.open' % builtins) + def test_init_repo(self, mock_open): + Metadata.init_repo(datastore, + groups_xml="groups", clients_xml="clients") + mock_open.assert_any_call(os.path.join(datastore, "Metadata", + "groups.xml"), "w") + mock_open.assert_any_call(os.path.join(datastore, "Metadata", + "clients.xml"), "w") + + def test_search_xdata(self): + # test finding a node with the proper name + metadata = self.get_obj() + tree = get_groups_test_tree() + res = metadata._search_xdata("Group", "group1", tree) + self.assertIsInstance(res, lxml.etree._Element) + self.assertEqual(res.get("name"), "group1") + + # test finding a node with the wrong name but correct alias + metadata = self.get_obj() + tree = get_clients_test_tree() + res = metadata._search_xdata("Client", "alias3", tree, alias=True) + self.assertIsInstance(res, lxml.etree._Element) + self.assertNotEqual(res.get("name"), "alias3") + + # test failure finding a node + metadata = self.get_obj() + tree = get_clients_test_tree() + res = metadata._search_xdata("Client", + self.get_nonexistent_client(metadata), + tree, alias=True) + self.assertIsNone(res) + + def search_xdata(self, tag, name, tree, alias=False): + metadata = self.get_obj() + res = metadata._search_xdata(tag, name, tree, alias=alias) + self.assertIsInstance(res, lxml.etree._Element) + if not alias: + self.assertEqual(res.get("name"), name) + + def test_search_group(self): + # test finding a group with the proper name + tree = get_groups_test_tree() + self.search_xdata("Group", "group1", tree) + + def test_search_bundle(self): + # test finding a bundle with the proper name + tree = get_groups_test_tree() + self.search_xdata("Bundle", "bundle1", tree) + + def test_search_client(self): + # test finding a client with the proper name + tree = get_clients_test_tree() + self.search_xdata("Client", "client1", tree, alias=True) + self.search_xdata("Client", "alias1", tree, alias=True) + + def test_add_group(self): + metadata = self.get_obj() + metadata.groups_xml.write = Mock() + metadata.groups_xml.data = lxml.etree.XML('').getroottree() + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.add_group("test1", dict()) + metadata.groups_xml.write.assert_any_call() + grp = metadata.search_group("test1", metadata.groups_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name='test1')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.add_group("test2", dict(foo='bar')) + metadata.groups_xml.write.assert_any_call() + grp = metadata.search_group("test2", metadata.groups_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name='test2', foo='bar')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.groups_xml.write.reset_mock() + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.add_group, + "test1", dict()) + self.assertFalse(metadata.groups_xml.write.called) + + def test_update_group(self): + metadata = self.get_obj() + metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.update_group("group1", dict(foo="bar")) + grp = metadata.search_group("group1", metadata.groups_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertIn("foo", grp.attrib) + self.assertEqual(grp.get("foo"), "bar") + self.assertTrue(metadata.groups_xml.write_xml.called) + + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.update_group, + "bogus_group", dict()) + + def test_remove_group(self): + metadata = self.get_obj() + metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.remove_group("group5") + grp = metadata.search_group("group5", metadata.groups_xml.base_xdata) + self.assertIsNone(grp) + self.assertTrue(metadata.groups_xml.write_xml.called) + + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.remove_group, + "bogus_group") + + def test_add_bundle(self): + metadata = self.get_obj() + metadata.groups_xml.write = Mock() + metadata.groups_xml.data = lxml.etree.XML('').getroottree() + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.add_bundle("bundle1") + metadata.groups_xml.write.assert_any_call() + bundle = metadata.search_bundle("bundle1", + metadata.groups_xml.base_xdata) + self.assertIsNotNone(bundle) + self.assertEqual(bundle.attrib, dict(name='bundle1')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.groups_xml.write.reset_mock() + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.add_bundle, + "bundle1") + self.assertFalse(metadata.groups_xml.write.called) + + def test_remove_bundle(self): + metadata = self.get_obj() + metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.remove_bundle("bundle1") + grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata) + self.assertIsNone(grp) + self.assertTrue(metadata.groups_xml.write_xml.called) + + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.remove_bundle, + "bogus_bundle") + + def test_add_client(self): + metadata = self.get_obj() + metadata.clients_xml.write = Mock() + metadata.clients_xml.data = lxml.etree.XML('').getroottree() + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + + new1 = self.get_nonexistent_client(metadata) + metadata.add_client(new1, dict()) + metadata.clients_xml.write.assert_any_call() + grp = metadata.search_client(new1, metadata.clients_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name=new1)) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + metadata._handle_clients_xml_event(Mock()) + + new2 = self.get_nonexistent_client(metadata) + metadata.add_client(new2, dict(foo='bar')) + metadata.clients_xml.write.assert_any_call() + grp = metadata.search_client(new2, metadata.clients_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name=new2, foo='bar')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + + metadata.clients_xml.write.reset_mock() + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.add_client, + new1, dict()) + self.assertFalse(metadata.clients_xml.write.called) + + def test_update_client(self): + metadata = self.get_obj() + metadata.clients_xml.write_xml = Mock() + metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree()) + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + + metadata.update_client("client1", dict(foo="bar")) + grp = metadata.search_client("client1", metadata.clients_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertIn("foo", grp.attrib) + self.assertEqual(grp.get("foo"), "bar") + self.assertTrue(metadata.clients_xml.write_xml.called) + + new = self.get_nonexistent_client(metadata) + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.update_client, + new, dict()) + + def load_clients_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = self.get_obj() + metadata.clients_xml.data = \ + xdata or copy.deepcopy(get_clients_test_tree()) + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + evt = Mock() + evt.filename = os.path.join(datastore, "Metadata", "clients.xml") + evt.code2str = Mock(return_value="changed") + metadata.HandleEvent(evt) + return metadata + + def test_handle_clients_xml_event(self): + metadata = self.get_obj() + metadata.profiles = ["group1", "group2"] + + metadata.clients_xml = Mock() + metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree()) + metadata._handle_clients_xml_event(Mock()) + + if not self.use_db: + self.assertItemsEqual(metadata.clients, + dict([(c.get("name"), c.get("profile")) + for c in get_clients_test_tree().findall("//Client")])) + aliases = dict([(a.get("name"), a.getparent().get("name")) + for a in get_clients_test_tree().findall("//Alias")]) + self.assertItemsEqual(metadata.aliases, aliases) + + raliases = dict([(c.get("name"), set()) + for c in get_clients_test_tree().findall("//Client")]) + for alias in get_clients_test_tree().findall("//Alias"): + raliases[alias.getparent().get("name")].add(alias.get("name")) + self.assertItemsEqual(metadata.raliases, raliases) + + self.assertEqual(metadata.secure, + [c.get("name") + for c in get_clients_test_tree().findall("//Client[@secure='true']")]) + self.assertEqual(metadata.floating, ["client1", "client10"]) + + addresses = dict([(c.get("address"), []) + for c in get_clients_test_tree().findall("//*[@address]")]) + raddresses = dict() + for client in get_clients_test_tree().findall("//Client[@address]"): + addresses[client.get("address")].append(client.get("name")) + try: + raddresses[client.get("name")].append(client.get("address")) + except KeyError: + raddresses[client.get("name")] = [client.get("address")] + for alias in get_clients_test_tree().findall("//Alias[@address]"): + addresses[alias.get("address")].append(alias.getparent().get("name")) + try: + raddresses[alias.getparent().get("name")].append(alias.get("address")) + except KeyError: + raddresses[alias.getparent().get("name")] = alias.get("address") + + self.assertItemsEqual(metadata.addresses, addresses) + self.assertItemsEqual(metadata.raddresses, raddresses) + self.assertTrue(metadata.states['clients.xml']) + + def load_groups_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = self.get_obj() + metadata.groups_xml.data = \ + xdata or copy.deepcopy(get_groups_test_tree()) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + evt = Mock() + evt.filename = os.path.join(datastore, "Metadata", "groups.xml") + evt.code2str = Mock(return_value="changed") + metadata.HandleEvent(evt) + return metadata + + def test_handle_groups_xml_event(self): + metadata = self.get_obj() + metadata.groups_xml = Mock() + metadata.groups_xml.xdata = get_groups_test_tree() + metadata._handle_groups_xml_event(Mock()) + + self.assertTrue(metadata.states['groups.xml']) + self.assertTrue(metadata.groups['group1'].is_public) + self.assertTrue(metadata.groups['group2'].is_public) + self.assertFalse(metadata.groups['group3'].is_public) + self.assertFalse(metadata.groups['group1'].is_private) + self.assertFalse(metadata.groups['group2'].is_private) + self.assertTrue(metadata.groups['group3'].is_private) + self.assertTrue(metadata.groups['group1'].is_profile) + self.assertTrue(metadata.groups['group2'].is_profile) + self.assertFalse(metadata.groups['group3'].is_profile) + self.assertItemsEqual(metadata.groups.keys(), + set(g.get("name") + for g in get_groups_test_tree().findall("//Group"))) + self.assertEqual(metadata.groups['group1'].category, 'category1') + self.assertEqual(metadata.groups['group2'].category, 'category1') + self.assertEqual(metadata.groups['group3'].category, 'category2') + self.assertEqual(metadata.groups['group4'].category, 'category1') + self.assertEqual(metadata.default, "group1") + + all_groups = [] + negated_groups = [] + for group in get_groups_test_tree().xpath("//Groups/Client//*") + \ + get_groups_test_tree().xpath("//Groups/Group//*"): + if group.tag == 'Group' and not group.getchildren(): + if group.get("negate", "false").lower() == 'true': + negated_groups.append(group.get("name")) + else: + all_groups.append(group.get("name")) + self.assertItemsEqual([g.name + for g in metadata.group_membership.values()], + all_groups) + self.assertItemsEqual([g.name + for g in metadata.negated_groups.values()], + negated_groups) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_set_profile(self): + metadata = self.get_obj() + if 'clients.xml' in metadata.states: + metadata.states['clients.xml'] = False + self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, + metadata.set_profile, + None, None, None) + + self.load_groups_data(metadata=metadata) + self.load_clients_data(metadata=metadata) + + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.set_profile, + "client1", "group5", None) + + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.set_profile, + "client1", "group3", None) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_set_profile_db(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + if metadata._use_db: + profile = "group1" + client_name = self.get_nonexistent_client(metadata) + metadata.set_profile(client_name, profile, None) + self.assertIn(client_name, metadata.clients) + self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, + metadata.set_profile, + client_name, profile, None) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client") + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") + def test_set_profile_xml(self, mock_update_client, mock_add_client): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + if not metadata._use_db: + metadata.clients_xml.write = Mock() + metadata.set_profile("client1", "group2", None) + mock_update_client.assert_called_with("client1", + dict(profile="group2")) + metadata.clients_xml.write.assert_any_call() + self.assertEqual(metadata.clientgroups["client1"], ["group2"]) + + metadata.clients_xml.write.reset_mock() + new1 = self.get_nonexistent_client(metadata) + metadata.set_profile(new1, "group1", None) + mock_add_client.assert_called_with(new1, dict(profile="group1")) + metadata.clients_xml.write.assert_any_call() + self.assertEqual(metadata.clientgroups[new1], ["group1"]) + + metadata.clients_xml.write.reset_mock() + new2 = self.get_nonexistent_client(metadata) + metadata.session_cache[('1.2.3.6', None)] = (None, new2) + metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None)) + mock_add_client.assert_called_with(new2, + dict(uuid='uuid_new', + profile="group1", + address='1.2.3.6')) + metadata.clients_xml.write.assert_any_call() + self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"]) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("socket.gethostbyaddr") + def test_resolve_client(self, mock_gethostbyaddr): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') + + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.resolve_client, + ('1.2.3.2', None)) + self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1') + + metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, + 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None), + cleanup_cache=True), 'client3') + self.assertEqual(metadata.session_cache, dict()) + + mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) + self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') + mock_gethostbyaddr.assert_called_with('1.2.3.6') + + mock_gethostbyaddr.reset_mock() + mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7']) + self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4') + mock_gethostbyaddr.assert_called_with('1.2.3.7') + + mock_gethostbyaddr.reset_mock() + mock_gethostbyaddr.return_value = None + mock_gethostbyaddr.side_effect = socket.herror + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.resolve_client, + ('1.2.3.8', None)) + mock_gethostbyaddr.assert_called_with('1.2.3.8') + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") + def test_get_initial_metadata(self, mock_clientmetadata): + metadata = self.get_obj() + if 'clients.xml' in metadata.states: + metadata.states['clients.xml'] = False + self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, + metadata.get_initial_metadata, None) + + self.load_groups_data(metadata=metadata) + self.load_clients_data(metadata=metadata) + + # test address, password + metadata.get_initial_metadata("client1") + mock_clientmetadata.assert_called_with("client1", "group1", + set(["group1"]), set(), set(), + set(["1.2.3.1"]), + dict(category1='group1'), None, + 'password2', None, + metadata.query) + + # test address, bundles, category suppression + metadata.get_initial_metadata("client2") + mock_clientmetadata.assert_called_with("client2", "group2", + set(["group2"]), + set(["bundle1", "bundle2"]), + set(), set(["1.2.3.2"]), + dict(category1="group2"), + None, None, None, + metadata.query) + + # test aliases, address, uuid, password + imd = metadata.get_initial_metadata("alias1") + mock_clientmetadata.assert_called_with("client3", "group1", + set(["group1"]), set(), + set(['alias1']), + set(["1.2.3.3"]), + dict(category1="group1"), + 'uuid1', 'password2', None, + metadata.query) + + # test new client creation + new1 = self.get_nonexistent_client(metadata) + imd = metadata.get_initial_metadata(new1) + mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]), + set(), set(), set(), + dict(category1="group1"), None, + None, None, metadata.query) + + # test nested groups, address, per-client groups + imd = metadata.get_initial_metadata("client8") + mock_clientmetadata.assert_called_with("client8", "group1", + set(["group1", "group8", + "group9", "group10"]), + set(), + set(), set(["1.2.3.5"]), + dict(category1="group1"), + None, None, None, metadata.query) + + # test setting per-client groups, group negation, nested groups + imd = metadata.get_initial_metadata("client9") + mock_clientmetadata.assert_called_with("client9", "group2", + set(["group2", "group8", + "group11"]), + set(["bundle1", "bundle2"]), + set(), set(), + dict(category1="group2"), None, + "password3", None, + metadata.query) + + # test new client with no default profile + metadata.default = None + new2 = self.get_nonexistent_client(metadata) + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.get_initial_metadata, new2) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_merge_groups(self): + metadata = self.get_obj() + self.load_groups_data(metadata=metadata) + self.load_clients_data(metadata=metadata) + + self.assertEqual(metadata._merge_groups("client1", set(["group1"]), + categories=dict(group1="category1")), + (set(["group1"]), dict(group1="category1"))) + + self.assertEqual(metadata._merge_groups("client8", + set(["group1", "group8", "group9"]), + categories=dict(group1="category1")), + (set(["group1", "group8", "group9", "group10"]), + dict(group1="category1"))) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_all_group_names(self): + metadata = self.load_groups_data() + self.assertItemsEqual(metadata.get_all_group_names(), + set([g.get("name") + for g in get_groups_test_tree().findall("//Group")])) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_all_groups_in_category(self): + metadata = self.load_groups_data() + self.assertItemsEqual(metadata.get_all_groups_in_category("category1"), + set([g.get("name") + for g in get_groups_test_tree().findall("//Group[@category='category1']")])) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_client_names_by_profiles(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + self.assertItemsEqual(metadata.get_client_names_by_profiles(["group2"]), + [c.get("name") + for c in get_clients_test_tree().findall("//Client[@profile='group2']")]) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_client_names_by_groups(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + # this is not the best test in the world, since we mock + # core.build_metadata to just build _initial_ metadata, which + # is not at all the same thing. it turns out that mocking + # this out without starting a Bcfg2 server is pretty + # non-trivial, so this works-ish + metadata.core.build_metadata = Mock() + metadata.core.build_metadata.side_effect = \ + lambda c: metadata.get_initial_metadata(c) + self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]), + [c.get("name") + for c in get_clients_test_tree().findall("//Client[@profile='group2']")]) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_merge_additional_groups(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + imd = metadata.get_initial_metadata("client2") + + # test adding a group excluded by categories + oldgroups = imd.groups + metadata.merge_additional_groups(imd, ["group4"]) + self.assertEqual(imd.groups, oldgroups) + + # test adding a private group + oldgroups = imd.groups + metadata.merge_additional_groups(imd, ["group3"]) + self.assertEqual(imd.groups, oldgroups) + + # test adding groups with bundles + oldgroups = imd.groups + oldbundles = imd.bundles + metadata.merge_additional_groups(imd, ["group7"]) + self.assertEqual(imd.groups, oldgroups.union(["group7"])) + self.assertEqual(imd.bundles, oldbundles.union(["bundle3"])) + + # test adding multiple groups + imd = metadata.get_initial_metadata("client2") + oldgroups = imd.groups + metadata.merge_additional_groups(imd, ["group6", "group8"]) + self.assertItemsEqual(imd.groups, + oldgroups.union(["group6", "group8", "group9"])) + + # test adding a group that is not defined in groups.xml + imd = metadata.get_initial_metadata("client2") + oldgroups = imd.groups + metadata.merge_additional_groups(imd, ["group6", "newgroup"]) + self.assertItemsEqual(imd.groups, + oldgroups.union(["group6", "newgroup"])) + + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_merge_additional_data(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + imd = metadata.get_initial_metadata("client1") + + # we need to use a unique attribute name for this test. this + # is probably overkill, but it works + pattern = "connector%d" + for i in range(0, 100): + connector = pattern % i + if not hasattr(imd, connector): + break + self.assertFalse(hasattr(imd, connector), + "Could not find unique connector name to test " + "merge_additional_data()") + + metadata.merge_additional_data(imd, connector, "test data") + self.assertEqual(getattr(imd, connector), "test data") + self.assertIn(connector, imd.connectors) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_validate_client_address(self, mock_resolve_client): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + self.assertTrue(metadata.validate_client_address("client1", + (None, None))) + self.assertTrue(metadata.validate_client_address("client2", + ("1.2.3.2", None))) + self.assertFalse(metadata.validate_client_address("client2", + ("1.2.3.8", None))) + self.assertTrue(metadata.validate_client_address("client4", + ("1.2.3.2", None))) + # this is upper case to ensure that case is folded properly in + # validate_client_address() + mock_resolve_client.return_value = "CLIENT4" + self.assertTrue(metadata.validate_client_address("client4", + ("1.2.3.7", None))) + mock_resolve_client.assert_called_with(("1.2.3.7", None)) + + mock_resolve_client.reset_mock() + self.assertFalse(metadata.validate_client_address("client5", + ("1.2.3.5", None))) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_AuthenticateConnection(self, mock_resolve_client, + mock_validate_client_address): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.password = "password1" + + cert = dict(subject=[[("commonName", "client1")]]) + mock_validate_client_address.return_value = False + self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.1")) + mock_validate_client_address.return_value = True + self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.1")) + # floating cert-auth clients add themselves to the cache + self.assertIn("1.2.3.1", metadata.session_cache) + self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1") + + cert = dict(subject=[[("commonName", "client7")]]) + self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.4")) + # non-floating cert-auth clients do not add themselves to the cache + self.assertNotIn("1.2.3.4", metadata.session_cache) + + cert = dict(subject=[[("commonName", "client8")]]) + + mock_resolve_client.return_value = "client5" + self.assertTrue(metadata.AuthenticateConnection(None, "root", + "password1", "1.2.3.8")) + + mock_resolve_client.side_effect = \ + Bcfg2.Server.Plugin.MetadataConsistencyError + self.assertFalse(metadata.AuthenticateConnection(None, "root", + "password1", + "1.2.3.8")) + + # secure mode, no password + self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None, + "1.2.3.2")) + + self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1', + "password1", "1.2.3.3")) + # non-root, non-cert clients populate session cache + self.assertIn("1.2.3.3", metadata.session_cache) + self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3") + + # use alternate password + self.assertTrue(metadata.AuthenticateConnection(None, 'client3', + "password2", "1.2.3.3")) + + # test secure mode + self.assertFalse(metadata.AuthenticateConnection(None, 'client9', + "password1", + "1.2.3.9")) + self.assertTrue(metadata.AuthenticateConnection(None, 'client9', + "password3", "1.2.3.9")) + + self.assertFalse(metadata.AuthenticateConnection(None, "client5", + "password2", + "1.2.3.7")) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") + def test_process_statistics(self, mock_update_client): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + md = Mock() + md.hostname = "client6" + metadata.process_statistics(md, None) + mock_update_client.assert_called_with(md.hostname, + dict(auth='cert')) + + mock_update_client.reset_mock() + md.hostname = "client5" + metadata.process_statistics(md, None) + self.assertFalse(mock_update_client.called) + + def test_viz(self): + pass + + +class TestMetadataBase(TestMetadata): + """ base test object for testing Metadata with database enabled """ + __test__ = False + use_db = True + + @skipUnless(has_django, "Django not found") + def setUp(self): + syncdb(TestMetadataDB) + + def load_clients_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = get_obj() + for client in get_clients_test_tree().findall("Client"): + metadata.add_client(client.get("name")) + return metadata + + def get_nonexistent_client(self, _, prefix="newclient"): + clients = [o.hostname for o in MetadataClientModel.objects.all()] + i = 0 + client_name = "%s%s" % (prefix, i) + while client_name in clients: + i += 1 + client_name = "%s%s" % (prefix, i) + return client_name + + @patch('os.path.exists') + def test__init(self, mock_exists): + core = Mock() + core.fam = Mock() + mock_exists.return_value = False + metadata = self.get_obj(core=core, watch_clients=True) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) + core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, + "groups.xml"), + metadata) + + mock_exists.return_value = True + core.fam.reset_mock() + metadata = self.get_obj(core=core, watch_clients=True) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "groups.xml"), + metadata) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "clients.xml"), + metadata) + + def test_add_group(self): + pass + + def test_add_bundle(self): + pass + + def test_add_client(self): + metadata = self.get_obj() + hostname = self.get_nonexistent_client(metadata) + client = metadata.add_client(hostname) + self.assertIsInstance(client, MetadataClientModel) + self.assertEqual(client.hostname, hostname) + self.assertIn(hostname, metadata.clients) + self.assertIn(hostname, metadata.list_clients()) + self.assertItemsEqual(metadata.clients, + [c.hostname + for c in MetadataClientModel.objects.all()]) + + def test_update_group(self): + pass + + def test_update_bundle(self): + pass + + def test_update_client(self): + pass + + def test_list_clients(self): + metadata = self.get_obj() + self.assertItemsEqual(metadata.list_clients(), + [c.hostname + for c in MetadataClientModel.objects.all()]) + + def test_remove_group(self): + pass + + def test_remove_bundle(self): + pass + + def test_remove_client(self): + metadata = self.get_obj() + client_name = self.get_nonexistent_client(metadata) + + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.remove_client, + client_name) + + metadata.add_client(client_name) + metadata.remove_client(client_name) + self.assertNotIn(client_name, metadata.clients) + self.assertNotIn(client_name, metadata.list_clients()) + self.assertItemsEqual(metadata.clients, + [c.hostname + for c in MetadataClientModel.objects.all()]) + + def test_process_statistics(self): + pass + + +class TestMetadata_NoClientsXML(TestMetadataBase): + """ test Metadata without a clients.xml. we have to disable or + override tests that rely on client options """ + # only run these tests if it's possible to skip tests or if we + # have django. otherwise they'll all get run because our fake + # skipping decorators for python < 2.7 won't work when they + # decorate setUp() + if can_skip or has_django: + __test__ = True + + def load_groups_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = self.get_obj() + if not xdata: + xdata = copy.deepcopy(get_groups_test_tree()) + for client in get_clients_test_tree().findall("Client"): + newclient = \ + lxml.etree.SubElement(xdata.getroot(), + "Client", name=client.get("name")) + lxml.etree.SubElement(newclient, "Group", + name=client.get("profile")) + metadata.groups_xml.data = xdata + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + evt = Mock() + evt.filename = os.path.join(datastore, "Metadata", "groups.xml") + evt.code2str = Mock(return_value="changed") + metadata.HandleEvent(evt) + return metadata + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") + def test_get_initial_metadata(self, mock_clientmetadata): + metadata = self.get_obj() + if 'clients.xml' in metadata.states: + metadata.states['clients.xml'] = False + self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, + metadata.get_initial_metadata, None) + + self.load_groups_data(metadata=metadata) + self.load_clients_data(metadata=metadata) + + # test basic client metadata + metadata.get_initial_metadata("client1") + mock_clientmetadata.assert_called_with("client1", "group1", + set(["group1"]), set(), set(), + set(), dict(category1='group1'), + None, None, None, metadata.query) + + # test bundles, category suppression + metadata.get_initial_metadata("client2") + mock_clientmetadata.assert_called_with("client2", "group2", + set(["group2"]), + set(["bundle1", "bundle2"]), + set(), set(), + dict(category1="group2"), None, + None, None, metadata.query) + + # test new client creation + new1 = self.get_nonexistent_client(metadata) + imd = metadata.get_initial_metadata(new1) + mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]), + set(), set(), set(), + dict(category1="group1"), None, + None, None, metadata.query) + + # test nested groups, per-client groups + imd = metadata.get_initial_metadata("client8") + mock_clientmetadata.assert_called_with("client8", "group1", + set(["group1", "group8", + "group9", "group10"]), + set(), set(), set(), + dict(category1="group1"), None, + None, None, metadata.query) + + # test per-client groups, group negation, nested groups + imd = metadata.get_initial_metadata("client9") + mock_clientmetadata.assert_called_with("client9", "group2", + set(["group2", "group8", + "group11"]), + set(["bundle1", "bundle2"]), + set(), set(), + dict(category1="group2"), None, + None, None, metadata.query) + + # test exception on new client with no default profile + metadata.default = None + new2 = self.get_nonexistent_client(metadata) + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.get_initial_metadata, + new2) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_validate_client_address(self, mock_resolve_client): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + # this is upper case to ensure that case is folded properly in + # validate_client_address() + mock_resolve_client.return_value = "CLIENT4" + self.assertTrue(metadata.validate_client_address("client4", + ("1.2.3.7", None))) + mock_resolve_client.assert_called_with(("1.2.3.7", None)) + + mock_resolve_client.reset_mock() + self.assertFalse(metadata.validate_client_address("client5", + ("1.2.3.5", None))) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_AuthenticateConnection(self, mock_resolve_client, + mock_validate_client_address): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.password = "password1" + + cert = dict(subject=[[("commonName", "client1")]]) + mock_validate_client_address.return_value = False + self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.1")) + mock_validate_client_address.return_value = True + self.assertTrue(metadata.AuthenticateConnection(cert, "root", + metadata.password, + "1.2.3.1")) + + cert = dict(subject=[[("commonName", "client8")]]) + + mock_resolve_client.return_value = "client5" + self.assertTrue(metadata.AuthenticateConnection(None, "root", + "password1", "1.2.3.8")) + + mock_resolve_client.side_effect = \ + Bcfg2.Server.Plugin.MetadataConsistencyError + self.assertFalse(metadata.AuthenticateConnection(None, "root", + "password1", + "1.2.3.8")) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("socket.gethostbyaddr") + def test_resolve_client(self, mock_gethostbyaddr): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') + + metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, + 'client3') + mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3']) + self.assertEqual(metadata.resolve_client(('1.2.3.3', None), + cleanup_cache=True), 'client3') + self.assertEqual(metadata.session_cache, dict()) + + mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) + self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') + mock_gethostbyaddr.assert_called_with('1.2.3.6') + + mock_gethostbyaddr.reset_mock() + mock_gethostbyaddr.return_value = None + mock_gethostbyaddr.side_effect = socket.herror + self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, + metadata.resolve_client, + ('1.2.3.8', None)) + mock_gethostbyaddr.assert_called_with('1.2.3.8') + + def test_handle_clients_xml_event(self): + pass + + +class TestMetadata_ClientsXML(TestMetadataBase): + """ test Metadata with a clients.xml. """ + # only run these tests if it's possible to skip tests or if we + # have django. otherwise they'll all get run because our fake + # skipping decorators for python < 2.7 won't work when they + # decorate setUp() + if can_skip or has_django: + __test__ = True + + def load_clients_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = self.get_obj() + metadata.core.fam = Mock() + metadata._handle_file("clients.xml") + metadata = TestMetadata.load_clients_data(self, metadata=metadata, + xdata=xdata) + return TestMetadataBase.load_clients_data(self, metadata=metadata, + xdata=xdata) + diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py new file mode 100644 index 000000000..0a971c245 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -0,0 +1,549 @@ +import os +import sys +import time +import lxml.etree +import Bcfg2.Server +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore +from Bcfg2.Server.Plugins.Probes import * +from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ + TestDatabaseBacked + +# test data for JSON and YAML tests +test_data = dict(a=1, b=[1, 2, 3], c="test") + +class FakeList(list): + pass + + +class TestProbesDB(DBModelTestCase): + if has_django: + models = [ProbesGroupsModel, ProbesDataModel] + + +class TestClientProbeDataSet(Bcfg2TestCase): + def test__init(self): + ds = ClientProbeDataSet() + self.assertLessEqual(ds.timestamp, time.time()) + self.assertIsInstance(ds, dict) + self.assertNotIn("timestamp", ds) + + ds = ClientProbeDataSet(timestamp=123) + self.assertEqual(ds.timestamp, 123) + self.assertNotIn("timestamp", ds) + +class TestProbeData(Bcfg2TestCase): + def test_str(self): + # a value that is not valid XML, JSON, or YAML + val = "'test" + + # test string behavior + data = ProbeData(val) + self.assertIsInstance(data, str) + self.assertEqual(data, val) + # test 1.2.0-1.2.2 broken behavior + self.assertEqual(data.data, val) + # test that formatted data accessors return None + self.assertIsNone(data.xdata) + self.assertIsNone(data.yaml) + self.assertIsNone(data.json) + + def test_xdata(self): + xdata = lxml.etree.Element("test") + lxml.etree.SubElement(xdata, "test2") + data = ProbeData(lxml.etree.tostring(xdata, + xml_declaration=False).decode('UTF-8')) + self.assertIsNotNone(data.xdata) + self.assertIsNotNone(data.xdata.find("test2")) + + @skipUnless(has_json, "JSON libraries not found, skipping JSON tests") + def test_json(self): + jdata = json.dumps(test_data) + data = ProbeData(jdata) + self.assertIsNotNone(data.json) + self.assertItemsEqual(test_data, data.json) + + @skipUnless(has_yaml, "YAML libraries not found, skipping YAML tests") + def test_yaml(self): + jdata = yaml.dump(test_data) + data = ProbeData(jdata) + self.assertIsNotNone(data.yaml) + self.assertItemsEqual(test_data, data.yaml) + + +class TestProbeSet(TestEntrySet): + test_obj = ProbeSet + basenames = ["test", "_test", "test-test"] + ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"] + bogus_names = ["test.py"] + + def get_obj(self, path=datastore, fam=None, encoding=None, + plugin_name="Probes", basename=None): + # get_obj() accepts the basename argument, accepted by the + # parent get_obj() method, and just throws it away, since + # ProbeSet uses a regex for the "basename" + if fam is None: + fam = Mock() + rv = self.test_obj(path, fam, encoding, plugin_name) + rv.entry_type = MagicMock() + return rv + + def test__init(self): + fam = Mock() + ps = self.get_obj(fam=fam) + self.assertEqual(ps.plugin_name, "Probes") + fam.AddMonitor.assert_called_with(datastore, ps) + TestEntrySet.test__init(self) + + def test_HandleEvent(self): + ps = self.get_obj() + ps.handle_event = Mock() + + # test that events on the data store itself are skipped + evt = Mock() + evt.filename = datastore + ps.HandleEvent(evt) + self.assertFalse(ps.handle_event.called) + + # test that events on probed.xml are skipped + evt.reset_mock() + evt.filename = "probed.xml" + ps.HandleEvent(evt) + self.assertFalse(ps.handle_event.called) + + # test that other events are processed appropriately + evt.reset_mock() + evt.filename = "fooprobe" + ps.HandleEvent(evt) + ps.handle_event.assert_called_with(evt) + + @patch("%s.list" % builtins, FakeList) + def test_get_probe_data(self): + ps = self.get_obj() + + # build some fairly complex test data for this. in the end, + # we want the probe data to include only the most specific + # version of a given probe, and by basename only, not full + # (specific) name. We don't fully test the specificity stuff, + # we just check to make sure sort() is called and trust that + # sort() does the right thing on Specificity objects. (I.e., + # trust that Specificity is well-tested. Hah!) We also test + # to make sure the interpreter is determined correctly. + ps.get_matching = Mock() + matching = FakeList() + matching.sort = Mock() + + p1 = Mock() + p1.specific = Bcfg2.Server.Plugin.Specificity(group=True, prio=10) + p1.name = "fooprobe.G10_foogroup" + p1.data = """#!/bin/bash +group-specific""" + matching.append(p1) + + p2 = Mock() + p2.specific = Bcfg2.Server.Plugin.Specificity(all=True) + p2.name = "fooprobe" + p2.data = "#!/bin/bash" + matching.append(p2) + + p3 = Mock() + p3.specific = Bcfg2.Server.Plugin.Specificity(all=True) + p3.name = "barprobe" + p3.data = "#! /usr/bin/env python" + matching.append(p3) + + p4 = Mock() + p4.specific = Bcfg2.Server.Plugin.Specificity(all=True) + p4.name = "bazprobe" + p4.data = "" + matching.append(p4) + + ps.get_matching.return_value = matching + + metadata = Mock() + pdata = ps.get_probe_data(metadata) + ps.get_matching.assert_called_with(metadata) + # we can't create a matching operator.attrgetter object, and I + # don't feel the need to mock that out -- this is a good + # enough check + self.assertTrue(matching.sort.called) + + self.assertEqual(len(pdata), 3, + "Found: %s" % [p.get("name") for p in pdata]) + for probe in pdata: + if probe.get("name") == "fooprobe": + self.assertIn("group-specific", probe.text) + self.assertEqual(probe.get("interpreter"), "/bin/bash") + elif probe.get("name") == "barprobe": + self.assertEqual(probe.get("interpreter"), + "/usr/bin/env python") + elif probe.get("name") == "bazprobe": + self.assertIsNotNone(probe.get("interpreter")) + else: + assert False, "Strange probe found in get_probe_data() return" + + +class TestProbes(TestProbing, TestConnector, TestDatabaseBacked): + test_obj = Probes + + def get_test_probedata(self): + test_xdata = lxml.etree.Element("test") + lxml.etree.SubElement(test_xdata, "test", foo="foo") + rv = dict() + rv["foo.example.com"] = ClientProbeDataSet(timestamp=time.time()) + rv["foo.example.com"]["xml"] = \ + ProbeData(lxml.etree.tostring(test_xdata, + xml_declaration=False).decode('UTF-8')) + rv["foo.example.com"]["text"] = ProbeData("freeform text") + rv["foo.example.com"]["multiline"] = ProbeData("""multiple +lines +of +freeform +text +""") + rv["bar.example.com"] = ClientProbeDataSet(timestamp=time.time()) + rv["bar.example.com"]["empty"] = ProbeData("") + if has_yaml: + rv["bar.example.com"]["yaml"] = ProbeData(yaml.dump(test_data)) + if has_json: + rv["bar.example.com"]["json"] = ProbeData(json.dumps(test_data)) + return rv + + def get_test_cgroups(self): + return {"foo.example.com": ["group", "group with spaces", + "group-with-dashes"], + "bar.example.com": []} + + def get_probes_object(self, use_db=False, load_data=None): + core = Mock() + core.setup.cfp.getboolean = Mock() + core.setup.cfp.getboolean.return_value = use_db + if load_data is None: + load_data = MagicMock() + # we have to patch load_data() in a funny way because + # different versions of Mock have different scopes for + # patching. in some versions, a patch applied to + # get_probes_object() would only apply to that function, while + # in others it would also apply to the calling function (e.g., + # test__init(), which relies on being able to check the calls + # of load_data(), and thus on load_data() being consistently + # mocked) + @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", new=load_data) + def inner(): + return Probes(core, datastore) + + return inner() + + def test__init(self): + mock_load_data = Mock() + probes = self.get_probes_object(load_data=mock_load_data) + probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore, + probes.name), + probes.probes) + mock_load_data.assert_any_call() + self.assertEqual(probes.probedata, ClientProbeDataSet()) + self.assertEqual(probes.cgroups, dict()) + + @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", Mock()) + def test__use_db(self): + probes = self.get_probes_object() + self.assertFalse(probes._use_db) + probes.core.setup.cfp.getboolean.assert_called_with("probes", + "use_database", + default=False) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) + def test_write_data_xml(self): + probes = self.get_probes_object(use_db=False) + probes.write_data("test") + probes._write_data_xml.assert_called_with("test") + self.assertFalse(probes._write_data_db.called) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) + def test_write_data_db(self): + probes = self.get_probes_object(use_db=True) + probes.write_data("test") + probes._write_data_db.assert_called_with("test") + self.assertFalse(probes._write_data_xml.called) + + @patch("%s.open" % builtins) + def test__write_data_xml(self, mock_open): + probes = self.get_probes_object(use_db=False) + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + probes._write_data_xml(None) + + mock_open.assert_called_with(os.path.join(datastore, probes.name, + "probed.xml"), "w") + data = lxml.etree.XML(mock_open.return_value.write.call_args[0][0]) + self.assertEqual(len(data.xpath("//Client")), 2) + + foodata = data.find("Client[@name='foo.example.com']") + self.assertIsNotNone(foodata) + self.assertIsNotNone(foodata.get("timestamp")) + self.assertEqual(len(foodata.findall("Probe")), + len(probes.probedata['foo.example.com'])) + self.assertEqual(len(foodata.findall("Group")), + len(probes.cgroups['foo.example.com'])) + xml = foodata.find("Probe[@name='xml']") + self.assertIsNotNone(xml) + self.assertIsNotNone(xml.get("value")) + xdata = lxml.etree.XML(xml.get("value")) + self.assertIsNotNone(xdata) + self.assertIsNotNone(xdata.find("test")) + self.assertEqual(xdata.find("test").get("foo"), "foo") + text = foodata.find("Probe[@name='text']") + self.assertIsNotNone(text) + self.assertIsNotNone(text.get("value")) + multiline = foodata.find("Probe[@name='multiline']") + self.assertIsNotNone(multiline) + self.assertIsNotNone(multiline.get("value")) + self.assertGreater(len(multiline.get("value").splitlines()), 1) + + bardata = data.find("Client[@name='bar.example.com']") + self.assertIsNotNone(bardata) + self.assertIsNotNone(bardata.get("timestamp")) + self.assertEqual(len(bardata.findall("Probe")), + len(probes.probedata['bar.example.com'])) + self.assertEqual(len(bardata.findall("Group")), + len(probes.cgroups['bar.example.com'])) + empty = bardata.find("Probe[@name='empty']") + self.assertIsNotNone(empty) + self.assertIsNotNone(empty.get("value")) + self.assertEqual(empty.get("value"), "") + if has_yaml: + ydata = bardata.find("Probe[@name='yaml']") + self.assertIsNotNone(ydata) + self.assertIsNotNone(ydata.get("value")) + self.assertItemsEqual(test_data, yaml.load(ydata.get("value"))) + if has_json: + jdata = bardata.find("Probe[@name='json']") + self.assertIsNotNone(jdata) + self.assertIsNotNone(jdata.get("value")) + self.assertItemsEqual(test_data, json.loads(jdata.get("value"))) + + @skipUnless(has_django, "Django not found, skipping") + def test__write_data_db(self): + syncdb(TestProbesDB) + probes = self.get_probes_object(use_db=True) + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + + for cname in ["foo.example.com", "bar.example.com"]: + client = Mock() + client.hostname = cname + probes._write_data_db(client) + + pdata = ProbesDataModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pdata), len(probes.probedata[cname])) + + for probe in pdata: + self.assertEqual(probe.hostname, client.hostname) + self.assertIsNotNone(probe.data) + if probe.probe == "xml": + xdata = lxml.etree.XML(probe.data) + self.assertIsNotNone(xdata) + self.assertIsNotNone(xdata.find("test")) + self.assertEqual(xdata.find("test").get("foo"), "foo") + elif probe.probe == "text": + pass + elif probe.probe == "multiline": + self.assertGreater(len(probe.data.splitlines()), 1) + elif probe.probe == "empty": + self.assertEqual(probe.data, "") + elif probe.probe == "yaml": + self.assertItemsEqual(test_data, yaml.load(probe.data)) + elif probe.probe == "json": + self.assertItemsEqual(test_data, json.loads(probe.data)) + else: + assert False, "Strange probe found in _write_data_db data" + + pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pgroups), len(probes.cgroups[cname])) + + # test that old probe data is removed properly + cname = 'foo.example.com' + del probes.probedata[cname]['text'] + probes.cgroups[cname].pop() + client = Mock() + client.hostname = cname + probes._write_data_db(client) + + pdata = ProbesDataModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pdata), len(probes.probedata[cname])) + pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() + self.assertEqual(len(pgroups), len(probes.cgroups[cname])) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) + def test_load_data_xml(self): + probes = self.get_probes_object(use_db=False) + probes.load_data() + probes._load_data_xml.assert_any_call() + self.assertFalse(probes._load_data_db.called) + + @skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) + def test_load_data_db(self): + probes = self.get_probes_object(use_db=True) + probes.load_data() + probes._load_data_db.assert_any_call() + self.assertFalse(probes._load_data_xml.called) + + @patch("%s.open" % builtins) + @patch("lxml.etree.parse") + def test__load_data_xml(self, mock_parse, mock_open): + probes = self.get_probes_object(use_db=False) + # to get the value for lxml.etree.parse to parse, we call + # _write_data_xml, mock the open() call, and grab the data + # that gets "written" to probed.xml + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + probes._write_data_xml(None) + xdata = \ + lxml.etree.XML(str(mock_open.return_value.write.call_args[0][0])) + mock_parse.return_value = xdata.getroottree() + probes.probedata = dict() + probes.cgroups = dict() + + probes._load_data_xml() + mock_parse.assert_called_with(os.path.join(datastore, probes.name, + 'probed.xml'), + parser=Bcfg2.Server.XMLParser) + self.assertItemsEqual(probes.probedata, self.get_test_probedata()) + self.assertItemsEqual(probes.cgroups, self.get_test_cgroups()) + + @skipUnless(has_django, "Django not found, skipping") + def test__load_data_db(self): + syncdb(TestProbesDB) + probes = self.get_probes_object(use_db=True) + probes.probedata = self.get_test_probedata() + probes.cgroups = self.get_test_cgroups() + for cname in probes.probedata.keys(): + client = Mock() + client.hostname = cname + probes._write_data_db(client) + + probes.probedata = dict() + probes.cgroups = dict() + probes._load_data_db() + self.assertItemsEqual(probes.probedata, self.get_test_probedata()) + # the db backend does not store groups at all if a client has + # no groups set, so we can't just use assertItemsEqual here, + # because loading saved data may _not_ result in the original + # data if some clients had no groups set. + test_cgroups = self.get_test_cgroups() + for cname, groups in test_cgroups.items(): + if cname in probes.cgroups: + self.assertEqual(groups, probes.cgroups[cname]) + else: + self.assertEqual(groups, []) + + @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data") + def test_GetProbes(self, mock_get_probe_data): + TestProbing.test_GetProbes(self) + + probes = self.get_probes_object() + metadata = Mock() + probes.GetProbes(metadata) + mock_get_probe_data.assert_called_with(metadata) + + @patch("Bcfg2.Server.Plugins.Probes.Probes.write_data") + @patch("Bcfg2.Server.Plugins.Probes.Probes.ReceiveDataItem") + def test_ReceiveData(self, mock_ReceiveDataItem, mock_write_data): + TestProbing.test_ReceiveData(self) + + # we use a simple (read: bogus) datalist here to make this + # easy to test + datalist = ["a", "b", "c"] + + probes = self.get_probes_object() + client = Mock() + client.hostname = "foo.example.com" + probes.ReceiveData(client, datalist) + + self.assertItemsEqual(mock_ReceiveDataItem.call_args_list, + [call(client, "a"), call(client, "b"), + call(client, "c")]) + mock_write_data.assert_called_with(client) + + def test_ReceiveDataItem(self): + probes = self.get_probes_object() + for cname, cdata in self.get_test_probedata().items(): + client = Mock() + client.hostname = cname + for pname, pdata in cdata.items(): + dataitem = lxml.etree.Element("Probe", name=pname) + if pname == "text": + # add some groups to the plaintext test to test + # group parsing + data = [pdata] + for group in self.get_test_cgroups()[cname]: + data.append("group:%s" % group) + dataitem.text = "\n".join(data) + else: + dataitem.text = str(pdata) + + probes.ReceiveDataItem(client, dataitem) + + self.assertIn(client.hostname, probes.probedata) + self.assertIn(pname, probes.probedata[cname]) + self.assertEqual(pdata, probes.probedata[cname][pname]) + self.assertIn(client.hostname, probes.cgroups) + self.assertEqual(probes.cgroups[cname], + self.get_test_cgroups()[cname]) + + def test_get_additional_groups(self): + TestConnector.test_get_additional_groups(self) + + probes = self.get_probes_object() + test_cgroups = self.get_test_cgroups() + probes.cgroups = self.get_test_cgroups() + for cname in test_cgroups.keys(): + metadata = Mock() + metadata.hostname = cname + self.assertEqual(test_cgroups[cname], + probes.get_additional_groups(metadata)) + # test a non-existent client + metadata = Mock() + metadata.hostname = "nonexistent" + self.assertEqual(probes.get_additional_groups(metadata), + list()) + + def test_get_additional_data(self): + TestConnector.test_get_additional_data(self) + + probes = self.get_probes_object() + test_probedata = self.get_test_probedata() + probes.probedata = self.get_test_probedata() + for cname in test_probedata.keys(): + metadata = Mock() + metadata.hostname = cname + self.assertEqual(test_probedata[cname], + probes.get_additional_data(metadata)) + # test a non-existent client + metadata = Mock() + metadata.hostname = "nonexistent" + self.assertEqual(probes.get_additional_data(metadata), + ClientProbeDataSet()) + + diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py new file mode 100644 index 000000000..c319ed663 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py @@ -0,0 +1,109 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from Bcfg2.Compat import b64encode +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.SEModules import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore +from TestPlugin import TestSpecificData, TestGroupSpool + +class TestSEModuleData(TestSpecificData): + test_obj = SEModuleData + path = os.path.join(datastore, "SEModules", "test.pp", "test.pp") + + def test_bind_entry(self): + data = self.get_obj() + data.data = "test" + entry = lxml.etree.Element("test", name=self.path) + data.bind_entry(entry, Mock()) + self.assertEqual(entry.get("name"), self.path) + self.assertEqual(entry.get("encoding"), "base64") + self.assertEqual(entry.text, b64encode(data.data)) + + +class TestSEModules(TestGroupSpool): + test_obj = SEModules + + def test__get_module_name(self): + modules = self.get_obj() + for mname in ["foo", "foo.pp"]: + entry = lxml.etree.Element("SELinux", type="module", name=mname) + self.assertEqual(modules._get_module_name(entry), "/foo.pp") + + @patch("Bcfg2.Server.Plugins.SEModules.SEModules._get_module_name") + def test_HandlesEntry(self, mock_get_name): + modules = self.get_obj() + modules.Entries['SELinux']['/foo.pp'] = Mock() + modules.Entries['SELinux']['/bar.pp'] = Mock() + for el in [lxml.etree.Element("Path", name="/foo.pp"), + lxml.etree.Element("SELinux", type="fcontext", + name="/foo.pp"), + lxml.etree.Element("SELinux", type="module", + name="/baz.pp")]: + mock_get_name.return_value = el.get("name") + self.assertFalse(modules.HandlesEntry(el, Mock())) + mock_get_name.assert_called_with(el) + + for el in [lxml.etree.Element("SELinux", type="module", + name="/foo.pp"), + lxml.etree.Element("SELinux", type="module", + name="/bar.pp")]: + mock_get_name.return_value = el.get("name") + self.assertTrue(modules.HandlesEntry(el, Mock()), + msg="SEModules fails to handle %s" % el.get("name")) + mock_get_name.assert_called_with(el) + + TestGroupSpool.test_HandlesEntry(self) + + @patch("Bcfg2.Server.Plugins.SEModules.SEModules._get_module_name") + def test_HandlesEntry(self, mock_get_name): + modules = self.get_obj() + handler = Mock() + modules.Entries['SELinux']['/foo.pp'] = handler + mock_get_name.return_value = "/foo.pp" + + entry = lxml.etree.Element("SELinux", type="module", name="foo") + metadata = Mock() + self.assertEqual(modules.HandleEntry(entry, metadata), + handler.return_value) + mock_get_name.assert_called_with(entry) + self.assertEqual(entry.get("name"), mock_get_name.return_value) + handler.assert_called_with(entry, metadata) + + TestGroupSpool.test_HandlesEntry(self) + + def test_add_entry(self): + @patch("%s.%s.event_path" % + (self.test_obj.__module__, self.test_obj.__name__)) + @patch("%s.%s.add_entry" % (self.test_obj.__base__.__module__, + self.test_obj.__base__.__name__)) + def inner(mock_add_entry, mock_event_path): + modules = self.get_obj() + + evt = Mock() + evt.filename = "test.pp.G10_foo" + + mock_event_path.return_value = os.path.join(datastore, + self.test_obj.__name__, + "test.pp", + "test.pp.G10_foo") + modules.add_entry(evt) + self.assertEqual(modules.filename_pattern, "test.pp") + mock_add_entry.assert_called_with(modules, evt) + mock_event_path.assert_called_with(evt) + + inner() + TestGroupSpool.test_add_entry(self) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py new file mode 100644 index 000000000..556487288 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py @@ -0,0 +1,120 @@ +import os +import sys +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.TemplateHelper import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ + skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ + patchIf, datastore +from TestPlugin import TestDirectoryBacked, TestConnector, TestPlugin, \ + TestFileBacked + + +class TestHelperModule(TestFileBacked): + test_obj = HelperModule + path = os.path.join(datastore, "test.py") + + def test__init(self): + hm = self.get_obj() + self.assertEqual(hm._module_name, "test") + self.assertEqual(hm._attrs, []) + + @patch("imp.load_source") + def test_Index(self, mock_load_source): + hm = self.get_obj() + + mock_load_source.side_effect = ImportError + attrs = dir(hm) + hm.Index() + mock_load_source.assert_called_with(hm._module_name, hm.name) + self.assertEqual(attrs, dir(hm)) + self.assertEqual(hm._attrs, []) + + mock_load_source.reset() + mock_load_source.side_effect = None + # a regular Mock (not a MagicMock) won't automatically create + # __export__, so this triggers a failure condition in Index + mock_load_source.return_value = Mock() + attrs = dir(hm) + hm.Index() + mock_load_source.assert_called_with(hm._module_name, hm.name) + self.assertEqual(attrs, dir(hm)) + self.assertEqual(hm._attrs, []) + + # test reserved attributes + module = Mock() + module.__export__ = ["_attrs", "Index", "__init__"] + mock_load_source.reset() + mock_load_source.return_value = module + attrs = dir(hm) + hm.Index() + mock_load_source.assert_called_with(hm._module_name, hm.name) + self.assertEqual(attrs, dir(hm)) + self.assertEqual(hm._attrs, []) + + # test adding attributes + module = Mock() + module.__export__ = ["foo", "bar", "baz", "Index"] + mock_load_source.reset() + mock_load_source.return_value = module + hm.Index() + mock_load_source.assert_called_with(hm._module_name, hm.name) + self.assertTrue(hasattr(hm, "foo")) + self.assertTrue(hasattr(hm, "bar")) + self.assertTrue(hasattr(hm, "baz")) + self.assertEqual(hm._attrs, ["foo", "bar", "baz"]) + + # test removing attributes + module = Mock() + module.__export__ = ["foo", "bar", "quux", "Index"] + mock_load_source.reset() + mock_load_source.return_value = module + hm.Index() + mock_load_source.assert_called_with(hm._module_name, hm.name) + self.assertTrue(hasattr(hm, "foo")) + self.assertTrue(hasattr(hm, "bar")) + self.assertTrue(hasattr(hm, "quux")) + self.assertFalse(hasattr(hm, "baz")) + self.assertEqual(hm._attrs, ["foo", "bar", "quux"]) + + + +class TestHelperSet(TestDirectoryBacked): + test_obj = HelperSet + testfiles = ['foo.py', 'foo_bar.py', 'foo.bar.py'] + ignore = ['fooo.py~', 'fooo.pyc', 'fooo.pyo'] + badevents = ['foo'] + + +class TestTemplateHelper(TestPlugin, TestConnector): + test_obj = TemplateHelper + + def test__init(self): + TestPlugin.test__init(self) + + th = self.get_obj() + self.assertIsInstance(th.helpers, HelperSet) + + def test_get_additional_data(self): + TestConnector.test_get_additional_data(self) + + th = self.get_obj() + modules = ['foo', 'bar'] + rv = dict() + for mname in modules: + module = Mock() + module._module_name = mname + rv[mname] = module + th.helpers.entries['%s.py' % mname] = module + actual = th.get_additional_data(Mock()) + self.assertItemsEqual(actual, rv) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/__init__.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/__init__.py new file mode 100644 index 000000000..e69de29bb -- cgit v1.2.3-1-g7c22