summaryrefslogtreecommitdiffstats
path: root/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py')
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py407
1 files changed, 0 insertions, 407 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py
deleted file mode 100644
index 99cbf1962..000000000
--- a/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py
+++ /dev/null
@@ -1,407 +0,0 @@
-import os
-import sys
-import unittest
-import lxml.etree
-from mock import Mock, patch
-from django.core.management import setup_environ
-
-os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings"
-
-import Bcfg2.settings
-Bcfg2.settings.DATABASE_NAME = \
- os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite")
-Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME
-
-import Bcfg2.Server.Plugin
-from Bcfg2.Server.Plugins.DBMetadata import *
-
-from TestMetadata import datastore, groups_test_tree, clients_test_tree, \
- TestMetadata
-
-def test_syncdb():
- # create the test database
- setup_environ(Bcfg2.settings)
- from django.core.management.commands import syncdb
- cmd = syncdb.Command()
- cmd.handle_noargs(interactive=False)
- assert os.path.exists(Bcfg2.settings.DATABASE_NAME)
-
- # ensure that we a) can connect to the database; b) start with a
- # clean database
- MetadataClientModel.objects.all().delete()
- assert list(MetadataClientModel.objects.all()) == []
-
-
-class TestClientVersions(unittest.TestCase):
- test_clients = dict(client1="1.2.0",
- client2="1.2.2",
- client3="1.3.0pre1",
- client4="1.1.0",
- client5=None,
- client6=None)
-
- def setUp(self):
- test_syncdb()
- for client, version in self.test_clients.items():
- MetadataClientModel(hostname=client, version=version).save()
-
- def test__contains(self):
- v = ClientVersions()
- self.assertIn("client1", v)
- self.assertIn("client5", v)
- self.assertNotIn("client__contains", v)
-
- def test_keys(self):
- v = ClientVersions()
- self.assertItemsEqual(self.test_clients.keys(), v.keys())
-
- def test__setitem(self):
- v = ClientVersions()
-
- # test setting version of existing client
- v["client1"] = "1.2.3"
- self.assertIn("client1", v)
- self.assertEqual(v['client1'], "1.2.3")
- client = MetadataClientModel.objects.get(hostname="client1")
- self.assertEqual(client.version, "1.2.3")
-
- # test adding new client
- new = "client__setitem"
- v[new] = "1.3.0"
- self.assertIn(new, v)
- self.assertEqual(v[new], "1.3.0")
- client = MetadataClientModel.objects.get(hostname=new)
- self.assertEqual(client.version, "1.3.0")
-
- # test adding new client with no version
- new2 = "client__setitem_2"
- v[new2] = None
- self.assertIn(new2, v)
- self.assertEqual(v[new2], None)
- client = MetadataClientModel.objects.get(hostname=new2)
- self.assertEqual(client.version, None)
-
- def test__getitem(self):
- v = ClientVersions()
-
- # test getting existing client
- self.assertEqual(v['client2'], "1.2.2")
- self.assertIsNone(v['client5'])
-
- # test exception on nonexistent client. can't use assertRaises
- # for this because assertRaises requires a callable
- try:
- v['clients__getitem']
- assert False
- except KeyError:
- assert True
- except:
- assert False
-
-
-class TestDBMetadataBase(TestMetadata):
- __test__ = False
-
- def __init__(self, *args, **kwargs):
- TestMetadata.__init__(self, *args, **kwargs)
- test_syncdb()
-
- def load_clients_data(self, metadata=None, xdata=None):
- if metadata is None:
- metadata = get_metadata_object()
- for client in clients_test_tree.findall("Client"):
- metadata.add_client(client.get("name"))
- return metadata
-
- def get_metadata_object(self, core=None, watch_clients=False):
- if core is None:
- core = Mock()
- metadata = DBMetadata(core, datastore, watch_clients=watch_clients)
- return metadata
-
- def get_nonexistent_client(self, _, prefix="client"):
- clients = [o.hostname for o in MetadataClientModel.objects.all()]
- i = 0
- client_name = "%s%s" % (prefix, i)
- while client_name in clients:
- i += 1
- client_name = "%s%s" % (prefix, i)
- return client_name
-
- @patch('os.path.exists')
- def test__init(self, mock_exists):
- core = Mock()
- core.fam = Mock()
- mock_exists.return_value = False
- metadata = self.get_metadata_object(core=core, watch_clients=True)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
- core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
-
- mock_exists.return_value = True
- core.fam.reset_mock()
- metadata = self.get_metadata_object(core=core, watch_clients=True)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
-
- def test_add_group(self):
- pass
-
- def test_add_bundle(self):
- pass
-
- def test_add_client(self):
- metadata = self.get_metadata_object()
- hostname = self.get_nonexistent_client(metadata)
- client = metadata.add_client(hostname)
- self.assertIsInstance(client, MetadataClientModel)
- self.assertEqual(client.hostname, hostname)
- self.assertIn(hostname, metadata.clients)
- self.assertIn(hostname, metadata.list_clients())
- self.assertItemsEqual(metadata.clients,
- [c.hostname
- for c in MetadataClientModel.objects.all()])
-
- def test_update_group(self):
- pass
-
- def test_update_bundle(self):
- pass
-
- def test_update_client(self):
- pass
-
- def test_list_clients(self):
- metadata = self.get_metadata_object()
- self.assertItemsEqual(metadata.list_clients(),
- [c.hostname
- for c in MetadataClientModel.objects.all()])
-
- def test_remove_group(self):
- pass
-
- def test_remove_bundle(self):
- pass
-
- def test_remove_client(self):
- metadata = self.get_metadata_object()
- client_name = self.get_nonexistent_client(metadata)
-
- self.assertRaises(MetadataConsistencyError,
- metadata.remove_client,
- client_name)
-
- metadata.add_client(client_name)
- metadata.remove_client(client_name)
- self.assertNotIn(client_name, metadata.clients)
- self.assertNotIn(client_name, metadata.list_clients())
- self.assertItemsEqual(metadata.clients,
- [c.hostname
- for c in MetadataClientModel.objects.all()])
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata._set_profile")
- def test_set_profile(self, mock_set_profile):
- TestMetadata.test_set_profile(self,
- inherited_set_profile=mock_set_profile)
-
- def test__set_profile(self):
- metadata = self.get_metadata_object()
- profile = "group1"
- client_name = self.get_nonexistent_client(metadata)
- metadata._set_profile(client_name, profile, None)
- self.assertIn(client_name, metadata.list_clients())
- self.assertIn(client_name, metadata.clientgroups)
- self.assertItemsEqual(metadata.clientgroups[client_name], [profile])
-
- self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
- metadata._set_profile,
- client_name, profile, None)
-
- def test_process_statistics(self):
- pass
-
-
-class TestDBMetadata_NoClientsXML(TestDBMetadataBase):
- """ test DBMetadata without a clients.xml. we have to disable or
- override tests that rely on client options """
- __test__ = True
-
- def __init__(self, *args, **kwargs):
- TestMetadata.__init__(self, *args, **kwargs)
-
- for client in self.clients_test_tree.findall("Client"):
- newclient = lxml.etree.SubElement(self.groups_test_tree.getroot(),
- "Client", name=client.get("name"))
- lxml.etree.SubElement(newclient, "Group",
- name=client.get("profile"))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
- def test_get_initial_metadata(self, mock_clientmetadata):
- metadata = self.get_metadata_object()
- if 'clients.xml' in metadata.states:
- metadata.states['clients.xml'] = False
- self.assertRaises(MetadataRuntimeError,
- metadata.get_initial_metadata, None)
-
- self.load_groups_data(metadata=metadata)
- self.load_clients_data(metadata=metadata)
-
- # test basic client metadata
- metadata.get_initial_metadata("client1")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client1", "group1", set(["group1"]), set(), set(),
- set(), dict(category1='group1'), None, None))
-
- # test bundles, category suppression
- metadata.get_initial_metadata("client2")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client2", "group2", set(["group2"]),
- set(["bundle1", "bundle2"]), set(), set(),
- dict(category1="group2"), None, None))
-
- # test new client creation
- new1 = self.get_nonexistent_client(metadata)
- imd = metadata.get_initial_metadata(new1)
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- (new1, "group1", set(["group1"]), set(), set(), set(),
- dict(category1="group1"), None, None))
-
- # test nested groups, per-client groups
- imd = metadata.get_initial_metadata("client8")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client8", "group1",
- set(["group1", "group8", "group9", "group10"]), set(),
- set(), set(), dict(category1="group1"), None, None))
-
- # test per-client groups, group negation, nested groups
- imd = metadata.get_initial_metadata("client9")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client9", "group2",
- set(["group2", "group8", "group11"]),
- set(["bundle1", "bundle2"]), set(), set(),
- dict(category1="group2"), None, None))
-
- # test exception on new client with no default profile
- metadata.default = None
- new2 = self.get_nonexistent_client(metadata)
- self.assertRaises(MetadataConsistencyError,
- metadata.get_initial_metadata,
- new2)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
- def test_validate_client_address(self, mock_resolve_client):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- # this is upper case to ensure that case is folded properly in
- # validate_client_address()
- mock_resolve_client.return_value = "CLIENT4"
- self.assertTrue(metadata.validate_client_address("client4",
- ("1.2.3.7", None)))
- mock_resolve_client.assert_called_with(("1.2.3.7", None))
-
- mock_resolve_client.reset_mock()
- self.assertFalse(metadata.validate_client_address("client5",
- ("1.2.3.5", None)))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
- def test_AuthenticateConnection(self, mock_resolve_client,
- mock_validate_client_address):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- metadata.password = "password1"
-
- cert = dict(subject=[[("commonName", "client1")]])
- mock_validate_client_address.return_value = False
- self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
- "1.2.3.1"))
- mock_validate_client_address.return_value = True
- self.assertTrue(metadata.AuthenticateConnection(cert, "root",
- metadata.password,
- "1.2.3.1"))
-
- cert = dict(subject=[[("commonName", "client8")]])
-
- mock_resolve_client.return_value = "client5"
- self.assertTrue(metadata.AuthenticateConnection(None, "root",
- "password1", "1.2.3.8"))
-
- mock_resolve_client.side_effect = MetadataConsistencyError
- self.assertFalse(metadata.AuthenticateConnection(None, "root",
- "password1",
- "1.2.3.8"))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("socket.gethostbyaddr")
- def test_resolve_client(self, mock_gethostbyaddr):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
- self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
-
- metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
- 'client3')
- mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3'])
- self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
- cleanup_cache=True), 'client3')
- self.assertEqual(metadata.session_cache, dict())
-
- mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
- self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
- mock_gethostbyaddr.assert_called_with('1.2.3.6')
-
- mock_gethostbyaddr.reset_mock()
- mock_gethostbyaddr.return_value = None
- mock_gethostbyaddr.side_effect = socket.herror
- self.assertRaises(MetadataConsistencyError,
- metadata.resolve_client,
- ('1.2.3.8', None))
- mock_gethostbyaddr.assert_called_with('1.2.3.8')
-
- def test_clients_xml_event(self):
- pass
-
-
-class TestDBMetadata_ClientsXML(TestDBMetadataBase):
- """ test DBMetadata with a clients.xml. """
- __test__ = True
-
- def load_clients_data(self, metadata=None, xdata=None):
- if metadata is None:
- metadata = self.get_metadata_object()
- metadata.core.fam = Mock()
- metadata._handle_file("clients.xml")
- metadata = TestMetadata.load_clients_data(self, metadata=metadata,
- xdata=xdata)
- return TestDBMetadataBase.load_clients_data(self, metadata=metadata,
- xdata=xdata)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata._handle_clients_xml_event")
- @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata.list_clients")
- def test_clients_xml_event(self, mock_list_clients, mock_handle_event,
- mock_load_xml):
- metadata = self.get_metadata_object()
- metadata.profiles = ["group1", "group2"]
- evt = Mock()
- evt.filename = os.path.join(datastore, "DBMetadata", "clients.xml")
- evt.code2str = Mock(return_value="changed")
- metadata.HandleEvent(evt)
- self.assertFalse(mock_handle_event.called)
- self.assertFalse(mock_load_xml.called)
-
- mock_load_xml.reset_mock()
- mock_handle_event.reset_mock()
- mock_list_clients.reset_mock()
- metadata._handle_file("clients.xml")
- metadata.HandleEvent(evt)
- mock_handle_event.assert_called_with(metadata, evt)
- mock_list_clients.assert_any_call()
- mock_load_xml.assert_any_call()