From 05122a6f04fbbff9838816f6f713e483811c8ed5 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 18 Dec 2012 11:20:47 -0600 Subject: added unit tests for Rules and Defaults --- src/lib/Bcfg2/Server/Plugin/helpers.py | 8 +- src/lib/Bcfg2/Server/Plugins/Defaults.py | 7 +- src/lib/Bcfg2/Server/Plugins/Rules.py | 7 +- .../TestServer/TestPlugin/Testinterfaces.py | 14 ++- .../Testlib/TestServer/TestPlugins/TestDefaults.py | 67 +++++++++++ .../Testlib/TestServer/TestPlugins/TestProbes.py | 25 ++-- .../Testlib/TestServer/TestPlugins/TestRules.py | 130 +++++++++++++++++++++ 7 files changed, 229 insertions(+), 29 deletions(-) create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py create mode 100644 testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py index dfda30b39..c6cd006ac 100644 --- a/src/lib/Bcfg2/Server/Plugin/helpers.py +++ b/src/lib/Bcfg2/Server/Plugin/helpers.py @@ -155,13 +155,9 @@ class DatabaseBacked(Plugin): lock before writing, because it does not allow multiple threads to write.""" engine = \ - self.core.setup.cfp.get(Bcfg2.Options.DB_ENGINE.cf[0], - Bcfg2.Options.DB_ENGINE.cf[1], + self.core.setup.cfp.get(*Bcfg2.Options.DB_ENGINE.cf, default=Bcfg2.Options.DB_ENGINE.default) - if engine == 'sqlite3': - return True - else: - return False + return engine == 'sqlite3' @staticmethod def get_db_lock(func): diff --git a/src/lib/Bcfg2/Server/Plugins/Defaults.py b/src/lib/Bcfg2/Server/Plugins/Defaults.py index 53eed3798..f4d86a64f 100644 --- a/src/lib/Bcfg2/Server/Plugins/Defaults.py +++ b/src/lib/Bcfg2/Server/Plugins/Defaults.py @@ -7,7 +7,6 @@ import Bcfg2.Server.Plugins.Rules class Defaults(Bcfg2.Server.Plugins.Rules.Rules, Bcfg2.Server.Plugin.StructureValidator): """Set default attributes on bound entries""" - name = 'Defaults' __author__ = 'bcfg-dev@mcs.anl.gov' # Rules is a Generator that happens to implement all of the @@ -20,16 +19,13 @@ class Defaults(Bcfg2.Server.Plugins.Rules.Rules, def HandlesEntry(self, entry, metadata): return False - def HandleEntry(self, entry, metadata): - raise Bcfg2.Server.Plugin.PluginExecutionError - def HandleEvent(self, event): Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent(self, event) def validate_structures(self, metadata, structures): """ Apply defaults """ for struct in structures: - for entry in struct.iter(): + for entry in struct.getchildren(): if entry.tag.startswith("Bound"): is_bound = True entry.tag = entry.tag[5:] @@ -48,6 +44,7 @@ class Defaults(Bcfg2.Server.Plugins.Rules.Rules, if is_bound: entry.tag = "Bound" + entry.tag + @property def _regex_enabled(self): """ Defaults depends on regex matching, so force it enabled """ return True diff --git a/src/lib/Bcfg2/Server/Plugins/Rules.py b/src/lib/Bcfg2/Server/Plugins/Rules.py index 904876794..21862c5db 100644 --- a/src/lib/Bcfg2/Server/Plugins/Rules.py +++ b/src/lib/Bcfg2/Server/Plugins/Rules.py @@ -6,7 +6,6 @@ import Bcfg2.Server.Plugin class Rules(Bcfg2.Server.Plugin.PrioDir): """This is a generator that handles service assignments.""" - name = 'Rules' __author__ = 'bcfg-dev@mcs.anl.gov' def __init__(self, core, datastore): @@ -19,15 +18,14 @@ class Rules(Bcfg2.Server.Plugin.PrioDir): self.Entries[entry.tag].keys()) return False - def HandleEntry(self, entry, metadata): - return self.BindEntry(entry, metadata) - def BindEntry(self, entry, metadata): attrs = self.get_attrs(entry, metadata) for key, val in list(attrs.items()): if key not in entry.attrib: entry.attrib[key] = val + HandleEntry = BindEntry + def _matches(self, entry, metadata, rules): if Bcfg2.Server.Plugin.PrioDir._matches(self, entry, metadata, rules): return True @@ -48,6 +46,7 @@ class Rules(Bcfg2.Server.Plugin.PrioDir): return True return False + @property def _regex_enabled(self): """ Return True if rules regexes are enabled, False otherwise """ return self.core.setup.cfp.getboolean("rules", "regex", default=False) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py index 343f088b3..35f4e0700 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py @@ -328,15 +328,25 @@ class TestDecision(Bcfg2TestCase): class TestStructureValidator(Bcfg2TestCase): + test_obj = StructureValidator + + def get_obj(self): + return self.test_obj() + def test_validate_structures(self): - sv = StructureValidator() + sv = self.get_obj() self.assertRaises(NotImplementedError, sv.validate_structures, None, None) class TestGoalValidator(Bcfg2TestCase): + test_obj = GoalValidator + + def get_obj(self): + return self.test_obj() + def test_validate_goals(self): - gv = GoalValidator() + gv = self.get_obj() self.assertRaises(NotImplementedError, gv.validate_goals, None, None) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py new file mode 100644 index 000000000..9ed0c3803 --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py @@ -0,0 +1,67 @@ +import os +import sys +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Defaults import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestRules import TestRules +from Testinterfaces import TestStructureValidator + + +class TestDefaults(TestRules, TestStructureValidator): + test_obj = Defaults + + def get_obj(self, *args, **kwargs): + return TestRules.get_obj(self, *args, **kwargs) + + def test_HandlesEntry(self): + d = self.get_obj() + self.assertFalse(d.HandlesEntry(Mock(), Mock())) + + @patch("Bcfg2.Server.Plugin.helpers.XMLDirectoryBacked.HandleEvent") + def test_HandleEvent(self, mock_HandleEvent): + d = self.get_obj() + evt = Mock() + d.HandleEvent(evt) + mock_HandleEvent.assert_called_with(d, evt) + + def test_validate_structures(self): + d = self.get_obj() + d.BindEntry = Mock() + metadata = Mock() + + entries = [] + b1 = lxml.etree.Element("Bundle") + entries.append(lxml.etree.SubElement(b1, "Path", name="/foo")) + entries.append(lxml.etree.SubElement(b1, "Path", name="/bar")) + b2 = lxml.etree.Element("Bundle") + bound = lxml.etree.SubElement(b2, "BoundPath", name="/baz") + entries.append(bound) + entries.append(lxml.etree.SubElement(b2, "Package", name="quux")) + + d.validate_structures(metadata, [b1, b2]) + self.assertItemsEqual(d.BindEntry.call_args_list, + [call(e, metadata) for e in entries]) + # ensure that BoundEntries stay bound + self.assertTrue(bound.tag == "BoundPath") + + def test__matches_regex_disabled(self): + """ cannot disable regex in Defaults plugin """ + pass + + def set_regex_enabled(self, rules_obj, state): + pass + + def test__regex_enabled(self): + r = self.get_obj() + self.assertTrue(r._regex_enabled) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index f12adec69..899fb24a0 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -21,7 +21,8 @@ from TestPlugin import TestEntrySet, TestProbing, TestConnector, \ TestDatabaseBacked # test data for JSON and YAML tests -test_data = dict(a=1, b=[1, 2, 3], c="test") +test_data = dict(a=1, b=[1, 2, 3], c="test", + d=dict(a=1, b=dict(a=1), c=(1, "2", 3))) class FakeList(list): @@ -75,14 +76,14 @@ class TestProbeData(Bcfg2TestCase): data = ProbeData(jdata) self.assertIsNotNone(data.json) self.assertItemsEqual(test_data, data.json) - + @skipUnless(HAS_YAML, "YAML libraries not found, skipping YAML tests") def test_yaml(self): jdata = yaml.dump(test_data) data = ProbeData(jdata) self.assertIsNotNone(data.yaml) self.assertItemsEqual(test_data, data.yaml) - + class TestProbeSet(TestEntrySet): test_obj = ProbeSet @@ -123,7 +124,7 @@ class TestProbeSet(TestEntrySet): evt.filename = "probed.xml" ps.HandleEvent(evt) self.assertFalse(ps.handle_event.called) - + # test that other events are processed appropriately evt.reset_mock() evt.filename = "fooprobe" @@ -133,7 +134,7 @@ class TestProbeSet(TestEntrySet): @patch("%s.list" % builtins, FakeList) def test_get_probe_data(self): ps = self.get_obj() - + # build some fairly complex test data for this. in the end, # we want the probe data to include only the most specific # version of a given probe, and by basename only, not full @@ -390,7 +391,7 @@ text client = Mock() client.hostname = cname probes._write_data_db(client) - + pdata = ProbesDataModel.objects.filter(hostname=cname).all() self.assertEqual(len(pdata), len(probes.probedata[cname])) pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() @@ -476,13 +477,13 @@ text # we use a simple (read: bogus) datalist here to make this # easy to test datalist = ["a", "b", "c"] - + probes = self.get_probes_object() probes.core.metadata_cache_mode = 'off' client = Mock() client.hostname = "foo.example.com" probes.ReceiveData(client, datalist) - + cgroups = [] cprobedata = ClientProbeDataSet() self.assertItemsEqual(mock_ReceiveDataItem.call_args_list, @@ -496,7 +497,7 @@ text probes.cgroups[client.hostname] = datalist probes.core.metadata_cache_mode = 'aggressive' probes.ReceiveData(client, ['a', 'b', 'd']) - + mock_write_data.assert_called_with(client) probes.core.metadata_cache.expire.assert_called_with(client.hostname) @@ -520,7 +521,7 @@ text dataitem.text = str(pdata) probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata) - + probes.cgroups[client.hostname] = cgroups probes.probedata[client.hostname] = cprobedata self.assertIn(client.hostname, probes.probedata) @@ -563,5 +564,5 @@ text metadata.hostname = "nonexistent" self.assertEqual(probes.get_additional_data(metadata), ClientProbeDataSet()) - - + + diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py new file mode 100644 index 000000000..f018b45dc --- /dev/null +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py @@ -0,0 +1,130 @@ +import os +import sys +import lxml.etree +import Bcfg2.Server.Plugin +from mock import Mock, MagicMock, patch +from Bcfg2.Server.Plugins.Rules import * + +# add all parent testsuite directories to sys.path to allow (most) +# relative imports in python 2.4 +path = os.path.dirname(__file__) +while path != "/": + if os.path.basename(path).lower().startswith("test"): + sys.path.append(path) + if os.path.basename(path) == "testsuite": + break + path = os.path.dirname(path) +from common import * +from TestPlugin import TestPrioDir + + +class TestRules(TestPrioDir): + test_obj = Rules + + def test_HandlesEntry(self): + r = self.get_obj() + r.Entries = dict(Path={"/etc/foo.conf": Mock(), + "/etc/bar.conf": Mock()}) + r._matches = Mock() + metadata = Mock() + + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + self.assertEqual(r.HandlesEntry(entry, metadata), + r._matches.return_value) + r._matches.assert_called_with(entry, metadata, + r.Entries['Path'].keys()) + + r._matches.reset_mock() + entry = lxml.etree.Element("Path", name="/etc/baz.conf") + self.assertEqual(r.HandlesEntry(entry, metadata), + r._matches.return_value) + r._matches.assert_called_with(entry, metadata, + r.Entries['Path'].keys()) + + r._matches.reset_mock() + entry = lxml.etree.Element("Package", name="foo") + self.assertFalse(r.HandlesEntry(entry, metadata)) + + def test_BindEntry(self, method="BindEntry"): + r = self.get_obj() + r.get_attrs = Mock() + r.get_attrs.return_value = dict(overwrite="new", add="add", + text="text") + entry = lxml.etree.Element("Test", overwrite="old", keep="keep") + metadata = Mock() + + getattr(r, method)(entry, metadata) + r.get_attrs.assert_called_with(entry, metadata) + self.assertItemsEqual(entry.attrib, + dict(overwrite="old", add="add", keep="keep", + text="text")) + + def test_HandleEntry(self): + self.test_BindEntry(method="HandleEntry") + + @patch("Bcfg2.Server.Plugin.PrioDir._matches") + def test__matches(self, mock_matches): + """ test _matches() behavior regardless of state of _regex_enabled """ + r = self.get_obj() + metadata = Mock() + + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + rules = [] + mock_matches.return_value = True + self.assertTrue(r._matches(entry, metadata, rules)) + mock_matches.assert_called_with(r, entry, metadata, rules) + + # test special Path cases -- adding and removing trailing slash + mock_matches.reset_mock() + mock_matches.return_value = False + rules = ["/etc/foo/", "/etc/bar"] + entry = lxml.etree.Element("Path", name="/etc/foo") + self.assertTrue(r._matches(entry, metadata, rules)) + mock_matches.assert_called_with(r, entry, metadata, rules) + + mock_matches.reset_mock() + entry = lxml.etree.Element("Path", name="/etc/bar/") + self.assertTrue(r._matches(entry, metadata, rules)) + mock_matches.assert_called_with(r, entry, metadata, rules) + + @patch("Bcfg2.Server.Plugin.PrioDir._matches") + def test__matches_regex_disabled(self, mock_matches): + """ test failure to match with regex disabled """ + r = self.get_obj() + self.set_regex_enabled(r, False) + metadata = Mock() + mock_matches.return_value = False + + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + rules = [] + self.assertFalse(r._matches(entry, metadata, rules)) + mock_matches.assert_called_with(r, entry, metadata, rules) + + @patch("Bcfg2.Server.Plugin.PrioDir._matches") + def test__matches_regex_enabled(self, mock_matches): + """ test match with regex enabled """ + r = self.get_obj() + self.set_regex_enabled(r, True) + metadata = Mock() + mock_matches.return_value = False + + entry = lxml.etree.Element("Path", name="/etc/foo.conf") + rules = ["/etc/.*\.conf", "/etc/bar"] + self.assertTrue(r._matches(entry, metadata, rules)) + mock_matches.assert_called_with(r, entry, metadata, rules) + self.assertIn("/etc/.*\.conf", r._regex_cache.keys()) + + def set_regex_enabled(self, rules_obj, state): + """ set the state of regex_enabled for this implementation of + Rules """ + if not isinstance(rules_obj.core.setup, MagicMock): + rules_obj.core.setup = MagicMock() + rules_obj.core.setup.cfp.getboolean.return_value = state + + def test__regex_enabled(self): + r = self.get_obj() + r.core.setup = MagicMock() + self.assertEqual(r._regex_enabled, + r.core.setup.cfp.getboolean.return_value) + r.core.setup.cfp.getboolean.assert_called_with("rules", "regex", + default=False) -- cgit v1.2.3-1-g7c22