From be4f6ad065fa17ae34c810f2c09bc9f5fa4d9c23 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 14 Aug 2012 14:44:08 -0400 Subject: added unit tests for EntrySet --- testsuite/Testlib/TestServer/TestPlugin.py | 399 ++++++++++++++++++++++++++++- 1 file changed, 385 insertions(+), 14 deletions(-) (limited to 'testsuite/Testlib/TestServer/TestPlugin.py') diff --git a/testsuite/Testlib/TestServer/TestPlugin.py b/testsuite/Testlib/TestServer/TestPlugin.py index 9ca3243a3..e5baf9a0b 100644 --- a/testsuite/Testlib/TestServer/TestPlugin.py +++ b/testsuite/Testlib/TestServer/TestPlugin.py @@ -1,4 +1,5 @@ import os +import re import copy import logging import unittest @@ -164,32 +165,39 @@ class TestPluginDatabaseModel(Bcfg2TestCase): class TestGenerator(Bcfg2TestCase): - def test_HandleEntry(self): - g = Generator() - self.assertRaises(NotImplementedError, - g.HandleEntry, None, None) + test_obj = Generator class TestStructure(Bcfg2TestCase): + test_obj = Structure + + def get_obj(self): + return self.test_obj() + def test_BuildStructures(self): - s = Structure() + s = self.get_obj() self.assertRaises(NotImplementedError, s.BuildStructures, None) class TestMetadata(Bcfg2TestCase): + test_obj = Metadata + + def get_obj(self): + return self.test_obj() + def test_get_initial_metadata(self): - m = Metadata() + m = self.get_obj() self.assertRaises(NotImplementedError, m.get_initial_metadata, None) def test_merge_additional_data(self): - m = Metadata() + m = self.get_obj() self.assertRaises(NotImplementedError, m.merge_additional_data, None, None, None) def test_merge_additional_groups(self): - m = Metadata() + m = self.get_obj() self.assertRaises(NotImplementedError, m.merge_additional_groups, None, None) @@ -1511,12 +1519,12 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked): pd.get_attrs, entry, metadata) -class SpecificityError(Bcfg2TestCase): +class TestSpecificityError(Bcfg2TestCase): """ placeholder for future tests """ pass -class Specificity(Bcfg2TestCase): +class TestSpecificity(Bcfg2TestCase): test_obj = Specificity def get_obj(self, **kwargs): @@ -1560,7 +1568,6 @@ class Specificity(Bcfg2TestCase): for i in range(len(specs)): for j in range(len(specs)): - print "i=%s, j=%s" % (i, j) if i < j: self.assertGreater(specs[i], specs[j]) self.assertLess(specs[j], specs[i]) @@ -1578,7 +1585,7 @@ class Specificity(Bcfg2TestCase): self.assertGreaterEqual(specs[j], specs[i]) -class SpecificData(Bcfg2TestCase): +class TestSpecificData(Bcfg2TestCase): test_obj = SpecificData def get_obj(self, name="/test.txt", specific=None, encoding=None): @@ -1608,12 +1615,376 @@ class SpecificData(Bcfg2TestCase): class TestEntrySet(TestDebuggable): test_obj = EntrySet + # filenames that should be matched successfully by the EntrySet + # 'specific' regex. these are filenames alone -- a specificity + # will be added to these + basenames = ["test", "test.py", "test with spaces.txt", + "test.multiple.dots.py", "test_underscores.and.dots", + "really_misleading.G10_test", + "name$with*regex(special){chars}", + "misleading.H_hostname.test.com"] + # filenames that do not match any of the basenames (or the + # basename regex, if applicable) + bogus_names = ["bogus"] + # filenames that should be ignored + ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", + "test.txt.genshi_include", "test.G_foo.genshi_include"] + - def get_obj(self, basename="test", path=datastore, entry_type=SpecificData, + def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(), encoding=None): return self.test_obj(basename, path, entry_type, encoding) + def test__init(self): + for basename in self.basenames: + eset = self.get_obj(basename=basename) + self.assertIsInstance(eset.specific, re._pattern_type) + self.assertTrue(eset.specific.match(os.path.join(datastore, + basename))) + ppath = os.path.join(datastore, "Plugin", basename) + self.assertTrue(eset.specific.match(ppath)) + self.assertTrue(eset.specific.match(ppath + ".G20_foo")) + self.assertTrue(eset.specific.match(ppath + ".G1_foo")) + self.assertTrue(eset.specific.match(ppath + ".G32768_foo")) + # a group named '_' + self.assertTrue(eset.specific.match(ppath + ".G10__")) + self.assertTrue(eset.specific.match(ppath + ".H_hostname")) + self.assertTrue(eset.specific.match(ppath + ".H_fqdn.subdomain.example.com")) + self.assertTrue(eset.specific.match(ppath + ".G20_group_with_underscores")) + + self.assertFalse(eset.specific.match(ppath + ".G20_group with spaces")) + self.assertFalse(eset.specific.match(ppath + ".G_foo")) + self.assertFalse(eset.specific.match(ppath + ".G_")) + self.assertFalse(eset.specific.match(ppath + ".G20_")) + self.assertFalse(eset.specific.match(ppath + ".H_")) + + for bogus in self.bogus_names: + self.assertFalse(eset.specific.match(os.path.join(datastore, + "Plugin", + bogus))) + + for ignore in self.ignore: + self.assertTrue(eset.ignore.match(ignore)) + + self.assertFalse(eset.ignore.match(basename)) + self.assertFalse(eset.ignore.match(basename + ".G20_foo")) + self.assertFalse(eset.ignore.match(basename + ".G1_foo")) + self.assertFalse(eset.ignore.match(basename + ".G32768_foo")) + self.assertFalse(eset.ignore.match(basename + ".G10__")) + self.assertFalse(eset.ignore.match(basename + ".H_hostname")) + self.assertFalse(eset.ignore.match(basename + ".H_fqdn.subdomain.example.com")) + self.assertFalse(eset.ignore.match(basename + ".G20_group_with_underscores")) + + def test_get_matching(self): + items = {0: Mock(), 1: Mock(), 2: Mock(), 3: Mock(), 4: Mock(), + 5: Mock()} + items[0].specific.matches.return_value = False + items[1].specific.matches.return_value = True + items[2].specific.matches.return_value = False + items[3].specific.matches.return_value = False + items[4].specific.matches.return_value = True + items[5].specific.matches.return_value = True + metadata = Mock() + eset = self.get_obj() + eset.entries = items + self.assertItemsEqual(eset.get_matching(metadata), + [items[1], items[4], items[5]]) + for i in items.values(): + i.specific.matches.assert_called_with(metadata) + + @patch("Bcfg2.Server.Plugin.%s.get_matching" % test_obj.__name__) + def test_best_matching(self, mock_get_matching): + eset = self.get_obj() + metadata = Mock() + matching = [] + + def reset(): + mock_get_matching.reset_mock() + metadata.reset_mock() + for m in matching: + m.reset_mock() + + def specific(all=False, group=False, prio=None, host=False): + spec = MagicMock() + spec.all = all + spec.group = group + spec.prio = prio + spec.host = host + if prio: + spec.__cmp__ = lambda o: cmp(spec.prio, o.prio) + return spec + + self.assertRaises(PluginExecutionError, + eset.best_matching, metadata, matching=[]) + + reset() + mock_get_matching.return_value = matching + self.assertRaises(PluginExecutionError, + eset.best_matching, metadata) + mock_get_matching.assert_called_with(metadata) + + # test with a single file for all + reset() + matching.insert(0, specific(all=True)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + # test with a single group-specific file + reset() + matching.insert(0, specific(group=True, prio=10)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + # test with multiple group-specific files + reset() + matching.insert(0, specific(group=True, prio=20)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + # test with host-specific file + reset() + matching.insert(0, specific(host=True)) + mock_get_matching.return_value = matching + self.assertEqual(eset.best_matching(metadata), + matching[0]) + mock_get_matching.assert_called_with(metadata) + + @patch("Bcfg2.Server.Plugin.%s.entry_init" % test_obj.__name__) + @patch("Bcfg2.Server.Plugin.%s.reset_metadata" % test_obj.__name__) + @patch("Bcfg2.Server.Plugin.%s.update_metadata" % test_obj.__name__) + def test_handle_event(self, mock_update_md, mock_reset_md, mock_init): + def reset(): + mock_update_md.reset_mock() + mock_reset_md.reset_mock() + mock_init.reset_mock() + + eset = self.get_obj() + for fname in ["info", "info.xml", ":info"]: + for evt in ["exists", "created", "changed"]: + reset() + event = Mock() + event.code2str.return_value = evt + event.filename = fname + eset.handle_event(event) + mock_update_md.assert_called_with(event) + self.assertFalse(mock_init.called) + self.assertFalse(mock_reset_md.called) + + reset() + event = Mock() + event.code2str.return_value = "deleted" + event.filename = fname + eset.handle_event(event) + mock_reset_md.assert_called_with(event) + self.assertFalse(mock_init.called) + self.assertFalse(mock_update_md.called) + + for evt in ["exists", "created", "changed"]: + reset() + event = Mock() + event.code2str.return_value = evt + event.filename = "test.txt" + eset.handle_event(event) + mock_init.assert_called_with(event) + self.assertFalse(mock_reset_md.called) + self.assertFalse(mock_update_md.called) + + reset() + entry = Mock() + eset.entries["test.txt"] = entry + event = Mock() + event.code2str.return_value = "changed" + event.filename = "test.txt" + eset.handle_event(event) + entry.handle_event.assert_called_with(event) + self.assertFalse(mock_init.called) + self.assertFalse(mock_reset_md.called) + self.assertFalse(mock_update_md.called) + + reset() + entry = Mock() + eset.entries["test.txt"] = entry + event = Mock() + event.code2str.return_value = "deleted" + event.filename = "test.txt" + eset.handle_event(event) + self.assertNotIn("test.txt", eset.entries) + + @patch("Bcfg2.Server.Plugin.%s.specificity_from_filename" % + test_obj.__name__) + def test_entry_init(self, mock_spec): + eset = self.get_obj() + + def reset(): + eset.entry_type.reset_mock() + mock_spec.reset_mock() + + event = Mock() + event.code2str.return_value = "created" + event.filename = "test.txt" + eset.entry_init(event) + mock_spec.assert_called_with("test.txt", specific=None) + eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"), + mock_spec.return_value, None) + eset.entry_type.return_value.handle_event.assert_called_with(event) + self.assertIn("test.txt", eset.entries) + + # test duplicate add + reset() + eset.entry_init(event) + self.assertFalse(mock_spec.called) + self.assertFalse(eset.entry_type.called) + eset.entries["test.txt"].handle_event.assert_called_with(event) + + # test keyword args + etype = Mock() + specific = Mock() + event = Mock() + event.code2str.return_value = "created" + event.filename = "test2.txt" + eset.entry_init(event, entry_type=etype, specific=specific) + mock_spec.assert_called_with("test2.txt", specific=specific) + etype.assert_called_with(os.path.join(eset.path, "test2.txt"), + mock_spec.return_value, None) + etype.return_value.handle_event.assert_called_with(event) + self.assertIn("test2.txt", eset.entries) + + # test specificity error + event = Mock() + event.code2str.return_value = "created" + event.filename = "test3.txt" + mock_spec.side_effect = SpecificityError + eset.entry_init(event) + mock_spec.assert_called_with("test3.txt", specific=None) + self.assertFalse(eset.entry_type.called) + + @patch("Bcfg2.Server.Plugin.Specificity") + def test_specificity_from_filename(self, mock_spec): + def test(eset, fname, **kwargs): + mock_spec.reset_mock() + if "specific" in kwargs: + specific = kwargs['specific'] + del kwargs['specific'] + else: + specific = None + self.assertEqual(eset.specificity_from_filename(fname, + specific=specific), + mock_spec.return_value) + mock_spec.assert_called_with(**kwargs) + + def fails(eset, fname, specific=None): + mock_spec.reset_mock() + self.assertRaises(SpecificityError, + eset.specificity_from_filename, fname, + specific=specific) + + for basename in self.basenames: + eset = self.get_obj(basename=basename) + ppath = os.path.join(datastore, "Plugin", basename) + test(eset, ppath, all=True) + test(eset, ppath + ".G20_foo", group="foo", prio=20) + test(eset, ppath + ".G1_foo", group="foo", prio=1) + test(eset, ppath + ".G32768_foo", group="foo", prio=32768) + test(eset, ppath + ".G10__", group="_", prio=10) + test(eset, ppath + ".H_hostname", hostname="hostname") + test(eset, ppath + ".H_fqdn.subdomain.example.com", + hostname="fqdn.subdomain.example.com") + test(eset, ppath + ".G20_group_with_underscores", + group="group_with_underscores", prio=20) + + for bogus in self.bogus_names: + fails(eset, bogus) + fails(eset, ppath + ".G_group with spaces") + fails(eset, ppath + ".G_foo") + fails(eset, ppath + ".G_") + fails(eset, ppath + ".G20_") + fails(eset, ppath + ".H_") + + @patch("__builtin__.open") + @patch("Bcfg2.Server.Plugin.InfoXML") + def test_update_metadata(self, mock_InfoXML, mock_open): + eset = self.get_obj() + + # add info.xml + event = Mock() + event.filename = "info.xml" + eset.update_metadata(event) + mock_InfoXML.assert_called_with(os.path.join(eset.path, "info.xml"), + True) + mock_InfoXML.return_value.HandleEvent.assert_called_with(event) + self.assertEqual(eset.infoxml, mock_InfoXML.return_value) + + # modify info.xml + mock_InfoXML.reset_mock() + eset.update_metadata(event) + self.assertFalse(mock_InfoXML.called) + eset.infoxml.HandleEvent.assert_called_with(event) + + for fname in [':info', 'info']: + event = Mock() + event.filename = fname + + idata = ["owner:owner", + "group: GROUP", + "perms: 775", + "important: true", + "bogus: line"] + mock_open.return_value.readlines.return_value = idata + eset.update_metadata(event) + expected = default_file_metadata.copy() + expected['owner'] = 'owner' + expected['group'] = 'GROUP' + expected['perms'] = '0775' + expected['important'] = 'true' + self.assertItemsEqual(eset.metadata, + expected) + + def test_reset_metadata(self): + eset = self.get_obj() + + # test info.xml + event = Mock() + event.filename = "info.xml" + eset.infoxml = Mock() + eset.reset_metadata(event) + self.assertIsNone(eset.infoxml) + + for fname in [':info', 'info']: + event = Mock() + event.filename = fname + eset.metadata = Mock() + eset.reset_metadata(event) + self.assertItemsEqual(eset.metadata, default_file_metadata) + + @patch("Bcfg2.Server.Plugin.bind_info") + def test_bind_info_to_entry(self, mock_bind_info): + eset = self.get_obj() + entry = Mock() + metadata = Mock() + eset.bind_info_to_entry(entry, metadata) + mock_bind_info.assert_called_with(entry, metadata, + infoxml=eset.infoxml, + default=eset.metadata) + + @patch("Bcfg2.Server.Plugin.%s.best_matching" % test_obj.__name__) + @patch("Bcfg2.Server.Plugin.%s.bind_info_to_entry" % test_obj.__name__) + def test_bind_entry(self, mock_bind_info, mock_best_matching): + eset = self.get_obj() + entry = Mock() + metadata = Mock() + eset.bind_entry(entry, metadata) + mock_bind_info.assert_called_with(entry, metadata) + mock_best_matching.assert_called_with(metadata) + mock_best_matching.return_value.bind_entry.assert_called_with(entry, + metadata) + -class GroupSpool(TestPlugin, TestGenerator): +class TestGroupSpool(TestPlugin, TestGenerator): test_obj = GroupSpool pass -- cgit v1.2.3-1-g7c22