diff options
Diffstat (limited to 'testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py')
-rw-r--r-- | testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py | 407 |
1 files changed, 0 insertions, 407 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py deleted file mode 100644 index 99cbf1962..000000000 --- a/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py +++ /dev/null @@ -1,407 +0,0 @@ -import os -import sys -import unittest -import lxml.etree -from mock import Mock, patch -from django.core.management import setup_environ - -os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings" - -import Bcfg2.settings -Bcfg2.settings.DATABASE_NAME = \ - os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite") -Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME - -import Bcfg2.Server.Plugin -from Bcfg2.Server.Plugins.DBMetadata import * - -from TestMetadata import datastore, groups_test_tree, clients_test_tree, \ - TestMetadata - -def test_syncdb(): - # create the test database - setup_environ(Bcfg2.settings) - from django.core.management.commands import syncdb - cmd = syncdb.Command() - cmd.handle_noargs(interactive=False) - assert os.path.exists(Bcfg2.settings.DATABASE_NAME) - - # ensure that we a) can connect to the database; b) start with a - # clean database - MetadataClientModel.objects.all().delete() - assert list(MetadataClientModel.objects.all()) == [] - - -class TestClientVersions(unittest.TestCase): - test_clients = dict(client1="1.2.0", - client2="1.2.2", - client3="1.3.0pre1", - client4="1.1.0", - client5=None, - client6=None) - - def setUp(self): - test_syncdb() - for client, version in self.test_clients.items(): - MetadataClientModel(hostname=client, version=version).save() - - def test__contains(self): - v = ClientVersions() - self.assertIn("client1", v) - self.assertIn("client5", v) - self.assertNotIn("client__contains", v) - - def test_keys(self): - v = ClientVersions() - self.assertItemsEqual(self.test_clients.keys(), v.keys()) - - def test__setitem(self): - v = ClientVersions() - - # test setting version of existing client - v["client1"] = "1.2.3" - self.assertIn("client1", v) - self.assertEqual(v['client1'], "1.2.3") - client = MetadataClientModel.objects.get(hostname="client1") - self.assertEqual(client.version, "1.2.3") - - # test adding new client - new = "client__setitem" - v[new] = "1.3.0" - self.assertIn(new, v) - self.assertEqual(v[new], "1.3.0") - client = MetadataClientModel.objects.get(hostname=new) - self.assertEqual(client.version, "1.3.0") - - # test adding new client with no version - new2 = "client__setitem_2" - v[new2] = None - self.assertIn(new2, v) - self.assertEqual(v[new2], None) - client = MetadataClientModel.objects.get(hostname=new2) - self.assertEqual(client.version, None) - - def test__getitem(self): - v = ClientVersions() - - # test getting existing client - self.assertEqual(v['client2'], "1.2.2") - self.assertIsNone(v['client5']) - - # test exception on nonexistent client. can't use assertRaises - # for this because assertRaises requires a callable - try: - v['clients__getitem'] - assert False - except KeyError: - assert True - except: - assert False - - -class TestDBMetadataBase(TestMetadata): - __test__ = False - - def __init__(self, *args, **kwargs): - TestMetadata.__init__(self, *args, **kwargs) - test_syncdb() - - def load_clients_data(self, metadata=None, xdata=None): - if metadata is None: - metadata = get_metadata_object() - for client in clients_test_tree.findall("Client"): - metadata.add_client(client.get("name")) - return metadata - - def get_metadata_object(self, core=None, watch_clients=False): - if core is None: - core = Mock() - metadata = DBMetadata(core, datastore, watch_clients=watch_clients) - return metadata - - def get_nonexistent_client(self, _, prefix="client"): - clients = [o.hostname for o in MetadataClientModel.objects.all()] - i = 0 - client_name = "%s%s" % (prefix, i) - while client_name in clients: - i += 1 - client_name = "%s%s" % (prefix, i) - return client_name - - @patch('os.path.exists') - def test__init(self, mock_exists): - core = Mock() - core.fam = Mock() - mock_exists.return_value = False - metadata = self.get_metadata_object(core=core, watch_clients=True) - self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked) - core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data, - "groups.xml"), - metadata) - - mock_exists.return_value = True - core.fam.reset_mock() - metadata = self.get_metadata_object(core=core, watch_clients=True) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "groups.xml"), - metadata) - core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, - "clients.xml"), - metadata) - - def test_add_group(self): - pass - - def test_add_bundle(self): - pass - - def test_add_client(self): - metadata = self.get_metadata_object() - hostname = self.get_nonexistent_client(metadata) - client = metadata.add_client(hostname) - self.assertIsInstance(client, MetadataClientModel) - self.assertEqual(client.hostname, hostname) - self.assertIn(hostname, metadata.clients) - self.assertIn(hostname, metadata.list_clients()) - self.assertItemsEqual(metadata.clients, - [c.hostname - for c in MetadataClientModel.objects.all()]) - - def test_update_group(self): - pass - - def test_update_bundle(self): - pass - - def test_update_client(self): - pass - - def test_list_clients(self): - metadata = self.get_metadata_object() - self.assertItemsEqual(metadata.list_clients(), - [c.hostname - for c in MetadataClientModel.objects.all()]) - - def test_remove_group(self): - pass - - def test_remove_bundle(self): - pass - - def test_remove_client(self): - metadata = self.get_metadata_object() - client_name = self.get_nonexistent_client(metadata) - - self.assertRaises(MetadataConsistencyError, - metadata.remove_client, - client_name) - - metadata.add_client(client_name) - metadata.remove_client(client_name) - self.assertNotIn(client_name, metadata.clients) - self.assertNotIn(client_name, metadata.list_clients()) - self.assertItemsEqual(metadata.clients, - [c.hostname - for c in MetadataClientModel.objects.all()]) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata._set_profile") - def test_set_profile(self, mock_set_profile): - TestMetadata.test_set_profile(self, - inherited_set_profile=mock_set_profile) - - def test__set_profile(self): - metadata = self.get_metadata_object() - profile = "group1" - client_name = self.get_nonexistent_client(metadata) - metadata._set_profile(client_name, profile, None) - self.assertIn(client_name, metadata.list_clients()) - self.assertIn(client_name, metadata.clientgroups) - self.assertItemsEqual(metadata.clientgroups[client_name], [profile]) - - self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError, - metadata._set_profile, - client_name, profile, None) - - def test_process_statistics(self): - pass - - -class TestDBMetadata_NoClientsXML(TestDBMetadataBase): - """ test DBMetadata without a clients.xml. we have to disable or - override tests that rely on client options """ - __test__ = True - - def __init__(self, *args, **kwargs): - TestMetadata.__init__(self, *args, **kwargs) - - for client in self.clients_test_tree.findall("Client"): - newclient = lxml.etree.SubElement(self.groups_test_tree.getroot(), - "Client", name=client.get("name")) - lxml.etree.SubElement(newclient, "Group", - name=client.get("profile")) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") - def test_get_initial_metadata(self, mock_clientmetadata): - metadata = self.get_metadata_object() - if 'clients.xml' in metadata.states: - metadata.states['clients.xml'] = False - self.assertRaises(MetadataRuntimeError, - metadata.get_initial_metadata, None) - - self.load_groups_data(metadata=metadata) - self.load_clients_data(metadata=metadata) - - # test basic client metadata - metadata.get_initial_metadata("client1") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client1", "group1", set(["group1"]), set(), set(), - set(), dict(category1='group1'), None, None)) - - # test bundles, category suppression - metadata.get_initial_metadata("client2") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client2", "group2", set(["group2"]), - set(["bundle1", "bundle2"]), set(), set(), - dict(category1="group2"), None, None)) - - # test new client creation - new1 = self.get_nonexistent_client(metadata) - imd = metadata.get_initial_metadata(new1) - self.assertEqual(mock_clientmetadata.call_args[0][:9], - (new1, "group1", set(["group1"]), set(), set(), set(), - dict(category1="group1"), None, None)) - - # test nested groups, per-client groups - imd = metadata.get_initial_metadata("client8") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client8", "group1", - set(["group1", "group8", "group9", "group10"]), set(), - set(), set(), dict(category1="group1"), None, None)) - - # test per-client groups, group negation, nested groups - imd = metadata.get_initial_metadata("client9") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client9", "group2", - set(["group2", "group8", "group11"]), - set(["bundle1", "bundle2"]), set(), set(), - dict(category1="group2"), None, None)) - - # test exception on new client with no default profile - metadata.default = None - new2 = self.get_nonexistent_client(metadata) - self.assertRaises(MetadataConsistencyError, - metadata.get_initial_metadata, - new2) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") - def test_validate_client_address(self, mock_resolve_client): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - # this is upper case to ensure that case is folded properly in - # validate_client_address() - mock_resolve_client.return_value = "CLIENT4" - self.assertTrue(metadata.validate_client_address("client4", - ("1.2.3.7", None))) - mock_resolve_client.assert_called_with(("1.2.3.7", None)) - - mock_resolve_client.reset_mock() - self.assertFalse(metadata.validate_client_address("client5", - ("1.2.3.5", None))) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") - @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") - def test_AuthenticateConnection(self, mock_resolve_client, - mock_validate_client_address): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - metadata.password = "password1" - - cert = dict(subject=[[("commonName", "client1")]]) - mock_validate_client_address.return_value = False - self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, - "1.2.3.1")) - mock_validate_client_address.return_value = True - self.assertTrue(metadata.AuthenticateConnection(cert, "root", - metadata.password, - "1.2.3.1")) - - cert = dict(subject=[[("commonName", "client8")]]) - - mock_resolve_client.return_value = "client5" - self.assertTrue(metadata.AuthenticateConnection(None, "root", - "password1", "1.2.3.8")) - - mock_resolve_client.side_effect = MetadataConsistencyError - self.assertFalse(metadata.AuthenticateConnection(None, "root", - "password1", - "1.2.3.8")) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) - @patch("socket.gethostbyaddr") - def test_resolve_client(self, mock_gethostbyaddr): - metadata = self.load_clients_data(metadata=self.load_groups_data()) - metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') - self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') - - metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, - 'client3') - mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3']) - self.assertEqual(metadata.resolve_client(('1.2.3.3', None), - cleanup_cache=True), 'client3') - self.assertEqual(metadata.session_cache, dict()) - - mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) - self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') - mock_gethostbyaddr.assert_called_with('1.2.3.6') - - mock_gethostbyaddr.reset_mock() - mock_gethostbyaddr.return_value = None - mock_gethostbyaddr.side_effect = socket.herror - self.assertRaises(MetadataConsistencyError, - metadata.resolve_client, - ('1.2.3.8', None)) - mock_gethostbyaddr.assert_called_with('1.2.3.8') - - def test_clients_xml_event(self): - pass - - -class TestDBMetadata_ClientsXML(TestDBMetadataBase): - """ test DBMetadata with a clients.xml. """ - __test__ = True - - def load_clients_data(self, metadata=None, xdata=None): - if metadata is None: - metadata = self.get_metadata_object() - metadata.core.fam = Mock() - metadata._handle_file("clients.xml") - metadata = TestMetadata.load_clients_data(self, metadata=metadata, - xdata=xdata) - return TestDBMetadataBase.load_clients_data(self, metadata=metadata, - xdata=xdata) - - @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") - @patch("Bcfg2.Server.Plugins.Metadata.Metadata._handle_clients_xml_event") - @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata.list_clients") - def test_clients_xml_event(self, mock_list_clients, mock_handle_event, - mock_load_xml): - metadata = self.get_metadata_object() - metadata.profiles = ["group1", "group2"] - evt = Mock() - evt.filename = os.path.join(datastore, "DBMetadata", "clients.xml") - evt.code2str = Mock(return_value="changed") - metadata.HandleEvent(evt) - self.assertFalse(mock_handle_event.called) - self.assertFalse(mock_load_xml.called) - - mock_load_xml.reset_mock() - mock_handle_event.reset_mock() - mock_list_clients.reset_mock() - metadata._handle_file("clients.xml") - metadata.HandleEvent(evt) - mock_handle_event.assert_called_with(metadata, evt) - mock_list_clients.assert_any_call() - mock_load_xml.assert_any_call() |