diff options
Diffstat (limited to 'testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py')
-rw-r--r-- | testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py | 407 |
1 files changed, 407 insertions, 0 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py new file mode 100644 index 000000000..99cbf1962 --- /dev/null +++ b/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py @@ -0,0 +1,407 @@ +import os +import sys +import unittest +import lxml.etree +from mock import Mock, patch +from django.core.management import setup_environ + +os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings" + +import Bcfg2.settings +Bcfg2.settings.DATABASE_NAME = \ + os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite") +Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME + +import Bcfg2.Server.Plugin +from Bcfg2.Server.Plugins.DBMetadata import * + +from TestMetadata import datastore, groups_test_tree, clients_test_tree, \ + TestMetadata + +def test_syncdb(): + # create the test database + setup_environ(Bcfg2.settings) + from django.core.management.commands import syncdb + cmd = syncdb.Command() + cmd.handle_noargs(interactive=False) + assert os.path.exists(Bcfg2.settings.DATABASE_NAME) + + # ensure that we a) can connect to the database; b) start with a + # clean database + MetadataClientModel.objects.all().delete() + assert list(MetadataClientModel.objects.all()) == [] + + +class TestClientVersions(unittest.TestCase): + test_clients = dict(client1="1.2.0", + client2="1.2.2", + client3="1.3.0pre1", + client4="1.1.0", + client5=None, + client6=None) + + def setUp(self): + test_syncdb() + for client, version in self.test_clients.items(): + MetadataClientModel(hostname=client, version=version).save() + + def test__contains(self): + v = ClientVersions() + self.assertIn("client1", v) + self.assertIn("client5", v) + self.assertNotIn("client__contains", v) + + def test_keys(self): + v = ClientVersions() + self.assertItemsEqual(self.test_clients.keys(), v.keys()) + + def test__setitem(self): + v = ClientVersions() + + # test setting version of existing client + v["client1"] = "1.2.3" + self.assertIn("client1", v) + self.assertEqual(v['client1'], "1.2.3") + client = MetadataClientModel.objects.get(hostname="client1") + self.assertEqual(client.version, "1.2.3") + + # test adding new client + new = "client__setitem" + v[new] = "1.3.0" + self.assertIn(new, v) + self.assertEqual(v[new], "1.3.0") + client = MetadataClientModel.objects.get(hostname=new) + self.assertEqual(client.version, "1.3.0") + + # test adding new client with no version + new2 = "client__setitem_2" + v[new2] = None + self.assertIn(new2, v) + self.assertEqual(v[new2], None) + client = MetadataClientModel.objects.get(hostname=new2) + self.assertEqual(client.version, None) + + def test__getitem(self): + v = ClientVersions() + + # test getting existing client + self.assertEqual(v['client2'], "1.2.2") + self.assertIsNone(v['client5']) + + # test exception on nonexistent client. can't use assertRaises + # for this because assertRaises requires a callable + try: + v['clients__getitem'] + assert False + except KeyError: + assert True + except: + assert False + + +class TestDBMetadataBase(TestMetadata): + __test__ = False + + def __init__(self, *args, **kwargs): + TestMetadata.__init__(self, *args, **kwargs) + test_syncdb() + + def load_clients_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = get_metadata_object() + for client in clients_test_tree.findall("Client"): + metadata.add_client(client.get("name")) + return metadata + + def get_metadata_object(self, core=None, watch_clients=False): + if core is None: + core = Mock() + metadata = DBMetadata(core, datastore, watch_clients=watch_clients) + return metadata + + def get_nonexistent_client(self, _, prefix="client"): + clients = [o.hostname for o in MetadataClientModel.objects.all()] + i = 0 + client_name = "%s%s" % (prefix, i) + while client_name in clients: + i += 1 + client_name = "%s%s" % (prefix, i) + return client_name + + @patch('os.path.exists') + def test__init(self, mock_exists): + core = Mock() + core.fam = Mock() + mock_exists.return_value = False + metadata = self.get_metadata_object(core=core, watch_clients=True) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) + core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, + "groups.xml"), + metadata) + + mock_exists.return_value = True + core.fam.reset_mock() + metadata = self.get_metadata_object(core=core, watch_clients=True) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "groups.xml"), + metadata) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "clients.xml"), + metadata) + + def test_add_group(self): + pass + + def test_add_bundle(self): + pass + + def test_add_client(self): + metadata = self.get_metadata_object() + hostname = self.get_nonexistent_client(metadata) + client = metadata.add_client(hostname) + self.assertIsInstance(client, MetadataClientModel) + self.assertEqual(client.hostname, hostname) + self.assertIn(hostname, metadata.clients) + self.assertIn(hostname, metadata.list_clients()) + self.assertItemsEqual(metadata.clients, + [c.hostname + for c in MetadataClientModel.objects.all()]) + + def test_update_group(self): + pass + + def test_update_bundle(self): + pass + + def test_update_client(self): + pass + + def test_list_clients(self): + metadata = self.get_metadata_object() + self.assertItemsEqual(metadata.list_clients(), + [c.hostname + for c in MetadataClientModel.objects.all()]) + + def test_remove_group(self): + pass + + def test_remove_bundle(self): + pass + + def test_remove_client(self): + metadata = self.get_metadata_object() + client_name = self.get_nonexistent_client(metadata) + + self.assertRaises(MetadataConsistencyError, + metadata.remove_client, + client_name) + + metadata.add_client(client_name) + metadata.remove_client(client_name) + self.assertNotIn(client_name, metadata.clients) + self.assertNotIn(client_name, metadata.list_clients()) + self.assertItemsEqual(metadata.clients, + [c.hostname + for c in MetadataClientModel.objects.all()]) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata._set_profile") + def test_set_profile(self, mock_set_profile): + TestMetadata.test_set_profile(self, + inherited_set_profile=mock_set_profile) + + def test__set_profile(self): + metadata = self.get_metadata_object() + profile = "group1" + client_name = self.get_nonexistent_client(metadata) + metadata._set_profile(client_name, profile, None) + self.assertIn(client_name, metadata.list_clients()) + self.assertIn(client_name, metadata.clientgroups) + self.assertItemsEqual(metadata.clientgroups[client_name], [profile]) + + self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, + metadata._set_profile, + client_name, profile, None) + + def test_process_statistics(self): + pass + + +class TestDBMetadata_NoClientsXML(TestDBMetadataBase): + """ test DBMetadata without a clients.xml. we have to disable or + override tests that rely on client options """ + __test__ = True + + def __init__(self, *args, **kwargs): + TestMetadata.__init__(self, *args, **kwargs) + + for client in self.clients_test_tree.findall("Client"): + newclient = lxml.etree.SubElement(self.groups_test_tree.getroot(), + "Client", name=client.get("name")) + lxml.etree.SubElement(newclient, "Group", + name=client.get("profile")) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") + def test_get_initial_metadata(self, mock_clientmetadata): + metadata = self.get_metadata_object() + if 'clients.xml' in metadata.states: + metadata.states['clients.xml'] = False + self.assertRaises(MetadataRuntimeError, + metadata.get_initial_metadata, None) + + self.load_groups_data(metadata=metadata) + self.load_clients_data(metadata=metadata) + + # test basic client metadata + metadata.get_initial_metadata("client1") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client1", "group1", set(["group1"]), set(), set(), + set(), dict(category1='group1'), None, None)) + + # test bundles, category suppression + metadata.get_initial_metadata("client2") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client2", "group2", set(["group2"]), + set(["bundle1", "bundle2"]), set(), set(), + dict(category1="group2"), None, None)) + + # test new client creation + new1 = self.get_nonexistent_client(metadata) + imd = metadata.get_initial_metadata(new1) + self.assertEqual(mock_clientmetadata.call_args[0][:9], + (new1, "group1", set(["group1"]), set(), set(), set(), + dict(category1="group1"), None, None)) + + # test nested groups, per-client groups + imd = metadata.get_initial_metadata("client8") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client8", "group1", + set(["group1", "group8", "group9", "group10"]), set(), + set(), set(), dict(category1="group1"), None, None)) + + # test per-client groups, group negation, nested groups + imd = metadata.get_initial_metadata("client9") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client9", "group2", + set(["group2", "group8", "group11"]), + set(["bundle1", "bundle2"]), set(), set(), + dict(category1="group2"), None, None)) + + # test exception on new client with no default profile + metadata.default = None + new2 = self.get_nonexistent_client(metadata) + self.assertRaises(MetadataConsistencyError, + metadata.get_initial_metadata, + new2) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_validate_client_address(self, mock_resolve_client): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + # this is upper case to ensure that case is folded properly in + # validate_client_address() + mock_resolve_client.return_value = "CLIENT4" + self.assertTrue(metadata.validate_client_address("client4", + ("1.2.3.7", None))) + mock_resolve_client.assert_called_with(("1.2.3.7", None)) + + mock_resolve_client.reset_mock() + self.assertFalse(metadata.validate_client_address("client5", + ("1.2.3.5", None))) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_AuthenticateConnection(self, mock_resolve_client, + mock_validate_client_address): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.password = "password1" + + cert = dict(subject=[[("commonName", "client1")]]) + mock_validate_client_address.return_value = False + self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.1")) + mock_validate_client_address.return_value = True + self.assertTrue(metadata.AuthenticateConnection(cert, "root", + metadata.password, + "1.2.3.1")) + + cert = dict(subject=[[("commonName", "client8")]]) + + mock_resolve_client.return_value = "client5" + self.assertTrue(metadata.AuthenticateConnection(None, "root", + "password1", "1.2.3.8")) + + mock_resolve_client.side_effect = MetadataConsistencyError + self.assertFalse(metadata.AuthenticateConnection(None, "root", + "password1", + "1.2.3.8")) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("socket.gethostbyaddr") + def test_resolve_client(self, mock_gethostbyaddr): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') + + metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, + 'client3') + mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3']) + self.assertEqual(metadata.resolve_client(('1.2.3.3', None), + cleanup_cache=True), 'client3') + self.assertEqual(metadata.session_cache, dict()) + + mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) + self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') + mock_gethostbyaddr.assert_called_with('1.2.3.6') + + mock_gethostbyaddr.reset_mock() + mock_gethostbyaddr.return_value = None + mock_gethostbyaddr.side_effect = socket.herror + self.assertRaises(MetadataConsistencyError, + metadata.resolve_client, + ('1.2.3.8', None)) + mock_gethostbyaddr.assert_called_with('1.2.3.8') + + def test_clients_xml_event(self): + pass + + +class TestDBMetadata_ClientsXML(TestDBMetadataBase): + """ test DBMetadata with a clients.xml. """ + __test__ = True + + def load_clients_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = self.get_metadata_object() + metadata.core.fam = Mock() + metadata._handle_file("clients.xml") + metadata = TestMetadata.load_clients_data(self, metadata=metadata, + xdata=xdata) + return TestDBMetadataBase.load_clients_data(self, metadata=metadata, + xdata=xdata) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") + @patch("Bcfg2.Server.Plugins.Metadata.Metadata._handle_clients_xml_event") + @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata.list_clients") + def test_clients_xml_event(self, mock_list_clients, mock_handle_event, + mock_load_xml): + metadata = self.get_metadata_object() + metadata.profiles = ["group1", "group2"] + evt = Mock() + evt.filename = os.path.join(datastore, "DBMetadata", "clients.xml") + evt.code2str = Mock(return_value="changed") + metadata.HandleEvent(evt) + self.assertFalse(mock_handle_event.called) + self.assertFalse(mock_load_xml.called) + + mock_load_xml.reset_mock() + mock_handle_event.reset_mock() + mock_list_clients.reset_mock() + metadata._handle_file("clients.xml") + metadata.HandleEvent(evt) + mock_handle_event.assert_called_with(metadata, evt) + mock_list_clients.assert_any_call() + mock_load_xml.assert_any_call() |