diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2012-09-04 09:52:57 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2012-09-04 09:52:57 -0400 |
commit | d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 (patch) | |
tree | cf59b7bcef389ca76f09c7f804db9d893b918e3b /testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py | |
parent | 6697481f7bed646b4c66c54c9a46d11aa075af4a (diff) | |
download | bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.gz bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.bz2 bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.zip |
reorganized testsuite to allow tests on stuff outside of src
Diffstat (limited to 'testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py')
-rw-r--r-- | testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py | 1470 |
1 files changed, 0 insertions, 1470 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py deleted file mode 100644 index 2ff0af78e..000000000 --- a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py +++ /dev/null @@ -1,1470 +0,0 @@ -import os -import sys -import copy -import time -import socket -import lxml.etree -import Bcfg2.Server -import Bcfg2.Server.Plugin -from Bcfg2.Server.Plugins.Metadata import * -from mock import Mock, patch - -# add all parent testsuite directories to sys.path to allow (most) -# relative imports in python 2.4 -path = os.path.dirname(__file__) -while path != "/": - if os.path.basename(path).lower().startswith("test"): - sys.path.append(path) - if os.path.basename(path) == "testsuite": - break - path = os.path.dirname(path) -from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \ - skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \ - patchIf, datastore -from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \ - TestStatistics, TestDatabaseBacked - -def get_clients_test_tree(): - return lxml.etree.XML(''' -<Clients> - <Client name="client1" address="1.2.3.1" auth="cert" - location="floating" password="password2" profile="group1"/> - <Client name="client2" address="1.2.3.2" secure="true" profile="group2"/> - <Client name="client3" address="1.2.3.3" uuid="uuid1" profile="group1" - password="password2"> - <Alias name="alias1"/> - </Client> - <Client name="client4" profile="group1"> - <Alias name="alias2" address="1.2.3.2"/> - <Alias name="alias3"/> - </Client> - <Client name="client5" profile="group1"/> - <Client name="client6" profile="group1" auth="bootstrap"/> - <Client name="client7" profile="group1" auth="cert" address="1.2.3.4"/> - <Client name="client8" profile="group1" auth="cert+password" - address="1.2.3.5"/> - <Client name="client9" profile="group2" secure="true" password="password3"/> - <Client name="client10" profile="group1" floating="true"/> -</Clients>''').getroottree() - -def get_groups_test_tree(): - return lxml.etree.XML(''' -<Groups xmlns:xi="http://www.w3.org/2001/XInclude"> - <Client name="client8"> - <Group name="group8"/> - </Client> - <Client name="client9"> - <Group name="group8"/> - </Client> - - <Group name="group1" default="true" profile="true" public="true" - category="category1"/> - <Group name="group2" profile="true" public="true" category="category1"> - <Bundle name="bundle1"/> - <Bundle name="bundle2"/> - <Group name="group1"/> - <Group name="group4"/> - </Group> - <Group name="group3" category="category2" public="false"/> - <Group name="group4" category="category1"> - <Group name="group1"/> - <Group name="group6"/> - </Group> - <Group name="group5"/> - <Group name="group7"> - <Bundle name="bundle3"/> - </Group> - <Group name="group8"> - <Group name="group9"/> - <Client name="client9"> - <Group name="group11"/> - <Group name="group9" negate="true"/> - </Client> - <Group name="group1"> - <Group name="group10"/> - </Group> - </Group> -</Groups>''').getroottree() - - -def get_metadata_object(core=None, watch_clients=False, use_db=False): - if core is None: - core = Mock() - core.setup.cfp.getboolean = Mock(return_value=use_db) - return Metadata(core, datastore, watch_clients=watch_clients) - - -class TestMetadataDB(DBModelTestCase): - if has_django: - models = [MetadataClientModel] - - -if has_django or can_skip: - class TestClientVersions(Bcfg2TestCase): - test_clients = dict(client1="1.2.0", - client2="1.2.2", - client3="1.3.0pre1", - client4="1.1.0", - client5=None, - client6=None) - - @skipUnless(has_django, "Django not found") - def setUp(self): - syncdb(TestMetadataDB) - for client, version in self.test_clients.items(): - MetadataClientModel(hostname=client, version=version).save() - - def test__contains(self): - v = ClientVersions() - self.assertIn("client1", v) - self.assertIn("client5", v) - self.assertNotIn("client__contains", v) - - def test_keys(self): - v = ClientVersions() - self.assertItemsEqual(self.test_clients.keys(), v.keys()) - - def test__setitem(self): - v = ClientVersions() - - # test setting version of existing client - v["client1"] = "1.2.3" - self.assertIn("client1", v) - self.assertEqual(v['client1'], "1.2.3") - client = MetadataClientModel.objects.get(hostname="client1") - self.assertEqual(client.version, "1.2.3") - - # test adding new client - new = "client__setitem" - v[new] = "1.3.0" - self.assertIn(new, v) - self.assertEqual(v[new], "1.3.0") - client = MetadataClientModel.objects.get(hostname=new) - self.assertEqual(client.version, "1.3.0") - - # test adding new client with no version - new2 = "client__setitem_2" - v[new2] = None - self.assertIn(new2, v) - self.assertEqual(v[new2], None) - client = MetadataClientModel.objects.get(hostname=new2) - self.assertEqual(client.version, None) - - def test__getitem(self): - v = ClientVersions() - - # test getting existing client - self.assertEqual(v['client2'], "1.2.2") - self.assertIsNone(v['client5']) - - # test exception on nonexistent client - expected = KeyError - try: - v['clients__getitem'] - except expected: - pass - except: - err = sys.exc_info()[1] - self.assertFalse(True, "%s raised instead of %s" % - (err.__class__.__name__, - expected.__class__.__name__)) - else: - self.assertFalse(True, - "%s not raised" % expected.__class__.__name__) - - def test__len(self): - v = ClientVersions() - self.assertEqual(len(v), MetadataClientModel.objects.count()) - - def test__iter(self): - v = ClientVersions() - self.assertItemsEqual([h for h in iter(v)], v.keys()) - - def test__delitem(self): - v = ClientVersions() - - # test adding new client - new = "client__delitem" - v[new] = "1.3.0" - - del v[new] - self.assertIn(new, v) - self.assertIsNone(v[new]) - - -class TestXMLMetadataConfig(TestXMLFileBacked): - test_obj = XMLMetadataConfig - - def get_obj(self, basefile="clients.xml", core=None, watch_clients=False): - self.metadata = get_metadata_object(core=core, - watch_clients=watch_clients) - return XMLMetadataConfig(self.metadata, watch_clients, basefile) - - def test__init(self): - xmc = self.get_obj() - self.assertEqual(self.metadata.core.fam, xmc.fam) - self.assertFalse(xmc.fam.AddMonitor.called) - - def test_xdata(self): - config = self.get_obj() - expected = Bcfg2.Server.Plugin.MetadataRuntimeError - try: - config.xdata - except expected: - pass - except: - err = sys.exc_info()[1] - self.assertFalse(True, "%s raised instead of %s" % - (err.__class__.__name__, - expected.__class__.__name__)) - else: - self.assertFalse(True, - "%s not raised" % expected.__class__.__name__) - pass - - config.data = "<test/>" - self.assertEqual(config.xdata, "<test/>") - - def test_base_xdata(self): - config = self.get_obj() - # we can't use assertRaises here because base_xdata is a property - expected = Bcfg2.Server.Plugin.MetadataRuntimeError - try: - config.base_xdata - except expected: - pass - except: - err = sys.exc_info()[1] - self.assertFalse(True, "%s raised instead of %s" % - (err.__class__.__name__, - expected.__class__.__name__)) - else: - self.assertFalse(True, - "%s not raised" % expected.__class__.__name__) - pass - - config.basedata = "<test/>" - self.assertEqual(config.base_xdata, "<test/>") - - def test_add_monitor(self): - core = Mock() - config = self.get_obj(core=core) - - fname = "test.xml" - fpath = os.path.join(self.metadata.data, fname) - - config.extras = [] - config.add_monitor(fpath) - self.assertFalse(core.fam.AddMonitor.called) - self.assertEqual(config.extras, [fpath]) - - config = self.get_obj(core=core, watch_clients=True) - config.add_monitor(fpath) - core.fam.AddMonitor.assert_called_with(fpath, config.metadata) - self.assertItemsEqual(config.extras, [fpath]) - - def test_Index(self): - # Index() isn't used on XMLMetadataConfig objects - pass - - @patch("lxml.etree.parse") - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig._follow_xincludes") - def test_load_xml(self, mock_follow, mock_parse): - config = self.get_obj("clients.xml") - - def reset(): - mock_parse.reset_mock() - mock_follow.reset_mock() - config.data = None - config.basedata = None - - reset() - config.load_xml() - mock_follow.assert_called_with(xdata=mock_parse.return_value) - mock_parse.assert_called_with(os.path.join(config.basedir, - "clients.xml"), - parser=Bcfg2.Server.XMLParser) - self.assertFalse(mock_parse.return_value.xinclude.called) - self.assertEqual(config.data, mock_parse.return_value) - self.assertIsNotNone(config.basedata) - - reset() - mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None, - None) - config.load_xml() - mock_parse.assert_called_with(os.path.join(config.basedir, - "clients.xml"), - parser=Bcfg2.Server.XMLParser) - self.assertIsNone(config.data) - self.assertIsNone(config.basedata) - - reset() - mock_parse.side_effect = None - def follow_xincludes(xdata=None): - config.extras = [Mock(), Mock()] - mock_follow.side_effect = follow_xincludes - config.load_xml() - mock_follow.assert_called_with(xdata=mock_parse.return_value) - mock_parse.assert_called_with(os.path.join(config.basedir, - "clients.xml"), - parser=Bcfg2.Server.XMLParser) - mock_parse.return_value.xinclude.assert_any_call() - self.assertEqual(config.data, mock_parse.return_value) - self.assertIsNotNone(config.basedata) - - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml") - def test_write(self, mock_write_xml): - config = self.get_obj("clients.xml") - config.basedata = "<test/>" - config.write() - mock_write_xml.assert_called_with(os.path.join(self.metadata.data, - "clients.xml"), - "<test/>") - - @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False)) - @patch('fcntl.lockf', Mock()) - @patch('%s.open' % builtins) - @patch('os.unlink') - @patch('os.rename') - @patch('os.path.islink') - @patch('os.readlink') - def test_write_xml(self, mock_readlink, mock_islink, mock_rename, - mock_unlink, mock_open): - fname = "clients.xml" - config = self.get_obj(fname) - fpath = os.path.join(self.metadata.data, fname) - tmpfile = "%s.new" % fpath - linkdest = os.path.join(self.metadata.data, "client-link.xml") - - mock_islink.return_value = False - - config.write_xml(fpath, get_clients_test_tree()) - mock_open.assert_called_with(tmpfile, "w") - self.assertTrue(mock_open.return_value.write.called) - mock_islink.assert_called_with(fpath) - mock_rename.assert_called_with(tmpfile, fpath) - - mock_islink.return_value = True - mock_readlink.return_value = linkdest - config.write_xml(fpath, get_clients_test_tree()) - mock_rename.assert_called_with(tmpfile, linkdest) - - mock_rename.side_effect = OSError - self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, - config.write_xml, fpath, get_clients_test_tree()) - - mock_open.return_value.write.side_effect = IOError - self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, - config.write_xml, fpath, get_clients_test_tree()) - mock_unlink.assert_called_with(tmpfile) - - mock_open.side_effect = IOError - self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, - config.write_xml, fpath, get_clients_test_tree()) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch('lxml.etree.parse') - def test_find_xml_for_xpath(self, mock_parse): - config = self.get_obj("groups.xml") - config.basedata = get_groups_test_tree() - xpath = "//Group[@name='group1']" - self.assertItemsEqual(config.find_xml_for_xpath(xpath), - dict(filename=os.path.join(self.metadata.data, - "groups.xml"), - xmltree=get_groups_test_tree(), - xquery=get_groups_test_tree().xpath(xpath))) - - self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict()) - - config.extras = [os.path.join(self.metadata.data, p) - for p in ["foo.xml", "bar.xml", "clients.xml"]] - - def parse_side_effect(fname, parser=Bcfg2.Server.XMLParser): - if fname == os.path.join(self.metadata.data, "clients.xml"): - return get_clients_test_tree() - else: - return lxml.etree.XML("<null/>").getroottree() - - mock_parse.side_effect = parse_side_effect - xpath = "//Client[@secure='true']" - self.assertItemsEqual(config.find_xml_for_xpath(xpath), - dict(filename=os.path.join(self.metadata.data, - "clients.xml"), - xmltree=get_clients_test_tree(), - xquery=get_clients_test_tree().xpath(xpath))) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") - def test_HandleEvent(self, mock_load_xml): - config = self.get_obj("groups.xml") - evt = Mock() - evt.filename = os.path.join(self.metadata.data, "groups.xml") - evt.code2str = Mock(return_value="changed") - self.assertTrue(config.HandleEvent(evt)) - mock_load_xml.assert_called_with() - - -class TestClientMetadata(Bcfg2TestCase): - def test_inGroup(self): - cm = ClientMetadata("client1", "group1", ["group1", "group2"], - ["bundle1"], [], [], [], None, None, None, None) - self.assertTrue(cm.inGroup("group1")) - self.assertFalse(cm.inGroup("group3")) - - -class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked): - test_obj = Metadata - use_db = False - - def get_obj(self, core=None, watch_clients=False): - return get_metadata_object(core=core, watch_clients=watch_clients, - use_db=self.use_db) - - @skipUnless(has_django, "Django not found") - def test__use_db(self): - # with the way we've set up our metadata tests, it's unweildy - # to test _use_db. however, given the way get_obj works, if - # there was a bug in _use_db it'd be almost certain to shake - # out in the rest of the testing. - pass - - def get_nonexistent_client(self, metadata, prefix="newclient"): - if metadata is None: - metadata = self.load_clients_data() - i = 0 - client_name = "%s%s" % (prefix, i) - while client_name in metadata.clients: - i += 1 - client_name = "%s%s" % (prefix, i) - return client_name - - def test__init(self): - # test with watch_clients=False - core = Mock() - metadata = self.get_obj(core=core) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics) - self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig) - self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig) - self.assertIsInstance(metadata.query, MetadataQuery) - self.assertEqual(metadata.states, dict()) - - # test with watch_clients=True - core.fam = Mock() - metadata = self.get_obj(core=core, watch_clients=True) - self.assertEqual(len(metadata.states), 2) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - core.fam.reset_mock() - core.fam.AddMonitor = Mock(side_effect=IOError) - self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, - self.get_obj, core=core, watch_clients=True) - - @patch('os.makedirs', Mock()) - @patch('%s.open' % builtins) - def test_init_repo(self, mock_open): - Metadata.init_repo(datastore, - groups_xml="groups", clients_xml="clients") - mock_open.assert_any_call(os.path.join(datastore, "Metadata", - "groups.xml"), "w") - mock_open.assert_any_call(os.path.join(datastore, "Metadata", - "clients.xml"), "w") - - def test_search_xdata(self): - # test finding a node with the proper name - metadata = self.get_obj() - tree = get_groups_test_tree() - res = metadata._search_xdata("Group", "group1", tree) - self.assertIsInstance(res, lxml.etree._Element) - self.assertEqual(res.get("name"), "group1") - - # test finding a node with the wrong name but correct alias - metadata = self.get_obj() - tree = get_clients_test_tree() - res = metadata._search_xdata("Client", "alias3", tree, alias=True) - self.assertIsInstance(res, lxml.etree._Element) - self.assertNotEqual(res.get("name"), "alias3") - - # test failure finding a node - metadata = self.get_obj() - tree = get_clients_test_tree() - res = metadata._search_xdata("Client", - self.get_nonexistent_client(metadata), - tree, alias=True) - self.assertIsNone(res) - - def search_xdata(self, tag, name, tree, alias=False): - metadata = self.get_obj() - res = metadata._search_xdata(tag, name, tree, alias=alias) - self.assertIsInstance(res, lxml.etree._Element) - if not alias: - self.assertEqual(res.get("name"), name) - - def test_search_group(self): - # test finding a group with the proper name - tree = get_groups_test_tree() - self.search_xdata("Group", "group1", tree) - - def test_search_bundle(self): - # test finding a bundle with the proper name - tree = get_groups_test_tree() - self.search_xdata("Bundle", "bundle1", tree) - - def test_search_client(self): - # test finding a client with the proper name - tree = get_clients_test_tree() - self.search_xdata("Client", "client1", tree, alias=True) - self.search_xdata("Client", "alias1", tree, alias=True) - - def test_add_group(self): - metadata = self.get_obj() - metadata.groups_xml.write = Mock() - metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.add_group("test1", dict()) - metadata.groups_xml.write.assert_any_call() - grp = metadata.search_group("test1", metadata.groups_xml.base_xdata) - self.assertIsNotNone(grp) - self.assertEqual(grp.attrib, dict(name='test1')) - - # have to call this explicitly -- usually load_xml does this - # on FAM events - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.add_group("test2", dict(foo='bar')) - metadata.groups_xml.write.assert_any_call() - grp = metadata.search_group("test2", metadata.groups_xml.base_xdata) - self.assertIsNotNone(grp) - self.assertEqual(grp.attrib, dict(name='test2', foo='bar')) - - # have to call this explicitly -- usually load_xml does this - # on FAM events - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.groups_xml.write.reset_mock() - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.add_group, - "test1", dict()) - self.assertFalse(metadata.groups_xml.write.called) - - def test_update_group(self): - metadata = self.get_obj() - metadata.groups_xml.write_xml = Mock() - metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.update_group("group1", dict(foo="bar")) - grp = metadata.search_group("group1", metadata.groups_xml.base_xdata) - self.assertIsNotNone(grp) - self.assertIn("foo", grp.attrib) - self.assertEqual(grp.get("foo"), "bar") - self.assertTrue(metadata.groups_xml.write_xml.called) - - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.update_group, - "bogus_group", dict()) - - def test_remove_group(self): - metadata = self.get_obj() - metadata.groups_xml.write_xml = Mock() - metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.remove_group("group5") - grp = metadata.search_group("group5", metadata.groups_xml.base_xdata) - self.assertIsNone(grp) - self.assertTrue(metadata.groups_xml.write_xml.called) - - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.remove_group, - "bogus_group") - - def test_add_bundle(self): - metadata = self.get_obj() - metadata.groups_xml.write = Mock() - metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.add_bundle("bundle1") - metadata.groups_xml.write.assert_any_call() - bundle = metadata.search_bundle("bundle1", - metadata.groups_xml.base_xdata) - self.assertIsNotNone(bundle) - self.assertEqual(bundle.attrib, dict(name='bundle1')) - - # have to call this explicitly -- usually load_xml does this - # on FAM events - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.groups_xml.write.reset_mock() - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.add_bundle, - "bundle1") - self.assertFalse(metadata.groups_xml.write.called) - - def test_remove_bundle(self): - metadata = self.get_obj() - metadata.groups_xml.write_xml = Mock() - metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree()) - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - - metadata.remove_bundle("bundle1") - grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata) - self.assertIsNone(grp) - self.assertTrue(metadata.groups_xml.write_xml.called) - - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.remove_bundle, - "bogus_bundle") - - def test_add_client(self): - metadata = self.get_obj() - metadata.clients_xml.write = Mock() - metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree() - metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) - - new1 = self.get_nonexistent_client(metadata) - metadata.add_client(new1, dict()) - metadata.clients_xml.write.assert_any_call() - grp = metadata.search_client(new1, metadata.clients_xml.base_xdata) - self.assertIsNotNone(grp) - self.assertEqual(grp.attrib, dict(name=new1)) - - # have to call this explicitly -- usually load_xml does this - # on FAM events - metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) - metadata._handle_clients_xml_event(Mock()) - - new2 = self.get_nonexistent_client(metadata) - metadata.add_client(new2, dict(foo='bar')) - metadata.clients_xml.write.assert_any_call() - grp = metadata.search_client(new2, metadata.clients_xml.base_xdata) - self.assertIsNotNone(grp) - self.assertEqual(grp.attrib, dict(name=new2, foo='bar')) - - # have to call this explicitly -- usually load_xml does this - # on FAM events - metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) - - metadata.clients_xml.write.reset_mock() - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.add_client, - new1, dict()) - self.assertFalse(metadata.clients_xml.write.called) - - def test_update_client(self): - metadata = self.get_obj() - metadata.clients_xml.write_xml = Mock() - metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree()) - metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) - - metadata.update_client("client1", dict(foo="bar")) - grp = metadata.search_client("client1", metadata.clients_xml.base_xdata) - self.assertIsNotNone(grp) - self.assertIn("foo", grp.attrib) - self.assertEqual(grp.get("foo"), "bar") - self.assertTrue(metadata.clients_xml.write_xml.called) - - new = self.get_nonexistent_client(metadata) - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.update_client, - new, dict()) - - def load_clients_data(self, metadata=None, xdata=None): - if metadata is None: - metadata = self.get_obj() - metadata.clients_xml.data = \ - xdata or copy.deepcopy(get_clients_test_tree()) - metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) - evt = Mock() - evt.filename = os.path.join(datastore, "Metadata", "clients.xml") - evt.code2str = Mock(return_value="changed") - metadata.HandleEvent(evt) - return metadata - - def test_handle_clients_xml_event(self): - metadata = self.get_obj() - metadata.profiles = ["group1", "group2"] - - metadata.clients_xml = Mock() - metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree()) - metadata._handle_clients_xml_event(Mock()) - - if not self.use_db: - self.assertItemsEqual(metadata.clients, - dict([(c.get("name"), c.get("profile")) - for c in get_clients_test_tree().findall("//Client")])) - aliases = dict([(a.get("name"), a.getparent().get("name")) - for a in get_clients_test_tree().findall("//Alias")]) - self.assertItemsEqual(metadata.aliases, aliases) - - raliases = dict([(c.get("name"), set()) - for c in get_clients_test_tree().findall("//Client")]) - for alias in get_clients_test_tree().findall("//Alias"): - raliases[alias.getparent().get("name")].add(alias.get("name")) - self.assertItemsEqual(metadata.raliases, raliases) - - self.assertEqual(metadata.secure, - [c.get("name") - for c in get_clients_test_tree().findall("//Client[@secure='true']")]) - self.assertEqual(metadata.floating, ["client1", "client10"]) - - addresses = dict([(c.get("address"), []) - for c in get_clients_test_tree().findall("//*[@address]")]) - raddresses = dict() - for client in get_clients_test_tree().findall("//Client[@address]"): - addresses[client.get("address")].append(client.get("name")) - try: - raddresses[client.get("name")].append(client.get("address")) - except KeyError: - raddresses[client.get("name")] = [client.get("address")] - for alias in get_clients_test_tree().findall("//Alias[@address]"): - addresses[alias.get("address")].append(alias.getparent().get("name")) - try: - raddresses[alias.getparent().get("name")].append(alias.get("address")) - except KeyError: - raddresses[alias.getparent().get("name")] = alias.get("address") - - self.assertItemsEqual(metadata.addresses, addresses) - self.assertItemsEqual(metadata.raddresses, raddresses) - self.assertTrue(metadata.states['clients.xml']) - - def load_groups_data(self, metadata=None, xdata=None): - if metadata is None: - metadata = self.get_obj() - metadata.groups_xml.data = \ - xdata or copy.deepcopy(get_groups_test_tree()) - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - evt = Mock() - evt.filename = os.path.join(datastore, "Metadata", "groups.xml") - evt.code2str = Mock(return_value="changed") - metadata.HandleEvent(evt) - return metadata - - def test_handle_groups_xml_event(self): - metadata = self.get_obj() - metadata.groups_xml = Mock() - metadata.groups_xml.xdata = get_groups_test_tree() - metadata._handle_groups_xml_event(Mock()) - - self.assertTrue(metadata.states['groups.xml']) - self.assertTrue(metadata.groups['group1'].is_public) - self.assertTrue(metadata.groups['group2'].is_public) - self.assertFalse(metadata.groups['group3'].is_public) - self.assertFalse(metadata.groups['group1'].is_private) - self.assertFalse(metadata.groups['group2'].is_private) - self.assertTrue(metadata.groups['group3'].is_private) - self.assertTrue(metadata.groups['group1'].is_profile) - self.assertTrue(metadata.groups['group2'].is_profile) - self.assertFalse(metadata.groups['group3'].is_profile) - self.assertItemsEqual(metadata.groups.keys(), - set(g.get("name") - for g in get_groups_test_tree().findall("//Group"))) - self.assertEqual(metadata.groups['group1'].category, 'category1') - self.assertEqual(metadata.groups['group2'].category, 'category1') - self.assertEqual(metadata.groups['group3'].category, 'category2') - self.assertEqual(metadata.groups['group4'].category, 'category1') - self.assertEqual(metadata.default, "group1") - - all_groups = [] - negated_groups = [] - for group in get_groups_test_tree().xpath("//Groups/Client//*") + \ - get_groups_test_tree().xpath("//Groups/Group//*"): - if group.tag == 'Group' and not group.getchildren(): - if group.get("negate", "false").lower() == 'true': - negated_groups.append(group.get("name")) - else: - all_groups.append(group.get("name")) - self.assertItemsEqual([g.name - for g in metadata.group_membership.values()], - all_groups) - self.assertItemsEqual([g.name - for g in metadata.negated_groups.values()], - negated_groups) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_set_profile(self): - metadata = self.get_obj() - if 'clients.xml' in metadata.states: - metadata.states['clients.xml'] = False - self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, - metadata.set_profile, - None, None, None) - - self.load_groups_data(metadata=metadata) - self.load_clients_data(metadata=metadata) - - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.set_profile, - "client1", "group5", None) - - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.set_profile, - "client1", "group3", None) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_set_profile_db(self): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - if metadata._use_db: - profile = "group1" - client_name = self.get_nonexistent_client(metadata) - metadata.set_profile(client_name, profile, None) - self.assertIn(client_name, metadata.clients) - self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, - metadata.set_profile, - client_name, profile, None) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client") - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") - def test_set_profile_xml(self, mock_update_client, mock_add_client): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - if not metadata._use_db: - metadata.clients_xml.write = Mock() - metadata.set_profile("client1", "group2", None) - mock_update_client.assert_called_with("client1", - dict(profile="group2")) - metadata.clients_xml.write.assert_any_call() - self.assertEqual(metadata.clientgroups["client1"], ["group2"]) - - metadata.clients_xml.write.reset_mock() - new1 = self.get_nonexistent_client(metadata) - metadata.set_profile(new1, "group1", None) - mock_add_client.assert_called_with(new1, dict(profile="group1")) - metadata.clients_xml.write.assert_any_call() - self.assertEqual(metadata.clientgroups[new1], ["group1"]) - - metadata.clients_xml.write.reset_mock() - new2 = self.get_nonexistent_client(metadata) - metadata.session_cache[('1.2.3.6', None)] = (None, new2) - metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None)) - mock_add_client.assert_called_with(new2, - dict(uuid='uuid_new', - profile="group1", - address='1.2.3.6')) - metadata.clients_xml.write.assert_any_call() - self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"]) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("socket.gethostbyaddr") - def test_resolve_client(self, mock_gethostbyaddr): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') - self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') - - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.resolve_client, - ('1.2.3.2', None)) - self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1') - - metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, - 'client3') - self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') - self.assertEqual(metadata.resolve_client(('1.2.3.3', None), - cleanup_cache=True), 'client3') - self.assertEqual(metadata.session_cache, dict()) - - mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) - self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') - mock_gethostbyaddr.assert_called_with('1.2.3.6') - - mock_gethostbyaddr.reset_mock() - mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7']) - self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4') - mock_gethostbyaddr.assert_called_with('1.2.3.7') - - mock_gethostbyaddr.reset_mock() - mock_gethostbyaddr.return_value = None - mock_gethostbyaddr.side_effect = socket.herror - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.resolve_client, - ('1.2.3.8', None)) - mock_gethostbyaddr.assert_called_with('1.2.3.8') - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") - def test_get_initial_metadata(self, mock_clientmetadata): - metadata = self.get_obj() - if 'clients.xml' in metadata.states: - metadata.states['clients.xml'] = False - self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, - metadata.get_initial_metadata, None) - - self.load_groups_data(metadata=metadata) - self.load_clients_data(metadata=metadata) - - # test address, password - metadata.get_initial_metadata("client1") - mock_clientmetadata.assert_called_with("client1", "group1", - set(["group1"]), set(), set(), - set(["1.2.3.1"]), - dict(category1='group1'), None, - 'password2', None, - metadata.query) - - # test address, bundles, category suppression - metadata.get_initial_metadata("client2") - mock_clientmetadata.assert_called_with("client2", "group2", - set(["group2"]), - set(["bundle1", "bundle2"]), - set(), set(["1.2.3.2"]), - dict(category1="group2"), - None, None, None, - metadata.query) - - # test aliases, address, uuid, password - imd = metadata.get_initial_metadata("alias1") - mock_clientmetadata.assert_called_with("client3", "group1", - set(["group1"]), set(), - set(['alias1']), - set(["1.2.3.3"]), - dict(category1="group1"), - 'uuid1', 'password2', None, - metadata.query) - - # test new client creation - new1 = self.get_nonexistent_client(metadata) - imd = metadata.get_initial_metadata(new1) - mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]), - set(), set(), set(), - dict(category1="group1"), None, - None, None, metadata.query) - - # test nested groups, address, per-client groups - imd = metadata.get_initial_metadata("client8") - mock_clientmetadata.assert_called_with("client8", "group1", - set(["group1", "group8", - "group9", "group10"]), - set(), - set(), set(["1.2.3.5"]), - dict(category1="group1"), - None, None, None, metadata.query) - - # test setting per-client groups, group negation, nested groups - imd = metadata.get_initial_metadata("client9") - mock_clientmetadata.assert_called_with("client9", "group2", - set(["group2", "group8", - "group11"]), - set(["bundle1", "bundle2"]), - set(), set(), - dict(category1="group2"), None, - "password3", None, - metadata.query) - - # test new client with no default profile - metadata.default = None - new2 = self.get_nonexistent_client(metadata) - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.get_initial_metadata, new2) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_merge_groups(self): - metadata = self.get_obj() - self.load_groups_data(metadata=metadata) - self.load_clients_data(metadata=metadata) - - self.assertEqual(metadata._merge_groups("client1", set(["group1"]), - categories=dict(group1="category1")), - (set(["group1"]), dict(group1="category1"))) - - self.assertEqual(metadata._merge_groups("client8", - set(["group1", "group8", "group9"]), - categories=dict(group1="category1")), - (set(["group1", "group8", "group9", "group10"]), - dict(group1="category1"))) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_get_all_group_names(self): - metadata = self.load_groups_data() - self.assertItemsEqual(metadata.get_all_group_names(), - set([g.get("name") - for g in get_groups_test_tree().findall("//Group")])) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_get_all_groups_in_category(self): - metadata = self.load_groups_data() - self.assertItemsEqual(metadata.get_all_groups_in_category("category1"), - set([g.get("name") - for g in get_groups_test_tree().findall("//Group[@category='category1']")])) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_get_client_names_by_profiles(self): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - self.assertItemsEqual(metadata.get_client_names_by_profiles(["group2"]), - [c.get("name") - for c in get_clients_test_tree().findall("//Client[@profile='group2']")]) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_get_client_names_by_groups(self): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - # this is not the best test in the world, since we mock - # core.build_metadata to just build _initial_ metadata, which - # is not at all the same thing. it turns out that mocking - # this out without starting a Bcfg2 server is pretty - # non-trivial, so this works-ish - metadata.core.build_metadata = Mock() - metadata.core.build_metadata.side_effect = \ - lambda c: metadata.get_initial_metadata(c) - self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]), - [c.get("name") - for c in get_clients_test_tree().findall("//Client[@profile='group2']")]) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_merge_additional_groups(self): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - imd = metadata.get_initial_metadata("client2") - - # test adding a group excluded by categories - oldgroups = imd.groups - metadata.merge_additional_groups(imd, ["group4"]) - self.assertEqual(imd.groups, oldgroups) - - # test adding a private group - oldgroups = imd.groups - metadata.merge_additional_groups(imd, ["group3"]) - self.assertEqual(imd.groups, oldgroups) - - # test adding groups with bundles - oldgroups = imd.groups - oldbundles = imd.bundles - metadata.merge_additional_groups(imd, ["group7"]) - self.assertEqual(imd.groups, oldgroups.union(["group7"])) - self.assertEqual(imd.bundles, oldbundles.union(["bundle3"])) - - # test adding multiple groups - imd = metadata.get_initial_metadata("client2") - oldgroups = imd.groups - metadata.merge_additional_groups(imd, ["group6", "group8"]) - self.assertItemsEqual(imd.groups, - oldgroups.union(["group6", "group8", "group9"])) - - # test adding a group that is not defined in groups.xml - imd = metadata.get_initial_metadata("client2") - oldgroups = imd.groups - metadata.merge_additional_groups(imd, ["group6", "newgroup"]) - self.assertItemsEqual(imd.groups, - oldgroups.union(["group6", "newgroup"])) - - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - def test_merge_additional_data(self): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - imd = metadata.get_initial_metadata("client1") - - # we need to use a unique attribute name for this test. this - # is probably overkill, but it works - pattern = "connector%d" - for i in range(0, 100): - connector = pattern % i - if not hasattr(imd, connector): - break - self.assertFalse(hasattr(imd, connector), - "Could not find unique connector name to test " - "merge_additional_data()") - - metadata.merge_additional_data(imd, connector, "test data") - self.assertEqual(getattr(imd, connector), "test data") - self.assertIn(connector, imd.connectors) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") - def test_validate_client_address(self, mock_resolve_client): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - self.assertTrue(metadata.validate_client_address("client1", - (None, None))) - self.assertTrue(metadata.validate_client_address("client2", - ("1.2.3.2", None))) - self.assertFalse(metadata.validate_client_address("client2", - ("1.2.3.8", None))) - self.assertTrue(metadata.validate_client_address("client4", - ("1.2.3.2", None))) - # this is upper case to ensure that case is folded properly in - # validate_client_address() - mock_resolve_client.return_value = "CLIENT4" - self.assertTrue(metadata.validate_client_address("client4", - ("1.2.3.7", None))) - mock_resolve_client.assert_called_with(("1.2.3.7", None)) - - mock_resolve_client.reset_mock() - self.assertFalse(metadata.validate_client_address("client5", - ("1.2.3.5", None))) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") - def test_AuthenticateConnection(self, mock_resolve_client, - mock_validate_client_address): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - metadata.password = "password1" - - cert = dict(subject=[[("commonName", "client1")]]) - mock_validate_client_address.return_value = False - self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, - "1.2.3.1")) - mock_validate_client_address.return_value = True - self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, - "1.2.3.1")) - # floating cert-auth clients add themselves to the cache - self.assertIn("1.2.3.1", metadata.session_cache) - self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1") - - cert = dict(subject=[[("commonName", "client7")]]) - self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, - "1.2.3.4")) - # non-floating cert-auth clients do not add themselves to the cache - self.assertNotIn("1.2.3.4", metadata.session_cache) - - cert = dict(subject=[[("commonName", "client8")]]) - - mock_resolve_client.return_value = "client5" - self.assertTrue(metadata.AuthenticateConnection(None, "root", - "password1", "1.2.3.8")) - - mock_resolve_client.side_effect = \ - Bcfg2.Server.Plugin.MetadataConsistencyError - self.assertFalse(metadata.AuthenticateConnection(None, "root", - "password1", - "1.2.3.8")) - - # secure mode, no password - self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None, - "1.2.3.2")) - - self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1', - "password1", "1.2.3.3")) - # non-root, non-cert clients populate session cache - self.assertIn("1.2.3.3", metadata.session_cache) - self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3") - - # use alternate password - self.assertTrue(metadata.AuthenticateConnection(None, 'client3', - "password2", "1.2.3.3")) - - # test secure mode - self.assertFalse(metadata.AuthenticateConnection(None, 'client9', - "password1", - "1.2.3.9")) - self.assertTrue(metadata.AuthenticateConnection(None, 'client9', - "password3", "1.2.3.9")) - - self.assertFalse(metadata.AuthenticateConnection(None, "client5", - "password2", - "1.2.3.7")) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") - def test_process_statistics(self, mock_update_client): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - md = Mock() - md.hostname = "client6" - metadata.process_statistics(md, None) - mock_update_client.assert_called_with(md.hostname, - dict(auth='cert')) - - mock_update_client.reset_mock() - md.hostname = "client5" - metadata.process_statistics(md, None) - self.assertFalse(mock_update_client.called) - - def test_viz(self): - pass - - -class TestMetadataBase(TestMetadata): - """ base test object for testing Metadata with database enabled """ - __test__ = False - use_db = True - - @skipUnless(has_django, "Django not found") - def setUp(self): - syncdb(TestMetadataDB) - - def load_clients_data(self, metadata=None, xdata=None): - if metadata is None: - metadata = get_obj() - for client in get_clients_test_tree().findall("Client"): - metadata.add_client(client.get("name")) - return metadata - - def get_nonexistent_client(self, _, prefix="newclient"): - clients = [o.hostname for o in MetadataClientModel.objects.all()] - i = 0 - client_name = "%s%s" % (prefix, i) - while client_name in clients: - i += 1 - client_name = "%s%s" % (prefix, i) - return client_name - - @patch('os.path.exists') - def test__init(self, mock_exists): - core = Mock() - core.fam = Mock() - mock_exists.return_value = False - metadata = self.get_obj(core=core, watch_clients=True) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, - "groups.xml"), - metadata) - - mock_exists.return_value = True - core.fam.reset_mock() - metadata = self.get_obj(core=core, watch_clients=True) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - def test_add_group(self): - pass - - def test_add_bundle(self): - pass - - def test_add_client(self): - metadata = self.get_obj() - hostname = self.get_nonexistent_client(metadata) - client = metadata.add_client(hostname) - self.assertIsInstance(client, MetadataClientModel) - self.assertEqual(client.hostname, hostname) - self.assertIn(hostname, metadata.clients) - self.assertIn(hostname, metadata.list_clients()) - self.assertItemsEqual(metadata.clients, - [c.hostname - for c in MetadataClientModel.objects.all()]) - - def test_update_group(self): - pass - - def test_update_bundle(self): - pass - - def test_update_client(self): - pass - - def test_list_clients(self): - metadata = self.get_obj() - self.assertItemsEqual(metadata.list_clients(), - [c.hostname - for c in MetadataClientModel.objects.all()]) - - def test_remove_group(self): - pass - - def test_remove_bundle(self): - pass - - def test_remove_client(self): - metadata = self.get_obj() - client_name = self.get_nonexistent_client(metadata) - - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.remove_client, - client_name) - - metadata.add_client(client_name) - metadata.remove_client(client_name) - self.assertNotIn(client_name, metadata.clients) - self.assertNotIn(client_name, metadata.list_clients()) - self.assertItemsEqual(metadata.clients, - [c.hostname - for c in MetadataClientModel.objects.all()]) - - def test_process_statistics(self): - pass - - -class TestMetadata_NoClientsXML(TestMetadataBase): - """ test Metadata without a clients.xml. we have to disable or - override tests that rely on client options """ - # only run these tests if it's possible to skip tests or if we - # have django. otherwise they'll all get run because our fake - # skipping decorators for python < 2.7 won't work when they - # decorate setUp() - if can_skip or has_django: - __test__ = True - - def load_groups_data(self, metadata=None, xdata=None): - if metadata is None: - metadata = self.get_obj() - if not xdata: - xdata = copy.deepcopy(get_groups_test_tree()) - for client in get_clients_test_tree().findall("Client"): - newclient = \ - lxml.etree.SubElement(xdata.getroot(), - "Client", name=client.get("name")) - lxml.etree.SubElement(newclient, "Group", - name=client.get("profile")) - metadata.groups_xml.data = xdata - metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) - evt = Mock() - evt.filename = os.path.join(datastore, "Metadata", "groups.xml") - evt.code2str = Mock(return_value="changed") - metadata.HandleEvent(evt) - return metadata - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") - def test_get_initial_metadata(self, mock_clientmetadata): - metadata = self.get_obj() - if 'clients.xml' in metadata.states: - metadata.states['clients.xml'] = False - self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError, - metadata.get_initial_metadata, None) - - self.load_groups_data(metadata=metadata) - self.load_clients_data(metadata=metadata) - - # test basic client metadata - metadata.get_initial_metadata("client1") - mock_clientmetadata.assert_called_with("client1", "group1", - set(["group1"]), set(), set(), - set(), dict(category1='group1'), - None, None, None, metadata.query) - - # test bundles, category suppression - metadata.get_initial_metadata("client2") - mock_clientmetadata.assert_called_with("client2", "group2", - set(["group2"]), - set(["bundle1", "bundle2"]), - set(), set(), - dict(category1="group2"), None, - None, None, metadata.query) - - # test new client creation - new1 = self.get_nonexistent_client(metadata) - imd = metadata.get_initial_metadata(new1) - mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]), - set(), set(), set(), - dict(category1="group1"), None, - None, None, metadata.query) - - # test nested groups, per-client groups - imd = metadata.get_initial_metadata("client8") - mock_clientmetadata.assert_called_with("client8", "group1", - set(["group1", "group8", - "group9", "group10"]), - set(), set(), set(), - dict(category1="group1"), None, - None, None, metadata.query) - - # test per-client groups, group negation, nested groups - imd = metadata.get_initial_metadata("client9") - mock_clientmetadata.assert_called_with("client9", "group2", - set(["group2", "group8", - "group11"]), - set(["bundle1", "bundle2"]), - set(), set(), - dict(category1="group2"), None, - None, None, metadata.query) - - # test exception on new client with no default profile - metadata.default = None - new2 = self.get_nonexistent_client(metadata) - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.get_initial_metadata, - new2) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") - def test_validate_client_address(self, mock_resolve_client): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - # this is upper case to ensure that case is folded properly in - # validate_client_address() - mock_resolve_client.return_value = "CLIENT4" - self.assertTrue(metadata.validate_client_address("client4", - ("1.2.3.7", None))) - mock_resolve_client.assert_called_with(("1.2.3.7", None)) - - mock_resolve_client.reset_mock() - self.assertFalse(metadata.validate_client_address("client5", - ("1.2.3.5", None))) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") - def test_AuthenticateConnection(self, mock_resolve_client, - mock_validate_client_address): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - metadata.password = "password1" - - cert = dict(subject=[[("commonName", "client1")]]) - mock_validate_client_address.return_value = False - self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, - "1.2.3.1")) - mock_validate_client_address.return_value = True - self.assertTrue(metadata.AuthenticateConnection(cert, "root", - metadata.password, - "1.2.3.1")) - - cert = dict(subject=[[("commonName", "client8")]]) - - mock_resolve_client.return_value = "client5" - self.assertTrue(metadata.AuthenticateConnection(None, "root", - "password1", "1.2.3.8")) - - mock_resolve_client.side_effect = \ - Bcfg2.Server.Plugin.MetadataConsistencyError - self.assertFalse(metadata.AuthenticateConnection(None, "root", - "password1", - "1.2.3.8")) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("socket.gethostbyaddr") - def test_resolve_client(self, mock_gethostbyaddr): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') - self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') - - metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, - 'client3') - mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3']) - self.assertEqual(metadata.resolve_client(('1.2.3.3', None), - cleanup_cache=True), 'client3') - self.assertEqual(metadata.session_cache, dict()) - - mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) - self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') - mock_gethostbyaddr.assert_called_with('1.2.3.6') - - mock_gethostbyaddr.reset_mock() - mock_gethostbyaddr.return_value = None - mock_gethostbyaddr.side_effect = socket.herror - self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError, - metadata.resolve_client, - ('1.2.3.8', None)) - mock_gethostbyaddr.assert_called_with('1.2.3.8') - - def test_handle_clients_xml_event(self): - pass - - -class TestMetadata_ClientsXML(TestMetadataBase): - """ test Metadata with a clients.xml. """ - # only run these tests if it's possible to skip tests or if we - # have django. otherwise they'll all get run because our fake - # skipping decorators for python < 2.7 won't work when they - # decorate setUp() - if can_skip or has_django: - __test__ = True - - def load_clients_data(self, metadata=None, xdata=None): - if metadata is None: - metadata = self.get_obj() - metadata.core.fam = Mock() - metadata._handle_file("clients.xml") - metadata = TestMetadata.load_clients_data(self, metadata=metadata, - xdata=xdata) - return TestMetadataBase.load_clients_data(self, metadata=metadata, - xdata=xdata) - |