import os import sys import copy import time import socket import unittest import lxml.etree from mock import Mock, patch try: from django.core.management import setup_environ has_django = True os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings" import Bcfg2.settings Bcfg2.settings.DATABASE_NAME = \ os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite") Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME except ImportError: has_django = False import Bcfg2.Server import Bcfg2.Server.Plugin from Bcfg2.Server.Plugins.Metadata import * XI_NAMESPACE = "http://www.w3.org/2001/XInclude" XI = "{%s}" % XI_NAMESPACE clients_test_tree = lxml.etree.XML(''' ''').getroottree() groups_test_tree = lxml.etree.XML(''' ''').getroottree() datastore = "/" def test_syncdb(): if not has_django: raise unittest.SkipTest("Django not found, skipping") # create the test database setup_environ(Bcfg2.settings) from django.core.management.commands import syncdb cmd = syncdb.Command() cmd.handle_noargs(interactive=False) assert os.path.exists(Bcfg2.settings.DATABASE_NAME) # ensure that we a) can connect to the database; b) start with a # clean database MetadataClientModel.objects.all().delete() assert list(MetadataClientModel.objects.all()) == [] def get_metadata_object(core=None, watch_clients=False, use_db=False): if core is None: core = Mock() core.setup.cfp.getboolean = Mock() core.setup.cfp.getboolean.return_value = use_db return Metadata(core, datastore, watch_clients=watch_clients) class TestClientVersions(unittest.TestCase): test_clients = dict(client1="1.2.0", client2="1.2.2", client3="1.3.0pre1", client4="1.1.0", client5=None, client6=None) def setUp(self): test_syncdb() for client, version in self.test_clients.items(): MetadataClientModel(hostname=client, version=version).save() def test__contains(self): v = ClientVersions() self.assertIn("client1", v) self.assertIn("client5", v) self.assertNotIn("client__contains", v) def test_keys(self): v = ClientVersions() self.assertItemsEqual(self.test_clients.keys(), v.keys()) def test__setitem(self): v = ClientVersions() # test setting version of existing client v["client1"] = "1.2.3" self.assertIn("client1", v) self.assertEqual(v['client1'], "1.2.3") client = MetadataClientModel.objects.get(hostname="client1") self.assertEqual(client.version, "1.2.3") # test adding new client new = "client__setitem" v[new] = "1.3.0" self.assertIn(new, v) self.assertEqual(v[new], "1.3.0") client = MetadataClientModel.objects.get(hostname=new) self.assertEqual(client.version, "1.3.0") # test adding new client with no version new2 = "client__setitem_2" v[new2] = None self.assertIn(new2, v) self.assertEqual(v[new2], None) client = MetadataClientModel.objects.get(hostname=new2) self.assertEqual(client.version, None) def test__getitem(self): v = ClientVersions() # test getting existing client self.assertEqual(v['client2'], "1.2.2") self.assertIsNone(v['client5']) # test exception on nonexistent client. can't use assertRaises # for this because assertRaises requires a callable try: v['clients__getitem'] assert False except KeyError: assert True except: assert False def test__len(self): v = ClientVersions() self.assertEqual(len(v), MetadataClientModel.objects.count()) def test__iter(self): v = ClientVersions() self.assertItemsEqual([h for h in iter(v)], v.keys()) def test__delitem(self): v = ClientVersions() # test adding new client new = "client__delitem" v[new] = "1.3.0" del v[new] self.assertIn(new, v) self.assertIsNone(v[new]) class TestXMLMetadataConfig(unittest.TestCase): groups_test_tree = groups_test_tree clients_test_tree = clients_test_tree def get_config_object(self, basefile="clients.xml", core=None, watch_clients=False): self.metadata = get_metadata_object(core=core, watch_clients=watch_clients) return XMLMetadataConfig(self.metadata, watch_clients, basefile) def test_xdata(self): config = self.get_config_object() # we can't use assertRaises here because xdata is a property try: config.xdata assert False except MetadataRuntimeError: assert True except: assert False config.data = "" self.assertEqual(config.xdata, "") def test_base_xdata(self): config = self.get_config_object() # we can't use assertRaises here because base_xdata is a property try: config.base_xdata assert False except MetadataRuntimeError: assert True except: assert False config.basedata = "" self.assertEqual(config.base_xdata, "") def test_add_monitor(self): core = Mock() config = self.get_config_object(core=core) fname = "test.xml" fpath = os.path.join(self.metadata.data, fname) config.extras = [] config.add_monitor(fpath, fname) self.assertFalse(core.fam.AddMonitor.called) self.assertEqual(config.extras, [fname]) config = self.get_config_object(core=core, watch_clients=True) config.add_monitor(fpath, fname) core.fam.AddMonitor.assert_called_with(fpath, config.metadata) self.assertItemsEqual(config.extras, [fname]) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.add_monitor") @patch("lxml.etree.parse") def test_load_xml(self, mock_parse, mock_add_monitor): config = self.get_config_object("clients.xml") mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None, None) config.load_xml() self.assertIsNone(config.data) self.assertIsNone(config.basedata) config.data = None config.basedata = None mock_parse.side_effect = None mock_parse.return_value.findall = Mock(return_value=[]) config.load_xml() self.assertIsNotNone(config.data) self.assertIsNotNone(config.basedata) config.data = None config.basedata = None def side_effect(*args): def second_call(*args): return [] mock_parse.return_value.findall.side_effect = second_call return [lxml.etree.Element(XI + "include", href="more.xml"), lxml.etree.Element(XI + "include", href="evenmore.xml")] mock_parse.return_value.findall = Mock(side_effect=side_effect) config.load_xml() mock_add_monitor.assert_any_call("more.xml") mock_add_monitor.assert_any_call("evenmore.xml") @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml") def test_write(self, mock_write_xml): config = self.get_config_object("clients.xml") config.basedata = "" config.write() mock_write_xml.assert_called_with(os.path.join(self.metadata.data, "clients.xml"), "") @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False)) @patch('fcntl.lockf', Mock()) @patch('__builtin__.open') @patch('os.unlink') @patch('os.rename') @patch('os.path.islink') @patch('os.readlink') def test_write_xml(self, mock_readlink, mock_islink, mock_rename, mock_unlink, mock_open): fname = "clients.xml" config = self.get_config_object(fname) fpath = os.path.join(self.metadata.data, fname) tmpfile = "%s.new" % fpath linkdest = os.path.join(self.metadata.data, "client-link.xml") mock_islink.return_value = False config.write_xml(fpath, self.clients_test_tree) mock_open.assert_called_with(tmpfile, "w") self.assertTrue(mock_open.return_value.write.called) mock_islink.assert_called_with(fpath) mock_rename.assert_called_with(tmpfile, fpath) mock_islink.return_value = True mock_readlink.return_value = linkdest config.write_xml(fpath, self.clients_test_tree) mock_rename.assert_called_with(tmpfile, linkdest) mock_rename.side_effect = OSError self.assertRaises(MetadataRuntimeError, config.write_xml, fpath, self.clients_test_tree) mock_open.return_value.write.side_effect = IOError self.assertRaises(MetadataRuntimeError, config.write_xml, fpath, self.clients_test_tree) mock_unlink.assert_called_with(tmpfile) mock_open.side_effect = IOError self.assertRaises(MetadataRuntimeError, config.write_xml, fpath, self.clients_test_tree) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch('lxml.etree.parse') def test_find_xml_for_xpath(self, mock_parse): config = self.get_config_object("groups.xml") config.basedata = self.groups_test_tree xpath = "//Group[@name='group1']" self.assertItemsEqual(config.find_xml_for_xpath(xpath), dict(filename=os.path.join(self.metadata.data, "groups.xml"), xmltree=self.groups_test_tree, xquery=self.groups_test_tree.xpath(xpath))) self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict()) config.extras = ["foo.xml", "bar.xml", "clients.xml"] def parse_side_effect(fname, parser=Bcfg2.Server.XMLParser): if fname == os.path.join(self.metadata.data, "clients.xml"): return self.clients_test_tree else: return lxml.etree.XML("").getroottree() mock_parse.side_effect = parse_side_effect xpath = "//Client[@secure='true']" self.assertItemsEqual(config.find_xml_for_xpath(xpath), dict(filename=os.path.join(self.metadata.data, "clients.xml"), xmltree=self.clients_test_tree, xquery=self.clients_test_tree.xpath(xpath))) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") def test_HandleEvent(self, mock_load_xml): config = self.get_config_object("groups.xml") evt = Mock() evt.filename = os.path.join(self.metadata.data, "groups.xml") evt.code2str = Mock(return_value="changed") self.assertTrue(config.HandleEvent(evt)) mock_load_xml.assert_called_with() class TestClientMetadata(unittest.TestCase): def test_inGroup(self): cm = ClientMetadata("client1", "group1", ["group1", "group2"], ["bundle1"], [], [], [], None, None, None, None) self.assertTrue(cm.inGroup("group1")) self.assertFalse(cm.inGroup("group3")) class TestMetadata(unittest.TestCase): groups_test_tree = groups_test_tree clients_test_tree = clients_test_tree use_db = False def get_metadata_object(self, core=None, watch_clients=False): return get_metadata_object(core=core, watch_clients=watch_clients, use_db=self.use_db) def get_nonexistent_client(self, metadata, prefix="client"): if metadata is None: metadata = self.load_clients_data() i = 0 client_name = "%s%s" % (prefix, i) while client_name in metadata.clients: i += 1 client_name = "%s%s" % (prefix, i) return client_name def test__init(self): # test with watch_clients=False core = Mock() metadata = self.get_metadata_object(core=core) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics) self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig) self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig) self.assertIsInstance(metadata.query, MetadataQuery) self.assertEqual(metadata.states, dict()) # test with watch_clients=True core.fam = Mock() metadata = self.get_metadata_object(core=core, watch_clients=True) self.assertEqual(len(metadata.states), 2) core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, "groups.xml"), metadata) core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, "clients.xml"), metadata) core.fam.reset_mock() core.fam.AddMonitor = Mock(side_effect=IOError) self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, self.get_metadata_object, core=core, watch_clients=True) @patch('os.makedirs', Mock()) @patch('__builtin__.open') def test_init_repo(self, mock_open): Metadata.init_repo(datastore, groups_xml="groups", clients_xml="clients") mock_open.assert_any_call(os.path.join(datastore, "Metadata", "groups.xml"), "w") mock_open.assert_any_call(os.path.join(datastore, "Metadata", "clients.xml"), "w") def test_search_xdata(self): # test finding a node with the proper name metadata = self.get_metadata_object() tree = self.groups_test_tree res = metadata._search_xdata("Group", "group1", tree) self.assertIsInstance(res, lxml.etree._Element) self.assertEqual(res.get("name"), "group1") # test finding a node with the wrong name but correct alias metadata = self.get_metadata_object() tree = self.clients_test_tree res = metadata._search_xdata("Client", "alias3", tree, alias=True) self.assertIsInstance(res, lxml.etree._Element) self.assertNotEqual(res.get("name"), "alias3") # test failure finding a node metadata = self.get_metadata_object() tree = self.clients_test_tree res = metadata._search_xdata("Client", self.get_nonexistent_client(metadata), tree, alias=True) self.assertIsNone(res) def search_xdata(self, tag, name, tree, alias=False): metadata = self.get_metadata_object() res = metadata._search_xdata(tag, name, tree, alias=alias) self.assertIsInstance(res, lxml.etree._Element) if not alias: self.assertEqual(res.get("name"), name) def test_search_group(self): # test finding a group with the proper name tree = self.groups_test_tree self.search_xdata("Group", "group1", tree) def test_search_bundle(self): # test finding a bundle with the proper name tree = self.groups_test_tree self.search_xdata("Bundle", "bundle1", tree) def test_search_client(self): # test finding a client with the proper name tree = self.clients_test_tree self.search_xdata("Client", "client1", tree, alias=True) self.search_xdata("Client", "alias1", tree, alias=True) def test_add_group(self): metadata = self.get_metadata_object() metadata.groups_xml.write = Mock() metadata.groups_xml.data = lxml.etree.XML('').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.add_group("test1", dict()) metadata.groups_xml.write.assert_any_call() grp = metadata.search_group("test1", metadata.groups_xml.base_xdata) self.assertIsNotNone(grp) self.assertEqual(grp.attrib, dict(name='test1')) # have to call this explicitly -- usually load_xml does this # on FAM events metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.add_group("test2", dict(foo='bar')) metadata.groups_xml.write.assert_any_call() grp = metadata.search_group("test2", metadata.groups_xml.base_xdata) self.assertIsNotNone(grp) self.assertEqual(grp.attrib, dict(name='test2', foo='bar')) # have to call this explicitly -- usually load_xml does this # on FAM events metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.groups_xml.write.reset_mock() self.assertRaises(MetadataConsistencyError, metadata.add_group, "test1", dict()) self.assertFalse(metadata.groups_xml.write.called) def test_update_group(self): metadata = self.get_metadata_object() metadata.groups_xml.write_xml = Mock() metadata.groups_xml.data = copy.deepcopy(self.groups_test_tree) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.update_group("group1", dict(foo="bar")) grp = metadata.search_group("group1", metadata.groups_xml.base_xdata) self.assertIsNotNone(grp) self.assertIn("foo", grp.attrib) self.assertEqual(grp.get("foo"), "bar") self.assertTrue(metadata.groups_xml.write_xml.called) self.assertRaises(MetadataConsistencyError, metadata.update_group, "bogus_group", dict()) def test_remove_group(self): metadata = self.get_metadata_object() metadata.groups_xml.write_xml = Mock() metadata.groups_xml.data = copy.deepcopy(self.groups_test_tree) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.remove_group("group5") grp = metadata.search_group("group5", metadata.groups_xml.base_xdata) self.assertIsNone(grp) self.assertTrue(metadata.groups_xml.write_xml.called) self.assertRaises(MetadataConsistencyError, metadata.remove_group, "bogus_group") def test_add_bundle(self): metadata = self.get_metadata_object() metadata.groups_xml.write = Mock() metadata.groups_xml.data = lxml.etree.XML('').getroottree() metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.add_bundle("bundle1") metadata.groups_xml.write.assert_any_call() bundle = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata) self.assertIsNotNone(bundle) self.assertEqual(bundle.attrib, dict(name='bundle1')) # have to call this explicitly -- usually load_xml does this # on FAM events metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.groups_xml.write.reset_mock() self.assertRaises(MetadataConsistencyError, metadata.add_bundle, "bundle1") self.assertFalse(metadata.groups_xml.write.called) def test_remove_bundle(self): metadata = self.get_metadata_object() metadata.groups_xml.write_xml = Mock() metadata.groups_xml.data = copy.deepcopy(self.groups_test_tree) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) metadata.remove_bundle("bundle1") grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata) self.assertIsNone(grp) self.assertTrue(metadata.groups_xml.write_xml.called) self.assertRaises(MetadataConsistencyError, metadata.remove_bundle, "bogus_bundle") def test_add_client(self): metadata = self.get_metadata_object() metadata.clients_xml.write = Mock() metadata.clients_xml.data = lxml.etree.XML('').getroottree() metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) new1 = self.get_nonexistent_client(metadata) metadata.add_client(new1, dict()) metadata.clients_xml.write.assert_any_call() grp = metadata.search_client(new1, metadata.clients_xml.base_xdata) self.assertIsNotNone(grp) self.assertEqual(grp.attrib, dict(name=new1)) # have to call this explicitly -- usually load_xml does this # on FAM events metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) metadata._handle_clients_xml_event(Mock()) new2 = self.get_nonexistent_client(metadata) metadata.add_client(new2, dict(foo='bar')) metadata.clients_xml.write.assert_any_call() grp = metadata.search_client(new2, metadata.clients_xml.base_xdata) self.assertIsNotNone(grp) self.assertEqual(grp.attrib, dict(name=new2, foo='bar')) # have to call this explicitly -- usually load_xml does this # on FAM events metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) metadata.clients_xml.write.reset_mock() self.assertRaises(MetadataConsistencyError, metadata.add_client, new1, dict()) self.assertFalse(metadata.clients_xml.write.called) def test_update_client(self): metadata = self.get_metadata_object() metadata.clients_xml.write_xml = Mock() metadata.clients_xml.data = copy.deepcopy(self.clients_test_tree) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) metadata.update_client("client1", dict(foo="bar")) grp = metadata.search_client("client1", metadata.clients_xml.base_xdata) self.assertIsNotNone(grp) self.assertIn("foo", grp.attrib) self.assertEqual(grp.get("foo"), "bar") self.assertTrue(metadata.clients_xml.write_xml.called) new = self.get_nonexistent_client(metadata) self.assertRaises(MetadataConsistencyError, metadata.update_client, new, dict()) def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_metadata_object() metadata.clients_xml.data = xdata or copy.deepcopy(self.clients_test_tree) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) evt = Mock() evt.filename = os.path.join(datastore, "Metadata", "clients.xml") evt.code2str = Mock(return_value="changed") metadata.HandleEvent(evt) return metadata @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") def test_clients_xml_event(self, mock_load_xml): metadata = self.get_metadata_object() metadata.profiles = ["group1", "group2"] self.load_clients_data(metadata=metadata) mock_load_xml.assert_any_call() self.assertItemsEqual(metadata.clients, dict([(c.get("name"), c.get("profile")) for c in self.clients_test_tree.findall("//Client")])) aliases = dict([(a.get("name"), a.getparent().get("name")) for a in self.clients_test_tree.findall("//Alias")]) self.assertItemsEqual(metadata.aliases, aliases) raliases = dict([(c.get("name"), set()) for c in self.clients_test_tree.findall("//Client")]) for alias in self.clients_test_tree.findall("//Alias"): raliases[alias.getparent().get("name")].add(alias.get("name")) self.assertItemsEqual(metadata.raliases, raliases) self.assertEqual(metadata.secure, [c.get("name") for c in self.clients_test_tree.findall("//Client[@secure='true']")]) self.assertEqual(metadata.floating, ["client1", "client10"]) addresses = dict([(c.get("address"), []) for c in self.clients_test_tree.findall("//*[@address]")]) raddresses = dict() for client in self.clients_test_tree.findall("//Client[@address]"): addresses[client.get("address")].append(client.get("name")) try: raddresses[client.get("name")].append(client.get("address")) except KeyError: raddresses[client.get("name")] = [client.get("address")] for alias in self.clients_test_tree.findall("//Alias[@address]"): addresses[alias.get("address")].append(alias.getparent().get("name")) try: raddresses[alias.getparent().get("name")].append(alias.get("address")) except KeyError: raddresses[alias.getparent().get("name")] = alias.get("address") self.assertItemsEqual(metadata.addresses, addresses) self.assertItemsEqual(metadata.raddresses, raddresses) self.assertTrue(metadata.states['clients.xml']) def load_groups_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_metadata_object() metadata.groups_xml.data = xdata or copy.deepcopy(self.groups_test_tree) metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) evt = Mock() evt.filename = os.path.join(datastore, "Metadata", "groups.xml") evt.code2str = Mock(return_value="changed") metadata.HandleEvent(evt) return metadata @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") def test_groups_xml_event(self, mock_load_xml): dup_data = copy.deepcopy(self.groups_test_tree) lxml.etree.SubElement(dup_data.getroot(), "Group", name="group1") metadata = self.load_groups_data(xdata=dup_data) mock_load_xml.assert_any_call() self.assertTrue(metadata.states['groups.xml']) self.assertTrue(metadata.groups['group1'].is_public) self.assertTrue(metadata.groups['group2'].is_public) self.assertFalse(metadata.groups['group3'].is_public) self.assertFalse(metadata.groups['group1'].is_private) self.assertFalse(metadata.groups['group2'].is_private) self.assertTrue(metadata.groups['group3'].is_private) self.assertTrue(metadata.groups['group1'].is_profile) self.assertTrue(metadata.groups['group2'].is_profile) self.assertFalse(metadata.groups['group3'].is_profile) self.assertItemsEqual(metadata.groups.keys(), set(g.get("name") for g in self.groups_test_tree.findall("//Group"))) self.assertEqual(metadata.groups['group1'].category, 'category1') self.assertEqual(metadata.groups['group2'].category, 'category1') self.assertEqual(metadata.groups['group3'].category, 'category2') self.assertEqual(metadata.groups['group4'].category, 'category1') self.assertEqual(metadata.default, "group1") all_groups = [] negated_groups = [] for group in dup_data.xpath("//Groups/Client//*") + \ dup_data.xpath("//Groups/Group//*"): if group.tag == 'Group' and not group.getchildren(): if group.get("negate", "false").lower() == 'true': negated_groups.append(group.get("name")) else: all_groups.append(group.get("name")) self.assertItemsEqual([g.name for g in metadata.group_membership.values()], all_groups) self.assertItemsEqual([g.name for g in metadata.negated_groups.values()], negated_groups) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_set_profile(self): metadata = self.get_metadata_object() if 'clients.xml' in metadata.states: metadata.states['clients.xml'] = False self.assertRaises(MetadataRuntimeError, metadata.set_profile, None, None, None) self.load_groups_data(metadata=metadata) self.load_clients_data(metadata=metadata) self.assertRaises(MetadataConsistencyError, metadata.set_profile, "client1", "group5", None) self.assertRaises(MetadataConsistencyError, metadata.set_profile, "client1", "group3", None) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_set_profile_db(self): metadata = self.load_clients_data(metadata=self.load_groups_data()) if metadata._use_db: profile = "group1" client_name = self.get_nonexistent_client(metadata) metadata.set_profile(client_name, profile, None) self.assertIn(client_name, metadata.clients) self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, metadata.set_profile, client_name, profile, None) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client") @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") def test_set_profile_xml(self, mock_update_client, mock_add_client): metadata = self.load_clients_data(metadata=self.load_groups_data()) if not metadata._use_db: metadata.clients_xml.write = Mock() metadata.set_profile("client1", "group2", None) mock_update_client.assert_called_with("client1", dict(profile="group2")) metadata.clients_xml.write.assert_any_call() self.assertEqual(metadata.clientgroups["client1"], ["group2"]) metadata.clients_xml.write.reset_mock() new1 = self.get_nonexistent_client(metadata) metadata.set_profile(new1, "group1", None) mock_add_client.assert_called_with(new1, dict(profile="group1")) metadata.clients_xml.write.assert_any_call() self.assertEqual(metadata.clientgroups[new1], ["group1"]) metadata.clients_xml.write.reset_mock() new2 = self.get_nonexistent_client(metadata) metadata.session_cache[('1.2.3.6', None)] = (None, new2) metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None)) mock_add_client.assert_called_with(new2, dict(uuid='uuid_new', profile="group1", address='1.2.3.6')) metadata.clients_xml.write.assert_any_call() self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"]) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("socket.gethostbyaddr") def test_resolve_client(self, mock_gethostbyaddr): metadata = self.load_clients_data(metadata=self.load_groups_data()) metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') self.assertRaises(MetadataConsistencyError, metadata.resolve_client, ('1.2.3.2', None)) self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1') metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, 'client3') self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') self.assertEqual(metadata.resolve_client(('1.2.3.3', None), cleanup_cache=True), 'client3') self.assertEqual(metadata.session_cache, dict()) mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') mock_gethostbyaddr.assert_called_with('1.2.3.6') mock_gethostbyaddr.reset_mock() mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7']) self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4') mock_gethostbyaddr.assert_called_with('1.2.3.7') mock_gethostbyaddr.reset_mock() mock_gethostbyaddr.return_value = None mock_gethostbyaddr.side_effect = socket.herror self.assertRaises(MetadataConsistencyError, metadata.resolve_client, ('1.2.3.8', None)) mock_gethostbyaddr.assert_called_with('1.2.3.8') @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") def test_get_initial_metadata(self, mock_clientmetadata): metadata = self.get_metadata_object() if 'clients.xml' in metadata.states: metadata.states['clients.xml'] = False self.assertRaises(MetadataRuntimeError, metadata.get_initial_metadata, None) self.load_groups_data(metadata=metadata) self.load_clients_data(metadata=metadata) # test address, password metadata.get_initial_metadata("client1") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client1", "group1", set(["group1"]), set(), set(), set(["1.2.3.1"]), dict(category1='group1'), None, 'password2')) # test address, bundles, category suppression metadata.get_initial_metadata("client2") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client2", "group2", set(["group2"]), set(["bundle1", "bundle2"]), set(), set(["1.2.3.2"]), dict(category1="group2"), None, None)) # test aliases, address, uuid, password imd = metadata.get_initial_metadata("alias1") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client3", "group1", set(["group1"]), set(), set(['alias1']), set(["1.2.3.3"]), dict(category1="group1"), 'uuid1', 'password2')) # test new client creation new1 = self.get_nonexistent_client(metadata) imd = metadata.get_initial_metadata(new1) self.assertEqual(mock_clientmetadata.call_args[0][:9], (new1, "group1", set(["group1"]), set(), set(), set(), dict(category1="group1"), None, None)) # test nested groups, address, per-client groups imd = metadata.get_initial_metadata("client8") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client8", "group1", set(["group1", "group8", "group9", "group10"]), set(), set(), set(["1.2.3.5"]), dict(category1="group1"), None, None)) # test setting per-client groups, group negation, nested groups imd = metadata.get_initial_metadata("client9") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client9", "group2", set(["group2", "group8", "group11"]), set(["bundle1", "bundle2"]), set(), set(), dict(category1="group2"), None, "password3")) # test new client with no default profile metadata.default = None new2 = self.get_nonexistent_client(metadata) self.assertRaises(MetadataConsistencyError, metadata.get_initial_metadata, new2) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_merge_groups(self): metadata = self.get_metadata_object() self.load_groups_data(metadata=metadata) self.load_clients_data(metadata=metadata) imd = metadata.get_initial_metadata("client1") self.assertEqual(metadata._merge_groups(imd, imd.groups, categories=imd.categories), (imd.groups, imd.categories)) imd = metadata.get_initial_metadata("client8") self.assertEqual(metadata._merge_groups(imd, imd.groups, categories=imd.categories), (imd.groups.union(['group10']), imd.categories)) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_get_all_group_names(self): metadata = self.load_groups_data() self.assertItemsEqual(metadata.get_all_group_names(), set([g.get("name") for g in self.groups_test_tree.findall("//Group")])) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_get_all_groups_in_category(self): metadata = self.load_groups_data() self.assertItemsEqual(metadata.get_all_groups_in_category("category1"), set([g.get("name") for g in self.groups_test_tree.findall("//Group[@category='category1']")])) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_get_client_names_by_profiles(self): metadata = self.load_clients_data(metadata=self.load_groups_data()) self.assertItemsEqual(metadata.get_client_names_by_profiles(["group2"]), [c.get("name") for c in self.clients_test_tree.findall("//Client[@profile='group2']")]) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_get_client_names_by_groups(self): metadata = self.load_clients_data(metadata=self.load_groups_data()) # this is not the best test in the world, since we mock # core.build_metadata to just build _initial_ metadata, which # is not at all the same thing. it turns out that mocking # this out without starting a Bcfg2 server is pretty # non-trivial, so this works-ish metadata.core.build_metadata = Mock() metadata.core.build_metadata.side_effect = \ lambda c: metadata.get_initial_metadata(c) self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]), [c.get("name") for c in self.clients_test_tree.findall("//Client[@profile='group2']")]) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_merge_additional_groups(self): metadata = self.load_clients_data(metadata=self.load_groups_data()) imd = metadata.get_initial_metadata("client2") # test adding a group excluded by categories oldgroups = imd.groups metadata.merge_additional_groups(imd, ["group4"]) self.assertEqual(imd.groups, oldgroups) # test adding a private group oldgroups = imd.groups metadata.merge_additional_groups(imd, ["group3"]) self.assertEqual(imd.groups, oldgroups) # test adding groups with bundles oldgroups = imd.groups oldbundles = imd.bundles metadata.merge_additional_groups(imd, ["group7"]) self.assertEqual(imd.groups, oldgroups.union(["group7"])) self.assertEqual(imd.bundles, oldbundles.union(["bundle3"])) # test adding multiple groups imd = metadata.get_initial_metadata("client2") oldgroups = imd.groups metadata.merge_additional_groups(imd, ["group6", "group8"]) self.assertItemsEqual(imd.groups, oldgroups.union(["group6", "group8", "group9"])) # test adding a group that is not defined in groups.xml imd = metadata.get_initial_metadata("client2") oldgroups = imd.groups metadata.merge_additional_groups(imd, ["group6", "newgroup"]) self.assertItemsEqual(imd.groups, oldgroups.union(["group6", "newgroup"])) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) def test_merge_additional_data(self): metadata = self.load_clients_data(metadata=self.load_groups_data()) imd = metadata.get_initial_metadata("client1") # we need to use a unique attribute name for this test. this # is probably overkill, but it works pattern = "connector%d" for i in range(0, 100): connector = pattern % i if not hasattr(imd, connector): break self.assertFalse(hasattr(imd, connector), "Could not find unique connector name to test " "merge_additional_data()") metadata.merge_additional_data(imd, connector, "test data") self.assertEqual(getattr(imd, connector), "test data") self.assertIn(connector, imd.connectors) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") def test_validate_client_address(self, mock_resolve_client): metadata = self.load_clients_data(metadata=self.load_groups_data()) self.assertTrue(metadata.validate_client_address("client1", (None, None))) self.assertTrue(metadata.validate_client_address("client2", ("1.2.3.2", None))) self.assertFalse(metadata.validate_client_address("client2", ("1.2.3.8", None))) self.assertTrue(metadata.validate_client_address("client4", ("1.2.3.2", None))) # this is upper case to ensure that case is folded properly in # validate_client_address() mock_resolve_client.return_value = "CLIENT4" self.assertTrue(metadata.validate_client_address("client4", ("1.2.3.7", None))) mock_resolve_client.assert_called_with(("1.2.3.7", None)) mock_resolve_client.reset_mock() self.assertFalse(metadata.validate_client_address("client5", ("1.2.3.5", None))) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") def test_AuthenticateConnection(self, mock_resolve_client, mock_validate_client_address): metadata = self.load_clients_data(metadata=self.load_groups_data()) metadata.password = "password1" cert = dict(subject=[[("commonName", "client1")]]) mock_validate_client_address.return_value = False self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, "1.2.3.1")) mock_validate_client_address.return_value = True self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, "1.2.3.1")) # floating cert-auth clients add themselves to the cache self.assertIn("1.2.3.1", metadata.session_cache) self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1") cert = dict(subject=[[("commonName", "client7")]]) self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, "1.2.3.4")) # non-floating cert-auth clients do not add themselves to the cache self.assertNotIn("1.2.3.4", metadata.session_cache) cert = dict(subject=[[("commonName", "client8")]]) mock_resolve_client.return_value = "client5" self.assertTrue(metadata.AuthenticateConnection(None, "root", "password1", "1.2.3.8")) mock_resolve_client.side_effect = MetadataConsistencyError self.assertFalse(metadata.AuthenticateConnection(None, "root", "password1", "1.2.3.8")) # secure mode, no password self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None, "1.2.3.2")) self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1', "password1", "1.2.3.3")) # non-root, non-cert clients populate session cache self.assertIn("1.2.3.3", metadata.session_cache) self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3") # use alternate password self.assertTrue(metadata.AuthenticateConnection(None, 'client3', "password2", "1.2.3.3")) # test secure mode self.assertFalse(metadata.AuthenticateConnection(None, 'client9', "password1", "1.2.3.9")) self.assertTrue(metadata.AuthenticateConnection(None, 'client9', "password3", "1.2.3.9")) self.assertFalse(metadata.AuthenticateConnection(None, "client5", "password2", "1.2.3.7")) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") def test_process_statistics(self, mock_update_client): metadata = self.load_clients_data(metadata=self.load_groups_data()) md = Mock() md.hostname = "client6" metadata.process_statistics(md, None) mock_update_client.assert_called_with(md.hostname, dict(auth='cert')) mock_update_client.reset_mock() md.hostname = "client5" metadata.process_statistics(md, None) self.assertFalse(mock_update_client.called) def test_viz(self): pass class TestMetadataBase(TestMetadata): """ base test object for testing Metadata with database enabled """ __test__ = False use_db = True def __init__(self, *args, **kwargs): TestMetadata.__init__(self, *args, **kwargs) test_syncdb() def setUp(self): if not has_django: self.skipTest("Django not found, skipping") def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = get_metadata_object() for client in clients_test_tree.findall("Client"): metadata.add_client(client.get("name")) return metadata def get_nonexistent_client(self, _, prefix="client"): clients = [o.hostname for o in MetadataClientModel.objects.all()] i = 0 client_name = "%s%s" % (prefix, i) while client_name in clients: i += 1 client_name = "%s%s" % (prefix, i) return client_name @patch('os.path.exists') def test__init(self, mock_exists): core = Mock() core.fam = Mock() mock_exists.return_value = False metadata = self.get_metadata_object(core=core, watch_clients=True) self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, "groups.xml"), metadata) mock_exists.return_value = True core.fam.reset_mock() metadata = self.get_metadata_object(core=core, watch_clients=True) core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, "groups.xml"), metadata) core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, "clients.xml"), metadata) def test_add_group(self): pass def test_add_bundle(self): pass def test_add_client(self): metadata = self.get_metadata_object() hostname = self.get_nonexistent_client(metadata) client = metadata.add_client(hostname) self.assertIsInstance(client, MetadataClientModel) self.assertEqual(client.hostname, hostname) self.assertIn(hostname, metadata.clients) self.assertIn(hostname, metadata.list_clients()) self.assertItemsEqual(metadata.clients, [c.hostname for c in MetadataClientModel.objects.all()]) def test_update_group(self): pass def test_update_bundle(self): pass def test_update_client(self): pass def test_list_clients(self): metadata = self.get_metadata_object() self.assertItemsEqual(metadata.list_clients(), [c.hostname for c in MetadataClientModel.objects.all()]) def test_remove_group(self): pass def test_remove_bundle(self): pass def test_remove_client(self): metadata = self.get_metadata_object() client_name = self.get_nonexistent_client(metadata) self.assertRaises(MetadataConsistencyError, metadata.remove_client, client_name) metadata.add_client(client_name) metadata.remove_client(client_name) self.assertNotIn(client_name, metadata.clients) self.assertNotIn(client_name, metadata.list_clients()) self.assertItemsEqual(metadata.clients, [c.hostname for c in MetadataClientModel.objects.all()]) def test_process_statistics(self): pass class TestMetadata_NoClientsXML(TestMetadataBase): """ test Metadata without a clients.xml. we have to disable or override tests that rely on client options """ __test__ = True def __init__(self, *args, **kwargs): TestMetadata.__init__(self, *args, **kwargs) for client in self.clients_test_tree.findall("Client"): newclient = lxml.etree.SubElement(self.groups_test_tree.getroot(), "Client", name=client.get("name")) lxml.etree.SubElement(newclient, "Group", name=client.get("profile")) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") def test_get_initial_metadata(self, mock_clientmetadata): metadata = self.get_metadata_object() if 'clients.xml' in metadata.states: metadata.states['clients.xml'] = False self.assertRaises(MetadataRuntimeError, metadata.get_initial_metadata, None) self.load_groups_data(metadata=metadata) self.load_clients_data(metadata=metadata) # test basic client metadata metadata.get_initial_metadata("client1") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client1", "group1", set(["group1"]), set(), set(), set(), dict(category1='group1'), None, None)) # test bundles, category suppression metadata.get_initial_metadata("client2") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client2", "group2", set(["group2"]), set(["bundle1", "bundle2"]), set(), set(), dict(category1="group2"), None, None)) # test new client creation new1 = self.get_nonexistent_client(metadata) imd = metadata.get_initial_metadata(new1) self.assertEqual(mock_clientmetadata.call_args[0][:9], (new1, "group1", set(["group1"]), set(), set(), set(), dict(category1="group1"), None, None)) # test nested groups, per-client groups imd = metadata.get_initial_metadata("client8") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client8", "group1", set(["group1", "group8", "group9", "group10"]), set(), set(), set(), dict(category1="group1"), None, None)) # test per-client groups, group negation, nested groups imd = metadata.get_initial_metadata("client9") self.assertEqual(mock_clientmetadata.call_args[0][:9], ("client9", "group2", set(["group2", "group8", "group11"]), set(["bundle1", "bundle2"]), set(), set(), dict(category1="group2"), None, None)) # test exception on new client with no default profile metadata.default = None new2 = self.get_nonexistent_client(metadata) self.assertRaises(MetadataConsistencyError, metadata.get_initial_metadata, new2) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") def test_validate_client_address(self, mock_resolve_client): metadata = self.load_clients_data(metadata=self.load_groups_data()) # this is upper case to ensure that case is folded properly in # validate_client_address() mock_resolve_client.return_value = "CLIENT4" self.assertTrue(metadata.validate_client_address("client4", ("1.2.3.7", None))) mock_resolve_client.assert_called_with(("1.2.3.7", None)) mock_resolve_client.reset_mock() self.assertFalse(metadata.validate_client_address("client5", ("1.2.3.5", None))) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") def test_AuthenticateConnection(self, mock_resolve_client, mock_validate_client_address): metadata = self.load_clients_data(metadata=self.load_groups_data()) metadata.password = "password1" cert = dict(subject=[[("commonName", "client1")]]) mock_validate_client_address.return_value = False self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, "1.2.3.1")) mock_validate_client_address.return_value = True self.assertTrue(metadata.AuthenticateConnection(cert, "root", metadata.password, "1.2.3.1")) cert = dict(subject=[[("commonName", "client8")]]) mock_resolve_client.return_value = "client5" self.assertTrue(metadata.AuthenticateConnection(None, "root", "password1", "1.2.3.8")) mock_resolve_client.side_effect = MetadataConsistencyError self.assertFalse(metadata.AuthenticateConnection(None, "root", "password1", "1.2.3.8")) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) @patch("socket.gethostbyaddr") def test_resolve_client(self, mock_gethostbyaddr): metadata = self.load_clients_data(metadata=self.load_groups_data()) metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, 'client3') mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3']) self.assertEqual(metadata.resolve_client(('1.2.3.3', None), cleanup_cache=True), 'client3') self.assertEqual(metadata.session_cache, dict()) mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') mock_gethostbyaddr.assert_called_with('1.2.3.6') mock_gethostbyaddr.reset_mock() mock_gethostbyaddr.return_value = None mock_gethostbyaddr.side_effect = socket.herror self.assertRaises(MetadataConsistencyError, metadata.resolve_client, ('1.2.3.8', None)) mock_gethostbyaddr.assert_called_with('1.2.3.8') def test_clients_xml_event(self): pass class TestMetadata_ClientsXML(TestMetadataBase): """ test Metadata with a clients.xml. """ __test__ = True def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_metadata_object() metadata.core.fam = Mock() metadata._handle_file("clients.xml") metadata = TestMetadata.load_clients_data(self, metadata=metadata, xdata=xdata) return TestMetadataBase.load_clients_data(self, metadata=metadata, xdata=xdata) @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") @patch("Bcfg2.Server.Plugins.Metadata.Metadata._handle_clients_xml_event") @patch("Bcfg2.Server.Plugins.Metadata.Metadata.list_clients") def test_clients_xml_event(self, mock_list_clients, mock_handle_event, mock_load_xml): metadata = self.get_metadata_object() metadata.profiles = ["group1", "group2"] evt = Mock() evt.filename = os.path.join(datastore, "Metadata", "clients.xml") evt.code2str = Mock(return_value="changed") metadata.HandleEvent(evt) self.assertFalse(mock_handle_event.called) self.assertFalse(mock_load_xml.called) mock_load_xml.reset_mock() mock_handle_event.reset_mock() mock_list_clients.reset_mock() metadata._handle_file("clients.xml") metadata.HandleEvent(evt) mock_handle_event.assert_called_with(evt) mock_list_clients.assert_any_call() mock_load_xml.assert_any_call()