summaryrefslogtreecommitdiffstats
path: root/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py')
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py407
1 files changed, 407 insertions, 0 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py
new file mode 100644
index 000000000..99cbf1962
--- /dev/null
+++ b/testsuite/Testlib/TestServer/TestPlugins/TestDBMetadata.py
@@ -0,0 +1,407 @@
+import os
+import sys
+import unittest
+import lxml.etree
+from mock import Mock, patch
+from django.core.management import setup_environ
+
+os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings"
+
+import Bcfg2.settings
+Bcfg2.settings.DATABASE_NAME = \
+ os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite")
+Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME
+
+import Bcfg2.Server.Plugin
+from Bcfg2.Server.Plugins.DBMetadata import *
+
+from TestMetadata import datastore, groups_test_tree, clients_test_tree, \
+ TestMetadata
+
+def test_syncdb():
+ # create the test database
+ setup_environ(Bcfg2.settings)
+ from django.core.management.commands import syncdb
+ cmd = syncdb.Command()
+ cmd.handle_noargs(interactive=False)
+ assert os.path.exists(Bcfg2.settings.DATABASE_NAME)
+
+ # ensure that we a) can connect to the database; b) start with a
+ # clean database
+ MetadataClientModel.objects.all().delete()
+ assert list(MetadataClientModel.objects.all()) == []
+
+
+class TestClientVersions(unittest.TestCase):
+ test_clients = dict(client1="1.2.0",
+ client2="1.2.2",
+ client3="1.3.0pre1",
+ client4="1.1.0",
+ client5=None,
+ client6=None)
+
+ def setUp(self):
+ test_syncdb()
+ for client, version in self.test_clients.items():
+ MetadataClientModel(hostname=client, version=version).save()
+
+ def test__contains(self):
+ v = ClientVersions()
+ self.assertIn("client1", v)
+ self.assertIn("client5", v)
+ self.assertNotIn("client__contains", v)
+
+ def test_keys(self):
+ v = ClientVersions()
+ self.assertItemsEqual(self.test_clients.keys(), v.keys())
+
+ def test__setitem(self):
+ v = ClientVersions()
+
+ # test setting version of existing client
+ v["client1"] = "1.2.3"
+ self.assertIn("client1", v)
+ self.assertEqual(v['client1'], "1.2.3")
+ client = MetadataClientModel.objects.get(hostname="client1")
+ self.assertEqual(client.version, "1.2.3")
+
+ # test adding new client
+ new = "client__setitem"
+ v[new] = "1.3.0"
+ self.assertIn(new, v)
+ self.assertEqual(v[new], "1.3.0")
+ client = MetadataClientModel.objects.get(hostname=new)
+ self.assertEqual(client.version, "1.3.0")
+
+ # test adding new client with no version
+ new2 = "client__setitem_2"
+ v[new2] = None
+ self.assertIn(new2, v)
+ self.assertEqual(v[new2], None)
+ client = MetadataClientModel.objects.get(hostname=new2)
+ self.assertEqual(client.version, None)
+
+ def test__getitem(self):
+ v = ClientVersions()
+
+ # test getting existing client
+ self.assertEqual(v['client2'], "1.2.2")
+ self.assertIsNone(v['client5'])
+
+ # test exception on nonexistent client. can't use assertRaises
+ # for this because assertRaises requires a callable
+ try:
+ v['clients__getitem']
+ assert False
+ except KeyError:
+ assert True
+ except:
+ assert False
+
+
+class TestDBMetadataBase(TestMetadata):
+ __test__ = False
+
+ def __init__(self, *args, **kwargs):
+ TestMetadata.__init__(self, *args, **kwargs)
+ test_syncdb()
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = get_metadata_object()
+ for client in clients_test_tree.findall("Client"):
+ metadata.add_client(client.get("name"))
+ return metadata
+
+ def get_metadata_object(self, core=None, watch_clients=False):
+ if core is None:
+ core = Mock()
+ metadata = DBMetadata(core, datastore, watch_clients=watch_clients)
+ return metadata
+
+ def get_nonexistent_client(self, _, prefix="client"):
+ clients = [o.hostname for o in MetadataClientModel.objects.all()]
+ i = 0
+ client_name = "%s%s" % (prefix, i)
+ while client_name in clients:
+ i += 1
+ client_name = "%s%s" % (prefix, i)
+ return client_name
+
+ @patch('os.path.exists')
+ def test__init(self, mock_exists):
+ core = Mock()
+ core.fam = Mock()
+ mock_exists.return_value = False
+ metadata = self.get_metadata_object(core=core, watch_clients=True)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
+ core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+
+ mock_exists.return_value = True
+ core.fam.reset_mock()
+ metadata = self.get_metadata_object(core=core, watch_clients=True)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "clients.xml"),
+ metadata)
+
+ def test_add_group(self):
+ pass
+
+ def test_add_bundle(self):
+ pass
+
+ def test_add_client(self):
+ metadata = self.get_metadata_object()
+ hostname = self.get_nonexistent_client(metadata)
+ client = metadata.add_client(hostname)
+ self.assertIsInstance(client, MetadataClientModel)
+ self.assertEqual(client.hostname, hostname)
+ self.assertIn(hostname, metadata.clients)
+ self.assertIn(hostname, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_update_group(self):
+ pass
+
+ def test_update_bundle(self):
+ pass
+
+ def test_update_client(self):
+ pass
+
+ def test_list_clients(self):
+ metadata = self.get_metadata_object()
+ self.assertItemsEqual(metadata.list_clients(),
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_remove_group(self):
+ pass
+
+ def test_remove_bundle(self):
+ pass
+
+ def test_remove_client(self):
+ metadata = self.get_metadata_object()
+ client_name = self.get_nonexistent_client(metadata)
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.remove_client,
+ client_name)
+
+ metadata.add_client(client_name)
+ metadata.remove_client(client_name)
+ self.assertNotIn(client_name, metadata.clients)
+ self.assertNotIn(client_name, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata._set_profile")
+ def test_set_profile(self, mock_set_profile):
+ TestMetadata.test_set_profile(self,
+ inherited_set_profile=mock_set_profile)
+
+ def test__set_profile(self):
+ metadata = self.get_metadata_object()
+ profile = "group1"
+ client_name = self.get_nonexistent_client(metadata)
+ metadata._set_profile(client_name, profile, None)
+ self.assertIn(client_name, metadata.list_clients())
+ self.assertIn(client_name, metadata.clientgroups)
+ self.assertItemsEqual(metadata.clientgroups[client_name], [profile])
+
+ self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ metadata._set_profile,
+ client_name, profile, None)
+
+ def test_process_statistics(self):
+ pass
+
+
+class TestDBMetadata_NoClientsXML(TestDBMetadataBase):
+ """ test DBMetadata without a clients.xml. we have to disable or
+ override tests that rely on client options """
+ __test__ = True
+
+ def __init__(self, *args, **kwargs):
+ TestMetadata.__init__(self, *args, **kwargs)
+
+ for client in self.clients_test_tree.findall("Client"):
+ newclient = lxml.etree.SubElement(self.groups_test_tree.getroot(),
+ "Client", name=client.get("name"))
+ lxml.etree.SubElement(newclient, "Group",
+ name=client.get("profile"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
+ def test_get_initial_metadata(self, mock_clientmetadata):
+ metadata = self.get_metadata_object()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(MetadataRuntimeError,
+ metadata.get_initial_metadata, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ # test basic client metadata
+ metadata.get_initial_metadata("client1")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client1", "group1", set(["group1"]), set(), set(),
+ set(), dict(category1='group1'), None, None))
+
+ # test bundles, category suppression
+ metadata.get_initial_metadata("client2")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client2", "group2", set(["group2"]),
+ set(["bundle1", "bundle2"]), set(), set(),
+ dict(category1="group2"), None, None))
+
+ # test new client creation
+ new1 = self.get_nonexistent_client(metadata)
+ imd = metadata.get_initial_metadata(new1)
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ (new1, "group1", set(["group1"]), set(), set(), set(),
+ dict(category1="group1"), None, None))
+
+ # test nested groups, per-client groups
+ imd = metadata.get_initial_metadata("client8")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client8", "group1",
+ set(["group1", "group8", "group9", "group10"]), set(),
+ set(), set(), dict(category1="group1"), None, None))
+
+ # test per-client groups, group negation, nested groups
+ imd = metadata.get_initial_metadata("client9")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client9", "group2",
+ set(["group2", "group8", "group11"]),
+ set(["bundle1", "bundle2"]), set(), set(),
+ dict(category1="group2"), None, None))
+
+ # test exception on new client with no default profile
+ metadata.default = None
+ new2 = self.get_nonexistent_client(metadata)
+ self.assertRaises(MetadataConsistencyError,
+ metadata.get_initial_metadata,
+ new2)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_validate_client_address(self, mock_resolve_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ # this is upper case to ensure that case is folded properly in
+ # validate_client_address()
+ mock_resolve_client.return_value = "CLIENT4"
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.7", None)))
+ mock_resolve_client.assert_called_with(("1.2.3.7", None))
+
+ mock_resolve_client.reset_mock()
+ self.assertFalse(metadata.validate_client_address("client5",
+ ("1.2.3.5", None)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_AuthenticateConnection(self, mock_resolve_client,
+ mock_validate_client_address):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.password = "password1"
+
+ cert = dict(subject=[[("commonName", "client1")]])
+ mock_validate_client_address.return_value = False
+ self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ mock_validate_client_address.return_value = True
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root",
+ metadata.password,
+ "1.2.3.1"))
+
+ cert = dict(subject=[[("commonName", "client8")]])
+
+ mock_resolve_client.return_value = "client5"
+ self.assertTrue(metadata.AuthenticateConnection(None, "root",
+ "password1", "1.2.3.8"))
+
+ mock_resolve_client.side_effect = MetadataConsistencyError
+ self.assertFalse(metadata.AuthenticateConnection(None, "root",
+ "password1",
+ "1.2.3.8"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("socket.gethostbyaddr")
+ def test_resolve_client(self, mock_gethostbyaddr):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+
+ metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
+ 'client3')
+ mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
+ cleanup_cache=True), 'client3')
+ self.assertEqual(metadata.session_cache, dict())
+
+ mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
+ mock_gethostbyaddr.assert_called_with('1.2.3.6')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = None
+ mock_gethostbyaddr.side_effect = socket.herror
+ self.assertRaises(MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.8', None))
+ mock_gethostbyaddr.assert_called_with('1.2.3.8')
+
+ def test_clients_xml_event(self):
+ pass
+
+
+class TestDBMetadata_ClientsXML(TestDBMetadataBase):
+ """ test DBMetadata with a clients.xml. """
+ __test__ = True
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_metadata_object()
+ metadata.core.fam = Mock()
+ metadata._handle_file("clients.xml")
+ metadata = TestMetadata.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+ return TestDBMetadataBase.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata._handle_clients_xml_event")
+ @patch("Bcfg2.Server.Plugins.DBMetadata.DBMetadata.list_clients")
+ def test_clients_xml_event(self, mock_list_clients, mock_handle_event,
+ mock_load_xml):
+ metadata = self.get_metadata_object()
+ metadata.profiles = ["group1", "group2"]
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "DBMetadata", "clients.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ self.assertFalse(mock_handle_event.called)
+ self.assertFalse(mock_load_xml.called)
+
+ mock_load_xml.reset_mock()
+ mock_handle_event.reset_mock()
+ mock_list_clients.reset_mock()
+ metadata._handle_file("clients.xml")
+ metadata.HandleEvent(evt)
+ mock_handle_event.assert_called_with(metadata, evt)
+ mock_list_clients.assert_any_call()
+ mock_load_xml.assert_any_call()