summaryrefslogtreecommitdiffstats
path: root/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py')
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py478
1 files changed, 434 insertions, 44 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
index 9aacbe5cc..83da19bad 100644
--- a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
+++ b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
@@ -1,10 +1,25 @@
import os
+import sys
import copy
import time
import socket
import unittest
import lxml.etree
from mock import Mock, patch
+
+try:
+ from django.core.management import setup_environ
+ has_django = True
+
+ os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings"
+
+ import Bcfg2.settings
+ Bcfg2.settings.DATABASE_NAME = \
+ os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite")
+ Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME
+except ImportError:
+ has_django = False
+
import Bcfg2.Server
import Bcfg2.Server.Plugin
from Bcfg2.Server.Plugins.Metadata import *
@@ -74,11 +89,95 @@ groups_test_tree = lxml.etree.XML('''
datastore = "/"
-def get_metadata_object(core=None, watch_clients=False):
+def test_syncdb():
+ if not has_django:
+ raise unittest.SkipTest("Django not found, skipping")
+
+ # create the test database
+ setup_environ(Bcfg2.settings)
+ from django.core.management.commands import syncdb
+ cmd = syncdb.Command()
+ cmd.handle_noargs(interactive=False)
+ assert os.path.exists(Bcfg2.settings.DATABASE_NAME)
+
+ # ensure that we a) can connect to the database; b) start with a
+ # clean database
+ MetadataClientModel.objects.all().delete()
+ assert list(MetadataClientModel.objects.all()) == []
+
+def get_metadata_object(core=None, watch_clients=False, use_db=False):
if core is None:
core = Mock()
- metadata = Metadata(core, datastore, watch_clients=watch_clients)
- return metadata
+ core.setup.cfp.getboolean = Mock()
+ core.setup.cfp.getboolean.return_value = use_db
+ return Metadata(core, datastore, watch_clients=watch_clients)
+
+
+class TestClientVersions(unittest.TestCase):
+ test_clients = dict(client1="1.2.0",
+ client2="1.2.2",
+ client3="1.3.0pre1",
+ client4="1.1.0",
+ client5=None,
+ client6=None)
+
+ def setUp(self):
+ test_syncdb()
+ for client, version in self.test_clients.items():
+ MetadataClientModel(hostname=client, version=version).save()
+
+ def test__contains(self):
+ v = ClientVersions()
+ self.assertIn("client1", v)
+ self.assertIn("client5", v)
+ self.assertNotIn("client__contains", v)
+
+ def test_keys(self):
+ v = ClientVersions()
+ self.assertItemsEqual(self.test_clients.keys(), v.keys())
+
+ def test__setitem(self):
+ v = ClientVersions()
+
+ # test setting version of existing client
+ v["client1"] = "1.2.3"
+ self.assertIn("client1", v)
+ self.assertEqual(v['client1'], "1.2.3")
+ client = MetadataClientModel.objects.get(hostname="client1")
+ self.assertEqual(client.version, "1.2.3")
+
+ # test adding new client
+ new = "client__setitem"
+ v[new] = "1.3.0"
+ self.assertIn(new, v)
+ self.assertEqual(v[new], "1.3.0")
+ client = MetadataClientModel.objects.get(hostname=new)
+ self.assertEqual(client.version, "1.3.0")
+
+ # test adding new client with no version
+ new2 = "client__setitem_2"
+ v[new2] = None
+ self.assertIn(new2, v)
+ self.assertEqual(v[new2], None)
+ client = MetadataClientModel.objects.get(hostname=new2)
+ self.assertEqual(client.version, None)
+
+ def test__getitem(self):
+ v = ClientVersions()
+
+ # test getting existing client
+ self.assertEqual(v['client2'], "1.2.2")
+ self.assertIsNone(v['client5'])
+
+ # test exception on nonexistent client. can't use assertRaises
+ # for this because assertRaises requires a callable
+ try:
+ v['clients__getitem']
+ assert False
+ except KeyError:
+ assert True
+ except:
+ assert False
class TestXMLMetadataConfig(unittest.TestCase):
@@ -268,9 +367,11 @@ class TestClientMetadata(unittest.TestCase):
class TestMetadata(unittest.TestCase):
groups_test_tree = groups_test_tree
clients_test_tree = clients_test_tree
+ use_db = False
def get_metadata_object(self, core=None, watch_clients=False):
- return get_metadata_object(core=core, watch_clients=watch_clients)
+ return get_metadata_object(core=core, watch_clients=watch_clients,
+ use_db=self.use_db)
def get_nonexistent_client(self, metadata, prefix="client"):
if metadata is None:
@@ -631,13 +732,7 @@ class TestMetadata(unittest.TestCase):
negated_groups)
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata._set_profile")
- def test_set_profile(self, mock_set_profile, inherited_set_profile=None):
- if inherited_set_profile:
- # allow a subclass of TestMetadata to patch a different
- # _set_profile object and pass it in. this probably isn't
- # the best way to accomplish that, but it seems to work.
- mock_set_profile = inherited_set_profile
+ def test_set_profile(self):
metadata = self.get_metadata_object()
if 'clients.xml' in metadata.states:
metadata.states['clients.xml'] = False
@@ -656,43 +751,48 @@ class TestMetadata(unittest.TestCase):
metadata.set_profile,
"client1", "group3", None)
- metadata.set_profile("client1", "group5", None, force=True)
- mock_set_profile.assert_called_with("client1", "group5", None)
-
- metadata.set_profile("client1", "group3", None, force=True)
- mock_set_profile.assert_called_with("client1", "group3", None)
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_set_profile_db(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ if metadata._use_db:
+ profile = "group1"
+ client_name = self.get_nonexistent_client(metadata)
+ metadata.set_profile(client_name, profile, None)
+ self.assertIn(client_name, metadata.clients)
+ self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ metadata.set_profile,
+ client_name, profile, None)
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client")
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
- def test__set_profile(self, mock_update_client, mock_add_client):
- metadata = self.get_metadata_object()
- self.load_groups_data(metadata=metadata)
- self.load_clients_data(metadata=metadata)
-
- metadata.clients_xml.write = Mock()
- metadata._set_profile("client1", "group2", None)
- mock_update_client.assert_called_with("client1", dict(profile="group2"))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clientgroups["client1"], ["group2"])
-
- metadata.clients_xml.write.reset_mock()
- new1 = self.get_nonexistent_client(metadata)
- metadata._set_profile(new1, "group1", None)
- mock_add_client.assert_called_with(new1, dict(profile="group1"))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clientgroups[new1], ["group1"])
-
- metadata.clients_xml.write.reset_mock()
- new2 = self.get_nonexistent_client(metadata)
- metadata.session_cache[('1.2.3.6', None)] = (None, new2)
- metadata._set_profile("uuid_new", "group1", ('1.2.3.6', None))
- mock_add_client.assert_called_with(new2,
- dict(uuid='uuid_new',
- profile="group1",
- address='1.2.3.6'))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"])
+ def test_set_profile_xml(self, mock_update_client, mock_add_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ if not metadata._use_db:
+ metadata.clients_xml.write = Mock()
+ metadata.set_profile("client1", "group2", None)
+ mock_update_client.assert_called_with("client1",
+ dict(profile="group2"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups["client1"], ["group2"])
+
+ metadata.clients_xml.write.reset_mock()
+ new1 = self.get_nonexistent_client(metadata)
+ metadata.set_profile(new1, "group1", None)
+ mock_add_client.assert_called_with(new1, dict(profile="group1"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups[new1], ["group1"])
+
+ metadata.clients_xml.write.reset_mock()
+ new2 = self.get_nonexistent_client(metadata)
+ metadata.session_cache[('1.2.3.6', None)] = (None, new2)
+ metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None))
+ mock_add_client.assert_called_with(new2,
+ dict(uuid='uuid_new',
+ profile="group1",
+ address='1.2.3.6'))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("socket.gethostbyaddr")
@@ -1004,3 +1104,293 @@ class TestMetadata(unittest.TestCase):
def test_viz(self):
pass
+
+
+
+class TestMetadataBase(TestMetadata):
+ """ base test object for testing Metadata with database enabled """
+ __test__ = False
+ use_db = True
+
+ def __init__(self, *args, **kwargs):
+ TestMetadata.__init__(self, *args, **kwargs)
+ test_syncdb()
+
+ def setUp(self):
+ if not has_django:
+ self.skipTest("Django not found, skipping")
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = get_metadata_object()
+ for client in clients_test_tree.findall("Client"):
+ metadata.add_client(client.get("name"))
+ return metadata
+
+ def get_nonexistent_client(self, _, prefix="client"):
+ clients = [o.hostname for o in MetadataClientModel.objects.all()]
+ i = 0
+ client_name = "%s%s" % (prefix, i)
+ while client_name in clients:
+ i += 1
+ client_name = "%s%s" % (prefix, i)
+ return client_name
+
+ @patch('os.path.exists')
+ def test__init(self, mock_exists):
+ core = Mock()
+ core.fam = Mock()
+ mock_exists.return_value = False
+ metadata = self.get_metadata_object(core=core, watch_clients=True)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
+ core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+
+ mock_exists.return_value = True
+ core.fam.reset_mock()
+ metadata = self.get_metadata_object(core=core, watch_clients=True)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "clients.xml"),
+ metadata)
+
+ def test_add_group(self):
+ pass
+
+ def test_add_bundle(self):
+ pass
+
+ def test_add_client(self):
+ metadata = self.get_metadata_object()
+ hostname = self.get_nonexistent_client(metadata)
+ client = metadata.add_client(hostname)
+ self.assertIsInstance(client, MetadataClientModel)
+ self.assertEqual(client.hostname, hostname)
+ self.assertIn(hostname, metadata.clients)
+ self.assertIn(hostname, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_update_group(self):
+ pass
+
+ def test_update_bundle(self):
+ pass
+
+ def test_update_client(self):
+ pass
+
+ def test_list_clients(self):
+ metadata = self.get_metadata_object()
+ self.assertItemsEqual(metadata.list_clients(),
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_remove_group(self):
+ pass
+
+ def test_remove_bundle(self):
+ pass
+
+ def test_remove_client(self):
+ metadata = self.get_metadata_object()
+ client_name = self.get_nonexistent_client(metadata)
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.remove_client,
+ client_name)
+
+ metadata.add_client(client_name)
+ metadata.remove_client(client_name)
+ self.assertNotIn(client_name, metadata.clients)
+ self.assertNotIn(client_name, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_process_statistics(self):
+ pass
+
+
+class TestMetadata_NoClientsXML(TestMetadataBase):
+ """ test Metadata without a clients.xml. we have to disable or
+ override tests that rely on client options """
+ __test__ = True
+
+ def __init__(self, *args, **kwargs):
+ TestMetadata.__init__(self, *args, **kwargs)
+
+ for client in self.clients_test_tree.findall("Client"):
+ newclient = lxml.etree.SubElement(self.groups_test_tree.getroot(),
+ "Client", name=client.get("name"))
+ lxml.etree.SubElement(newclient, "Group",
+ name=client.get("profile"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
+ def test_get_initial_metadata(self, mock_clientmetadata):
+ metadata = self.get_metadata_object()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(MetadataRuntimeError,
+ metadata.get_initial_metadata, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ # test basic client metadata
+ metadata.get_initial_metadata("client1")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client1", "group1", set(["group1"]), set(), set(),
+ set(), dict(category1='group1'), None, None))
+
+ # test bundles, category suppression
+ metadata.get_initial_metadata("client2")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client2", "group2", set(["group2"]),
+ set(["bundle1", "bundle2"]), set(), set(),
+ dict(category1="group2"), None, None))
+
+ # test new client creation
+ new1 = self.get_nonexistent_client(metadata)
+ imd = metadata.get_initial_metadata(new1)
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ (new1, "group1", set(["group1"]), set(), set(), set(),
+ dict(category1="group1"), None, None))
+
+ # test nested groups, per-client groups
+ imd = metadata.get_initial_metadata("client8")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client8", "group1",
+ set(["group1", "group8", "group9", "group10"]), set(),
+ set(), set(), dict(category1="group1"), None, None))
+
+ # test per-client groups, group negation, nested groups
+ imd = metadata.get_initial_metadata("client9")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client9", "group2",
+ set(["group2", "group8", "group11"]),
+ set(["bundle1", "bundle2"]), set(), set(),
+ dict(category1="group2"), None, None))
+
+ # test exception on new client with no default profile
+ metadata.default = None
+ new2 = self.get_nonexistent_client(metadata)
+ self.assertRaises(MetadataConsistencyError,
+ metadata.get_initial_metadata,
+ new2)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_validate_client_address(self, mock_resolve_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ # this is upper case to ensure that case is folded properly in
+ # validate_client_address()
+ mock_resolve_client.return_value = "CLIENT4"
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.7", None)))
+ mock_resolve_client.assert_called_with(("1.2.3.7", None))
+
+ mock_resolve_client.reset_mock()
+ self.assertFalse(metadata.validate_client_address("client5",
+ ("1.2.3.5", None)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_AuthenticateConnection(self, mock_resolve_client,
+ mock_validate_client_address):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.password = "password1"
+
+ cert = dict(subject=[[("commonName", "client1")]])
+ mock_validate_client_address.return_value = False
+ self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ mock_validate_client_address.return_value = True
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root",
+ metadata.password,
+ "1.2.3.1"))
+
+ cert = dict(subject=[[("commonName", "client8")]])
+
+ mock_resolve_client.return_value = "client5"
+ self.assertTrue(metadata.AuthenticateConnection(None, "root",
+ "password1", "1.2.3.8"))
+
+ mock_resolve_client.side_effect = MetadataConsistencyError
+ self.assertFalse(metadata.AuthenticateConnection(None, "root",
+ "password1",
+ "1.2.3.8"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("socket.gethostbyaddr")
+ def test_resolve_client(self, mock_gethostbyaddr):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+
+ metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
+ 'client3')
+ mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
+ cleanup_cache=True), 'client3')
+ self.assertEqual(metadata.session_cache, dict())
+
+ mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
+ mock_gethostbyaddr.assert_called_with('1.2.3.6')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = None
+ mock_gethostbyaddr.side_effect = socket.herror
+ self.assertRaises(MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.8', None))
+ mock_gethostbyaddr.assert_called_with('1.2.3.8')
+
+ def test_clients_xml_event(self):
+ pass
+
+
+class TestMetadata_ClientsXML(TestMetadataBase):
+ """ test Metadata with a clients.xml. """
+ __test__ = True
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_metadata_object()
+ metadata.core.fam = Mock()
+ metadata._handle_file("clients.xml")
+ metadata = TestMetadata.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+ return TestMetadataBase.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata._handle_clients_xml_event")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.list_clients")
+ def test_clients_xml_event(self, mock_list_clients, mock_handle_event,
+ mock_load_xml):
+ metadata = self.get_metadata_object()
+ metadata.profiles = ["group1", "group2"]
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "Metadata", "clients.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ self.assertFalse(mock_handle_event.called)
+ self.assertFalse(mock_load_xml.called)
+
+ mock_load_xml.reset_mock()
+ mock_handle_event.reset_mock()
+ mock_list_clients.reset_mock()
+ metadata._handle_file("clients.xml")
+ metadata.HandleEvent(evt)
+ mock_handle_event.assert_called_with(evt)
+ mock_list_clients.assert_any_call()
+ mock_load_xml.assert_any_call()