From 836e995a7ab50c838ebff190f371b493d905a31c Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 13 Aug 2012 15:02:46 -0400 Subject: abstracted out some common test stuff into common.py --- testsuite/Testlib/TestOptions.py | 8 +- testsuite/Testlib/TestServer/TestPlugin.py | 27 +--- .../Testlib/TestServer/TestPlugins/TestMetadata.py | 161 ++++++++++----------- .../Testlib/TestServer/TestPlugins/TestProbes.py | 83 ++++------- testsuite/__init__.py | 0 testsuite/common.py | 77 ++++++++++ 6 files changed, 184 insertions(+), 172 deletions(-) create mode 100644 testsuite/__init__.py create mode 100644 testsuite/common.py diff --git a/testsuite/Testlib/TestOptions.py b/testsuite/Testlib/TestOptions.py index 2129b9287..4c87b2f42 100644 --- a/testsuite/Testlib/TestOptions.py +++ b/testsuite/Testlib/TestOptions.py @@ -3,9 +3,9 @@ import sys import unittest from mock import Mock, patch import Bcfg2.Options +from ..common import * - -class TestOption(unittest.TestCase): +class TestOption(Bcfg2TestCase): def test__init(self): self.assertRaises(Bcfg2.Options.OptionFailure, Bcfg2.Options.Option, @@ -68,7 +68,7 @@ class TestOption(unittest.TestCase): assert o2.value == True -class TestOptionSet(unittest.TestCase): +class TestOptionSet(Bcfg2TestCase): def test_buildGetopt(self): opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')), ('bar', Bcfg2.Options.Option('foo', 'test2')), @@ -108,7 +108,7 @@ class TestOptionSet(unittest.TestCase): self.assertTrue(oset3['foo']) -class TestOptionParser(unittest.TestCase): +class TestOptionParser(Bcfg2TestCase): def test__init(self): opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-h')), ('bar', Bcfg2.Options.Option('foo', 'test2')), diff --git a/testsuite/Testlib/TestServer/TestPlugin.py b/testsuite/Testlib/TestServer/TestPlugin.py index 0ce32eb91..973994b2e 100644 --- a/testsuite/Testlib/TestServer/TestPlugin.py +++ b/testsuite/Testlib/TestServer/TestPlugin.py @@ -6,32 +6,7 @@ import lxml.etree from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugin import * import Bcfg2.Server - -datastore = '/' - -def call(*args, **kwargs): - """ the Mock call object is a fairly recent addition, but it's - very very useful, so we create our own function to create Mock - calls """ - return (args, kwargs) - -class Bcfg2TestCase(unittest.TestCase): - def assertXMLEqual(self, el1, el2, msg=None): - self.assertEqual(el1.tag, el2.tag, msg=msg) - self.assertEqual(el1.text, el2.text, msg=msg) - self.assertItemsEqual(el1.attrib, el2.attrib, msg=msg) - self.assertEqual(len(el1.getchildren()), - len(el2.getchildren())) - for child1 in el1.getchildren(): - cname = child1.get("name") - self.assertIsNotNone(cname, - msg="Element %s has no 'name' attribute" % - child1.tag) - children2 = el2.xpath("*[@name='%s']" % cname) - self.assertEqual(len(children2), 1, - msg="More than one element named %s" % cname) - self.assertXMLEqual(child1, children2[0], msg=msg) - +from ...common import * class FakeElementTree(lxml.etree._ElementTree): xinclude = Mock() diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py index 6926d6eef..3eb242d89 100644 --- a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -6,29 +6,13 @@ import socket import unittest import lxml.etree from mock import Mock, patch - -try: - from django.core.management import setup_environ - has_django = True - - os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings" - - import Bcfg2.settings - Bcfg2.settings.DATABASE_NAME = \ - os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite") - Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME -except ImportError: - has_django = False - +from ....common import * import Bcfg2.Server import Bcfg2.Server.Plugin from Bcfg2.Server.Plugins.Metadata import * from ..TestPlugin import TestXMLFileBacked, TestMetadata, TestStatistics, \ TestDatabaseBacked -XI_NAMESPACE = "http://www.w3.org/2001/XInclude" -XI = "{%s}" % XI_NAMESPACE - clients_test_tree = lxml.etree.XML(''' ''').getroottree() -datastore = "/" - -def test_syncdb(): - if not has_django: - raise unittest.SkipTest("Django not found, skipping") - - # create the test database - setup_environ(Bcfg2.settings) - from django.core.management.commands import syncdb - cmd = syncdb.Command() - cmd.handle_noargs(interactive=False) - assert os.path.exists(Bcfg2.settings.DATABASE_NAME) - - # ensure that we a) can connect to the database; b) start with a - # clean database - MetadataClientModel.objects.all().delete() - assert list(MetadataClientModel.objects.all()) == [] def get_metadata_object(core=None, watch_clients=False, use_db=False): if core is None: @@ -115,7 +82,11 @@ def get_metadata_object(core=None, watch_clients=False, use_db=False): return Metadata(core, datastore, watch_clients=watch_clients) -class TestClientVersions(unittest.TestCase): +class TestMetadataDB(DBModelTestCase): + models = [MetadataClientModel] + + +class TestClientVersions(Bcfg2TestCase): test_clients = dict(client1="1.2.0", client2="1.2.2", client3="1.3.0pre1", @@ -124,7 +95,7 @@ class TestClientVersions(unittest.TestCase): client6=None) def setUp(self): - test_syncdb() + syncdb(TestMetadataDB) for client, version in self.test_clients.items(): MetadataClientModel(hostname=client, version=version).save() @@ -361,7 +332,7 @@ class TestXMLMetadataConfig(TestXMLFileBacked): mock_load_xml.assert_called_with() -class TestClientMetadata(unittest.TestCase): +class TestClientMetadata(Bcfg2TestCase): def test_inGroup(self): cm = ClientMetadata("client1", "group1", ["group1", "group2"], ["bundle1"], [], [], [], None, None, None, None) @@ -631,7 +602,8 @@ class TestMetadata(TestMetadata, TestStatistics, TestDatabaseBacked): def load_clients_data(self, metadata=None, xdata=None): if metadata is None: metadata = self.get_metadata_object() - metadata.clients_xml.data = xdata or copy.deepcopy(self.clients_test_tree) + metadata.clients_xml.data = \ + xdata or copy.deepcopy(self.clients_test_tree) metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) evt = Mock() evt.filename = os.path.join(datastore, "Metadata", "clients.xml") @@ -850,48 +822,61 @@ class TestMetadata(TestMetadata, TestStatistics, TestDatabaseBacked): # test address, password metadata.get_initial_metadata("client1") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client1", "group1", set(["group1"]), set(), set(), - set(["1.2.3.1"]), dict(category1='group1'), None, - 'password2')) + mock_clientmetadata.assert_called_with("client1", "group1", + set(["group1"]), set(), set(), + set(["1.2.3.1"]), + dict(category1='group1'), None, + 'password2', None, + metadata.query) # test address, bundles, category suppression metadata.get_initial_metadata("client2") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client2", "group2", set(["group2"]), - set(["bundle1", "bundle2"]), set(), - set(["1.2.3.2"]), dict(category1="group2"), - None, None)) + mock_clientmetadata.assert_called_with("client2", "group2", + set(["group2"]), + set(["bundle1", "bundle2"]), + set(), set(["1.2.3.2"]), + dict(category1="group2"), + None, None, None, + metadata.query) # test aliases, address, uuid, password imd = metadata.get_initial_metadata("alias1") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client3", "group1", set(["group1"]), set(), - set(['alias1']), set(["1.2.3.3"]), - dict(category1="group1"), 'uuid1', 'password2')) + mock_clientmetadata.assert_called_with("client3", "group1", + set(["group1"]), set(), + set(['alias1']), + set(["1.2.3.3"]), + dict(category1="group1"), + 'uuid1', 'password2', None, + metadata.query) # test new client creation new1 = self.get_nonexistent_client(metadata) imd = metadata.get_initial_metadata(new1) - self.assertEqual(mock_clientmetadata.call_args[0][:9], - (new1, "group1", set(["group1"]), set(), - set(), set(), dict(category1="group1"), None, None)) + mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]), + set(), set(), set(), + dict(category1="group1"), None, + None, None, metadata.query) # test nested groups, address, per-client groups imd = metadata.get_initial_metadata("client8") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client8", "group1", - set(["group1", "group8", "group9", "group10"]), set(), - set(), set(["1.2.3.5"]), dict(category1="group1"), - None, None)) + mock_clientmetadata.assert_called_with("client8", "group1", + set(["group1", "group8", + "group9", "group10"]), + set(), + set(), set(["1.2.3.5"]), + dict(category1="group1"), + None, None, None, metadata.query) # test setting per-client groups, group negation, nested groups imd = metadata.get_initial_metadata("client9") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client9", "group2", - set(["group2", "group8", "group11"]), - set(["bundle1", "bundle2"]), set(), set(), - dict(category1="group2"), None, "password3")) + mock_clientmetadata.assert_called_with("client9", "group2", + set(["group2", "group8", + "group11"]), + set(["bundle1", "bundle2"]), + set(), set(), + dict(category1="group2"), None, + "password3", None, + metadata.query) # test new client with no default profile metadata.default = None @@ -1120,7 +1105,7 @@ class TestMetadataBase(TestMetadata): def __init__(self, *args, **kwargs): TestMetadata.__init__(self, *args, **kwargs) - test_syncdb() + syncdb(TestMetadataDB) def setUp(self): if not has_django: @@ -1251,38 +1236,46 @@ class TestMetadata_NoClientsXML(TestMetadataBase): # test basic client metadata metadata.get_initial_metadata("client1") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client1", "group1", set(["group1"]), set(), set(), - set(), dict(category1='group1'), None, None)) + mock_clientmetadata.assert_called_with("client1", "group1", + set(["group1"]), set(), set(), + set(), dict(category1='group1'), + None, None, None, metadata.query) # test bundles, category suppression metadata.get_initial_metadata("client2") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client2", "group2", set(["group2"]), - set(["bundle1", "bundle2"]), set(), set(), - dict(category1="group2"), None, None)) + mock_clientmetadata.assert_called_with("client2", "group2", + set(["group2"]), + set(["bundle1", "bundle2"]), + set(), set(), + dict(category1="group2"), None, + None, None, metadata.query) # test new client creation new1 = self.get_nonexistent_client(metadata) imd = metadata.get_initial_metadata(new1) - self.assertEqual(mock_clientmetadata.call_args[0][:9], - (new1, "group1", set(["group1"]), set(), set(), set(), - dict(category1="group1"), None, None)) + mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]), + set(), set(), set(), + dict(category1="group1"), None, + None, None, metadata.query) # test nested groups, per-client groups imd = metadata.get_initial_metadata("client8") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client8", "group1", - set(["group1", "group8", "group9", "group10"]), set(), - set(), set(), dict(category1="group1"), None, None)) + mock_clientmetadata.assert_called_with("client8", "group1", + set(["group1", "group8", + "group9", "group10"]), + set(), set(), set(), + dict(category1="group1"), None, + None, None, metadata.query) # test per-client groups, group negation, nested groups imd = metadata.get_initial_metadata("client9") - self.assertEqual(mock_clientmetadata.call_args[0][:9], - ("client9", "group2", - set(["group2", "group8", "group11"]), - set(["bundle1", "bundle2"]), set(), set(), - dict(category1="group2"), None, None)) + mock_clientmetadata.assert_called_with("client9", "group2", + set(["group2", "group8", + "group11"]), + set(["bundle1", "bundle2"]), + set(), set(), + dict(category1="group2"), None, + None, None, metadata.query) # test exception on new client with no default profile metadata.default = None diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py index fc901af68..816f9c370 100644 --- a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py @@ -4,54 +4,25 @@ import time import unittest import lxml.etree from mock import Mock, patch - -try: - from django.core.management import setup_environ - has_django = True - - os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings" - - import Bcfg2.settings - Bcfg2.settings.DATABASE_NAME = \ - os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite") - Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME -except ImportError: - has_django = False - +from ....common import * import Bcfg2.Server import Bcfg2.Server.Plugin from Bcfg2.Server.Plugins.Probes import * from ..TestPlugin import TestEntrySet, TestProbing, TestConnector, \ TestDatabaseBacked -datastore = "/" - # test data for JSON and YAML tests test_data = dict(a=1, b=[1, 2, 3], c="test") -def test_syncdb(): - if not has_django: - raise unittest.SkipTest("Django not found, skipping") - - # create the test database - setup_environ(Bcfg2.settings) - from django.core.management.commands import syncdb - cmd = syncdb.Command() - cmd.handle_noargs(interactive=False) - assert os.path.exists(Bcfg2.settings.DATABASE_NAME) - - # ensure that we a) can connect to the database; b) start with a - # clean database - ProbesDataModel.objects.all().delete() - ProbesGroupsModel.objects.all().delete() - assert list(ProbesDataModel.objects.all()) == [] - - class FakeList(list): sort = Mock() -class TestClientProbeDataSet(unittest.TestCase): +class TestProbesDB(DBModelTestCase): + models = [ProbesGroupsModel, ProbesDataModel] + + +class TestClientProbeDataSet(Bcfg2TestCase): def test__init(self): ds = ClientProbeDataSet() self.assertLessEqual(ds.timestamp, time.time()) @@ -62,7 +33,7 @@ class TestClientProbeDataSet(unittest.TestCase): self.assertEqual(ds.timestamp, 123) self.assertNotIn("timestamp", ds) -class TestProbeData(unittest.TestCase): +class TestProbeData(Bcfg2TestCase): def test_str(self): # a value that is not valid XML, JSON, or YAML val = "'test" @@ -251,19 +222,20 @@ text "use_database", default=False) + @unittest.skipUnless(has_django, "Django not found, skipping") @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) - def test_write_data(self): + def test_write_data_xml(self): probes = self.get_probes_object(use_db=False) probes.write_data("test") probes._write_data_xml.assert_called_with("test") self.assertFalse(probes._write_data_db.called) - if not has_django: - self.skipTest("Django not found, skipping") + @unittest.skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock()) + def test_write_data_db(self): probes = self.get_probes_object(use_db=True) - probes._write_data_xml.reset_mock() - probes._write_data_db.reset_mock() probes.write_data("test") probes._write_data_db.assert_called_with("test") self.assertFalse(probes._write_data_xml.called) @@ -324,10 +296,9 @@ text self.assertIsNotNone(jdata.get("value")) self.assertItemsEqual(test_data, json.loads(jdata.get("value"))) + @unittest.skipUnless(has_django, "Django not found, skipping") def test__write_data_db(self): - if not has_django: - self.skipTest("Django not found, skipping") - test_syncdb() + syncdb(TestProbesDB) probes = self.get_probes_object(use_db=True) probes.probedata = self.get_test_probedata() probes.cgroups = self.get_test_cgroups() @@ -377,23 +348,20 @@ text pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() self.assertEqual(len(pgroups), len(probes.cgroups[cname])) + @unittest.skipUnless(has_django, "Django not found, skipping") @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) - def test_load_data(self): + def test_load_data_xml(self): probes = self.get_probes_object(use_db=False) - probes._load_data_xml.reset_mock() - probes._load_data_db.reset_mock() - probes.load_data() probes._load_data_xml.assert_any_call() self.assertFalse(probes._load_data_db.called) - if not has_django: - self.skipTest("Django not found, skipping") - + @unittest.skipUnless(has_django, "Django not found, skipping") + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock()) + @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock()) + def test_load_data_db(self): probes = self.get_probes_object(use_db=True) - probes._load_data_xml.reset_mock() - probes._load_data_db.reset_mock() probes.load_data() probes._load_data_db.assert_any_call() self.assertFalse(probes._load_data_xml.called) @@ -421,10 +389,9 @@ text self.assertItemsEqual(probes.probedata, self.get_test_probedata()) self.assertItemsEqual(probes.cgroups, self.get_test_cgroups()) + @unittest.skipUnless(has_django, "Django not found, skipping") def test__load_data_db(self): - if not has_django: - self.skipTest("Django not found, skipping") - test_syncdb() + syncdb(TestProbesDB) probes = self.get_probes_object(use_db=True) probes.probedata = self.get_test_probedata() probes.cgroups = self.get_test_cgroups() @@ -468,8 +435,8 @@ text probes.ReceiveData(client, datalist) self.assertItemsEqual(mock_ReceiveDataItem.call_args_list, - [((client, "a"), {}), ((client, "b"), {}), - ((client, "c"), {})]) + [call(client, "a"), call(client, "b"), + call(client, "c")]) mock_write_data.assert_called_with(client) def test_ReceiveDataItem(self): diff --git a/testsuite/__init__.py b/testsuite/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/testsuite/common.py b/testsuite/common.py new file mode 100644 index 000000000..006b45970 --- /dev/null +++ b/testsuite/common.py @@ -0,0 +1,77 @@ +import os +import unittest + +__all__ = ['call', 'datastore', 'Bcfg2TestCase', 'DBModelTestCase', 'syncdb', + 'XI', 'XI_NAMESPACE'] + +datastore = "/" + +XI_NAMESPACE = "http://www.w3.org/2001/XInclude" +XI = "{%s}" % XI_NAMESPACE + +try: + from django.core.management import setup_environ + has_django = True + + os.environ['DJANGO_SETTINGS_MODULE'] = "Bcfg2.settings" + + import Bcfg2.settings + Bcfg2.settings.DATABASE_NAME = \ + os.path.join(os.path.dirname(os.path.abspath(__file__)), "test.sqlite") + Bcfg2.settings.DATABASES['default']['NAME'] = Bcfg2.settings.DATABASE_NAME +except ImportError: + has_django = False + +try: + from mock import call +except ImportError: + def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + + +class Bcfg2TestCase(unittest.TestCase): + def assertXMLEqual(self, el1, el2, msg=None): + self.assertEqual(el1.tag, el2.tag, msg=msg) + self.assertEqual(el1.text, el2.text, msg=msg) + self.assertItemsEqual(el1.attrib, el2.attrib, msg=msg) + self.assertEqual(len(el1.getchildren()), + len(el2.getchildren())) + for child1 in el1.getchildren(): + cname = child1.get("name") + self.assertIsNotNone(cname, + msg="Element %s has no 'name' attribute" % + child1.tag) + children2 = el2.xpath("*[@name='%s']" % cname) + self.assertEqual(len(children2), 1, + msg="More than one element named %s" % cname) + self.assertXMLEqual(child1, children2[0], msg=msg) + + +class DBModelTestCase(Bcfg2TestCase): + models = [] + + @unittest.skipUnless(has_django, "Django not found, skipping") + def test_syncdb(self): + # create the test database + setup_environ(Bcfg2.settings) + from django.core.management.commands import syncdb + cmd = syncdb.Command() + cmd.handle_noargs(interactive=False) + self.assertTrue(os.path.exists(Bcfg2.settings.DATABASE_NAME)) + + @unittest.skipUnless(has_django, "Django not found, skipping") + def test_cleandb(self): + """ ensure that we a) can connect to the database; b) start with a + clean database """ + for model in self.models: + model.objects.all().delete() + self.assertItemsEqual(list(model.objects.all()), []) + + +def syncdb(modeltest): + inst = modeltest(methodName='test_syncdb') + inst.test_syncdb() + inst.test_cleandb() -- cgit v1.2.3-1-g7c22