summaryrefslogtreecommitdiffstats
path: root/testsuite/Testlib/TestServer
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-04 09:52:57 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-04 09:52:57 -0400
commitd9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6 (patch)
treecf59b7bcef389ca76f09c7f804db9d893b918e3b /testsuite/Testlib/TestServer
parent6697481f7bed646b4c66c54c9a46d11aa075af4a (diff)
downloadbcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.gz
bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.tar.bz2
bcfg2-d9d4391b211c0a13cbfeadc9fa63e5bdeba9d2f6.zip
reorganized testsuite to allow tests on stuff outside of src
Diffstat (limited to 'testsuite/Testlib/TestServer')
-rw-r--r--testsuite/Testlib/TestServer/TestPlugin.py2296
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py1470
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestProbes.py549
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestSEModules.py109
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestTemplateHelper.py120
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/__init__.py0
-rw-r--r--testsuite/Testlib/TestServer/__init__.py0
7 files changed, 0 insertions, 4544 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugin.py b/testsuite/Testlib/TestServer/TestPlugin.py
deleted file mode 100644
index 7ea5b9b42..000000000
--- a/testsuite/Testlib/TestServer/TestPlugin.py
+++ /dev/null
@@ -1,2296 +0,0 @@
-import os
-import re
-import sys
-import copy
-import logging
-import lxml.etree
-import Bcfg2.Server
-from Bcfg2.Compat import reduce
-from mock import Mock, MagicMock, patch
-from Bcfg2.Server.Plugin import *
-
-# add all parent testsuite directories to sys.path to allow (most)
-# relative imports in python 2.4
-path = os.path.dirname(__file__)
-while path != '/':
- if os.path.basename(path).lower().startswith("test"):
- sys.path.append(path)
- if os.path.basename(path) == "testsuite":
- break
- path = os.path.dirname(path)
-from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
- skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
- patchIf, datastore
-
-
-try:
- re_type = re._pattern_type
-except AttributeError:
- re_type = type(re.compile(""))
-
-def tostring(el):
- return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8')
-
-
-class FakeElementTree(lxml.etree._ElementTree):
- xinclude = Mock()
-
-
-class TestFunctions(Bcfg2TestCase):
- def test_bind_info(self):
- entry = lxml.etree.Element("Path", name="/test")
- metadata = Mock()
- default = dict(test1="test1", test2="test2")
- # test without infoxml
- bind_info(entry, metadata, default=default)
- self.assertItemsEqual(entry.attrib,
- dict(test1="test1",
- test2="test2",
- name="/test"))
-
- # test with bogus infoxml
- entry = lxml.etree.Element("Path", name="/test")
- infoxml = Mock()
- self.assertRaises(PluginExecutionError,
- bind_info,
- entry, metadata, infoxml=infoxml)
- infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry)
-
- # test with valid infoxml
- entry = lxml.etree.Element("Path", name="/test")
- infoxml.reset_mock()
- infodata = {None: {"test3": "test3", "test4": "test4"}}
- def infoxml_rv(metadata, rv, entry=None):
- rv['Info'] = infodata
- infoxml.pnode.Match.side_effect = infoxml_rv
- bind_info(entry, metadata, infoxml=infoxml, default=default)
- # mock objects don't properly track the called-with value of
- # arguments whose value is changed by the function, so it
- # thinks Match() was called with the final value of the mdata
- # arg, not the initial value. makes this test a little less
- # worthwhile, TBH.
- infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata),
- entry=entry)
- self.assertItemsEqual(entry.attrib,
- dict(test1="test1",
- test2="test2",
- test3="test3",
- test4="test4",
- name="/test"))
-
-
-class TestPluginInitError(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestPluginExecutionError(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestDebuggable(Bcfg2TestCase):
- test_obj = Debuggable
-
- def get_obj(self):
- return self.test_obj()
-
- def test__init(self):
- d = self.get_obj()
- self.assertIsInstance(d.logger, logging.Logger)
- self.assertFalse(d.debug_flag)
-
- @patch("Bcfg2.Server.Plugin.%s.debug_log" % test_obj.__name__)
- def test_toggle_debug(self, mock_debug):
- d = self.get_obj()
- orig = d.debug_flag
- d.toggle_debug()
- self.assertNotEqual(orig, d.debug_flag)
- self.assertTrue(mock_debug.called)
-
- mock_debug.reset_mock()
-
- changed = d.debug_flag
- d.toggle_debug()
- self.assertNotEqual(changed, d.debug_flag)
- self.assertEqual(orig, d.debug_flag)
- self.assertTrue(mock_debug.called)
-
- def test_debug_log(self):
- d = self.get_obj()
- d.logger = Mock()
- d.debug_flag = False
- d.debug_log("test")
- self.assertFalse(d.logger.error.called)
-
- d.logger.reset_mock()
- d.debug_log("test", flag=True)
- self.assertTrue(d.logger.error.called)
-
- d.logger.reset_mock()
- d.debug_flag = True
- d.debug_log("test")
- self.assertTrue(d.logger.error.called)
-
-
-class TestPlugin(TestDebuggable):
- test_obj = Plugin
-
- def get_obj(self, core=None):
- if core is None:
- core = Mock()
- return self.test_obj(core, datastore)
-
- def test__init(self):
- core = Mock()
- p = self.get_obj(core=core)
- self.assertEqual(p.data, os.path.join(datastore, p.name))
- self.assertEqual(p.core, core)
- self.assertIsInstance(p, Debuggable)
-
- @patch("os.makedirs")
- def test_init_repo(self, mock_makedirs):
- self.test_obj.init_repo(datastore)
- mock_makedirs.assert_called_with(os.path.join(datastore,
- self.test_obj.name))
-
-
-class TestDatabaseBacked(TestPlugin):
- test_obj = DatabaseBacked
-
- @skipUnless(has_django, "Django not found")
- def test__use_db(self):
- core = Mock()
- core.setup.cfp.getboolean.return_value = True
- db = self.get_obj(core)
- self.assertTrue(db._use_db)
-
- core = Mock()
- core.setup.cfp.getboolean.return_value = False
- db = self.get_obj(core)
- self.assertFalse(db._use_db)
-
- Bcfg2.Server.Plugin.has_django = False
- core = Mock()
- db = self.get_obj(core)
- self.assertFalse(db._use_db)
-
- core = Mock()
- core.setup.cfp.getboolean.return_value = True
- db = self.get_obj(core)
- self.assertFalse(db._use_db)
- Bcfg2.Server.Plugin.has_django = True
-
-
-class TestPluginDatabaseModel(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestGenerator(Bcfg2TestCase):
- test_obj = Generator
-
- def test_HandlesEntry(self):
- pass
-
- def test_HandleEntry(self):
- pass
-
-
-class TestStructure(Bcfg2TestCase):
- test_obj = Structure
-
- def get_obj(self):
- return self.test_obj()
-
- def test_BuildStructures(self):
- s = self.get_obj()
- self.assertRaises(NotImplementedError,
- s.BuildStructures, None)
-
-
-class TestMetadata(Bcfg2TestCase):
- test_obj = Metadata
-
- def get_obj(self):
- return self.test_obj()
-
- def test_get_initial_metadata(self):
- m = self.get_obj()
- self.assertRaises(NotImplementedError,
- m.get_initial_metadata, None)
-
- def test_merge_additional_data(self):
- m = self.get_obj()
- self.assertRaises(NotImplementedError,
- m.merge_additional_data, None, None, None)
-
- def test_merge_additional_groups(self):
- m = self.get_obj()
- self.assertRaises(NotImplementedError,
- m.merge_additional_groups, None, None)
-
-
-class TestConnector(Bcfg2TestCase):
- """ placeholder """
- def test_get_additional_groups(self):
- pass
-
- def test_get_additional_data(self):
- pass
-
-
-class TestProbing(Bcfg2TestCase):
- """ placeholder """
- def test_GetProbes(self):
- pass
-
- def test_ReceiveData(self):
- pass
-
-
-class TestStatistics(TestPlugin):
- test_obj = Statistics
-
- def test_process_statistics(self):
- pass
-
-
-class TestThreadedStatistics(TestStatistics):
- test_obj = ThreadedStatistics
- data = [("foo.example.com", "<foo/>"),
- ("bar.example.com", "<bar/>")]
-
- @patch("threading.Thread.start")
- def test__init(self, mock_start):
- core = Mock()
- ts = self.get_obj(core)
- mock_start.assert_any_call()
-
- @patch("%s.open" % builtins)
- @patch("%s.dump" % cPickle.__name__)
- @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
- def test_save(self, mock_dump, mock_open):
- core = Mock()
- ts = self.get_obj(core)
- queue = Mock()
- queue.empty = Mock(side_effect=Empty)
- ts.work_queue = queue
-
- mock_open.side_effect = OSError
- # test that save does _not_ raise an exception even when
- # everything goes pear-shaped
- ts.save()
- queue.empty.assert_any_call()
- mock_open.assert_called_with(ts.pending_file, 'w')
-
- queue.reset_mock()
- mock_open.reset_mock()
-
- queue.data = []
- for hostname, xml in self.data:
- md = Mock()
- md.hostname = hostname
- queue.data.append((md, lxml.etree.XML(xml)))
- queue.empty.side_effect = lambda: len(queue.data) == 0
- queue.get_nowait = Mock(side_effect=lambda: queue.data.pop())
- mock_open.side_effect = None
-
- ts.save()
- queue.empty.assert_any_call()
- queue.get_nowait.assert_any_call()
- mock_open.assert_called_with(ts.pending_file, 'w')
- mock_open.return_value.close.assert_any_call()
- # the order of the queue data gets changed, so we have to
- # verify this call in an ugly way
- self.assertItemsEqual(mock_dump.call_args[0][0], self.data)
- self.assertEqual(mock_dump.call_args[0][1], mock_open.return_value)
-
- @patch("os.unlink")
- @patch("os.path.exists")
- @patch("%s.open" % builtins)
- @patch("lxml.etree.XML")
- @patch("%s.load" % cPickle.__name__)
- @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
- def test_load(self, mock_load, mock_XML, mock_open, mock_exists,
- mock_unlink):
- core = Mock()
- core.terminate.isSet.return_value = False
- ts = self.get_obj(core)
-
- ts.work_queue = Mock()
- ts.work_queue.data = []
- def reset():
- core.reset_mock()
- mock_open.reset_mock()
- mock_exists.reset_mock()
- mock_unlink.reset_mock()
- mock_load.reset_mock()
- mock_XML.reset_mock()
- ts.work_queue.reset_mock()
- ts.work_queue.data = []
-
- mock_exists.return_value = False
- self.assertTrue(ts.load())
- mock_exists.assert_called_with(ts.pending_file)
-
- reset()
- mock_exists.return_value = True
- mock_open.side_effect = OSError
- self.assertFalse(ts.load())
- mock_exists.assert_called_with(ts.pending_file)
- mock_open.assert_called_with(ts.pending_file, 'r')
-
- reset()
- mock_open.side_effect = None
- mock_load.return_value = self.data
- ts.work_queue.put_nowait.side_effect = Full
- self.assertTrue(ts.load())
- mock_exists.assert_called_with(ts.pending_file)
- mock_open.assert_called_with(ts.pending_file, 'r')
- mock_open.return_value.close.assert_any_call()
- mock_load.assert_called_with(mock_open.return_value)
-
- reset()
- core.build_metadata.side_effect = lambda x: x
- mock_XML.side_effect = lambda x, parser=None: x
- ts.work_queue.put_nowait.side_effect = None
- self.assertTrue(ts.load())
- mock_exists.assert_called_with(ts.pending_file)
- mock_open.assert_called_with(ts.pending_file, 'r')
- mock_open.return_value.close.assert_any_call()
- mock_load.assert_called_with(mock_open.return_value)
- self.assertItemsEqual(mock_XML.call_args_list,
- [call(x, parser=Bcfg2.Server.XMLParser)
- for h, x in self.data])
- self.assertItemsEqual(ts.work_queue.put_nowait.call_args_list,
- [call((h, x)) for h, x in self.data])
- mock_unlink.assert_called_with(ts.pending_file)
-
- @patch("threading.Thread.start", Mock())
- @patch("Bcfg2.Server.Plugin.ThreadedStatistics.load")
- @patch("Bcfg2.Server.Plugin.ThreadedStatistics.save")
- @patch("Bcfg2.Server.Plugin.ThreadedStatistics.handle_statistic")
- def test_run(self, mock_handle, mock_save, mock_load):
- core = Mock()
- ts = self.get_obj(core)
- mock_load.return_value = True
- ts.work_queue = Mock()
-
- def reset():
- mock_handle.reset_mock()
- mock_save.reset_mock()
- mock_load.reset_mock()
- core.reset_mock()
- ts.work_queue.reset_mock()
- ts.work_queue.data = self.data[:]
- ts.work_queue.get_calls = 0
-
- reset()
-
- def get_rv(**kwargs):
- ts.work_queue.get_calls += 1
- try:
- return ts.work_queue.data.pop()
- except:
- raise Empty
- ts.work_queue.get.side_effect = get_rv
- def terminate_isset():
- # this lets the loop go on a few iterations with an empty
- # queue to test that it doesn't error out
- return ts.work_queue.get_calls > 3
- core.terminate.isSet.side_effect = terminate_isset
-
- ts.work_queue.empty.return_value = False
- ts.run()
- mock_load.assert_any_call()
- self.assertGreaterEqual(ts.work_queue.get.call_count, len(self.data))
- self.assertItemsEqual(mock_handle.call_args_list,
- [call(h, x) for h, x in self.data])
- mock_save.assert_any_call()
-
- @patch("copy.copy", Mock(side_effect=lambda x: x))
- @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
- def test_process_statistics(self):
- TestStatistics.test_process_statistics(self)
-
- core = Mock()
- ts = self.get_obj(core)
- ts.work_queue = Mock()
- ts.process_statistics(*self.data[0])
- ts.work_queue.put_nowait.assert_called_with(self.data[0])
-
- ts.work_queue.reset_mock()
- ts.work_queue.put_nowait.side_effect = Full
- # test that no exception is thrown
- ts.process_statistics(*self.data[0])
-
-
-class TestPullSource(Bcfg2TestCase):
- def test_GetCurrentEntry(self):
- ps = PullSource()
- self.assertRaises(NotImplementedError,
- ps.GetCurrentEntry, None, None, None)
-
-
-class TestPullTarget(Bcfg2TestCase):
- def test_AcceptChoices(self):
- pt = PullTarget()
- self.assertRaises(NotImplementedError,
- pt.AcceptChoices, None, None)
-
- def test_AcceptPullData(self):
- pt = PullTarget()
- self.assertRaises(NotImplementedError,
- pt.AcceptPullData, None, None, None)
-
-
-class TestDecision(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestValidationError(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestStructureValidator(Bcfg2TestCase):
- def test_validate_structures(self):
- sv = StructureValidator()
- self.assertRaises(NotImplementedError,
- sv.validate_structures, None, None)
-
-
-class TestGoalValidator(Bcfg2TestCase):
- def test_validate_goals(self):
- gv = GoalValidator()
- self.assertRaises(NotImplementedError,
- gv.validate_goals, None, None)
-
-
-class TestVersion(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestClientRunHooks(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestFileBacked(Bcfg2TestCase):
- test_obj = FileBacked
- path = os.path.join(datastore, "test")
-
- def get_obj(self, path=None, fam=None):
- if path is None:
- path = self.path
- return self.test_obj(path, fam=fam)
-
- @patch("%s.open" % builtins)
- def test_HandleEvent(self, mock_open):
- fb = self.get_obj()
- fb.Index = Mock()
-
- def reset():
- fb.Index.reset_mock()
- mock_open.reset_mock()
-
- for evt in ["exists", "changed", "created"]:
- reset()
- event = Mock()
- event.code2str.return_value = evt
- fb.HandleEvent(event)
- mock_open.assert_called_with(self.path)
- mock_open.return_value.read.assert_any_call()
- fb.Index.assert_any_call()
-
- reset()
- event = Mock()
- event.code2str.return_value = "endExist"
- fb.HandleEvent(event)
- self.assertFalse(mock_open.called)
- self.assertFalse(fb.Index.called)
-
-
-class TestDirectoryBacked(Bcfg2TestCase):
- test_obj = DirectoryBacked
- testpaths = {1: '',
- 2: '/foo',
- 3: '/foo/bar',
- 4: '/foo/bar/baz',
- 5: 'quux',
- 6: 'xyzzy/',
- 7: 'xyzzy/plugh/'}
- testfiles = ['foo', 'bar/baz.txt', 'plugh.py']
- ignore = [] # ignore no events
- badevents = [] # DirectoryBacked handles all files, so there's no
- # such thing as a bad event
-
- def test_child_interface(self):
- # ensure that the child object has the correct interface
- self.assertTrue(hasattr(self.test_obj.__child__, "HandleEvent"))
-
- @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__,
- Mock())
- def get_obj(self, fam=None):
- if fam is None:
- fam = Mock()
- return self.test_obj(os.path.join(datastore, self.test_obj.__name__),
- fam)
-
- @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__)
- def test__init(self, mock_add_monitor):
- db = self.test_obj(datastore, Mock())
- mock_add_monitor.assert_called_with('')
-
- def test__getitem(self):
- db = self.get_obj()
- db.entries.update(dict(a=1, b=2, c=3))
- self.assertEqual(db['a'], 1)
- self.assertEqual(db['b'], 2)
- expected = KeyError
- try:
- db['d']
- except expected:
- pass
- except:
- err = sys.exc_info()[1]
- self.assertFalse(True, "%s raised instead of %s" %
- (err.__class__.__name__,
- expected.__class__.__name__))
- else:
- self.assertFalse(True,
- "%s not raised" % expected.__class__.__name__)
-
- def test__iter(self):
- db = self.get_obj()
- db.entries.update(dict(a=1, b=2, c=3))
- self.assertEqual([i for i in db],
- [i for i in db.entries.items()])
-
- @patch("os.path.isdir")
- def test_add_directory_monitor(self, mock_isdir):
- db = self.get_obj()
- db.fam = Mock()
- db.fam.rv = 0
-
- def reset():
- db.fam.rv += 1
- db.fam.AddMonitor.return_value = db.fam.rv
- db.fam.reset_mock()
- mock_isdir.reset_mock()
-
- mock_isdir.return_value = True
- for path in self.testpaths.values():
- reset()
- db.add_directory_monitor(path)
- db.fam.AddMonitor.assert_called_with(os.path.join(db.data, path),
- db)
- self.assertIn(db.fam.rv, db.handles)
- self.assertEqual(db.handles[db.fam.rv], path)
-
- reset()
- # test duplicate adds
- for path in self.testpaths.values():
- reset()
- db.add_directory_monitor(path)
- self.assertFalse(db.fam.AddMonitor.called)
-
- reset()
- mock_isdir.return_value = False
- db.add_directory_monitor('bogus')
- self.assertFalse(db.fam.AddMonitor.called)
- self.assertNotIn(db.fam.rv, db.handles)
-
- def test_add_entry(self):
- db = self.get_obj()
- db.fam = Mock()
- class MockChild(Mock):
- def __init__(self, path, fam, **kwargs):
- Mock.__init__(self, **kwargs)
- self.path = path
- self.fam = fam
- self.HandleEvent = Mock()
- db.__child__ = MockChild
-
- for path in self.testpaths.values():
- event = Mock()
- db.add_entry(path, event)
- self.assertIn(path, db.entries)
- self.assertEqual(db.entries[path].path,
- os.path.join(db.data, path))
- self.assertEqual(db.entries[path].fam, db.fam)
- db.entries[path].HandleEvent.assert_called_with(event)
-
- @patch("os.path.isdir")
- @patch("Bcfg2.Server.Plugin.%s.add_entry" % test_obj.__name__)
- @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__)
- def test_HandleEvent(self, mock_add_monitor, mock_add_entry, mock_isdir):
- db = self.get_obj()
- # a path with a leading / should never get into
- # DirectoryBacked.handles, so strip that test case
- for rid, path in self.testpaths.items():
- path = path.lstrip('/')
- db.handles[rid] = path
-
- def reset():
- mock_isdir.reset_mock()
- mock_add_entry.reset_mock()
- mock_add_monitor.reset_mock()
-
- def get_event(filename, action, requestID):
- event = Mock()
- event.code2str.return_value = action
- event.filename = filename
- event.requestID = requestID
- return event
-
- # test events on the data directory itself
- reset()
- mock_isdir.return_value = True
- event = get_event(db.data, "exists", 1)
- db.HandleEvent(event)
- mock_add_monitor.assert_called_with("")
-
- # test events on paths that aren't handled
- reset()
- mock_isdir.return_value = False
- event = get_event('/' + self.testfiles[0], 'created',
- max(self.testpaths.keys()) + 1)
- db.HandleEvent(event)
- self.assertFalse(mock_add_monitor.called)
- self.assertFalse(mock_add_entry.called)
-
- for req_id, path in self.testpaths.items():
- # a path with a leading / should never get into
- # DirectoryBacked.handles, so strip that test case
- path = path.lstrip('/')
- basepath = os.path.join(datastore, path)
- for fname in self.testfiles:
- relpath = os.path.join(path, fname)
- abspath = os.path.join(basepath, fname)
-
- # test endExist does nothing
- reset()
- event = get_event(fname, 'endExist', req_id)
- db.HandleEvent(event)
- self.assertFalse(mock_add_monitor.called)
- self.assertFalse(mock_add_entry.called)
-
- mock_isdir.return_value = True
- for evt in ["created", "exists", "changed"]:
- # test that creating or changing a directory works
- reset()
- event = get_event(fname, evt, req_id)
- db.HandleEvent(event)
- mock_add_monitor.assert_called_with(relpath)
- self.assertFalse(mock_add_entry.called)
-
- mock_isdir.return_value = False
- for evt in ["created", "exists"]:
- # test that creating a file works
- reset()
- event = get_event(fname, evt, req_id)
- db.HandleEvent(event)
- mock_add_entry.assert_called_with(relpath, event)
- self.assertFalse(mock_add_monitor.called)
- db.entries[relpath] = MagicMock()
-
- # test that changing a file that already exists works
- reset()
- event = get_event(fname, "changed", req_id)
- db.HandleEvent(event)
- db.entries[relpath].HandleEvent.assert_called_with(event)
- self.assertFalse(mock_add_monitor.called)
- self.assertFalse(mock_add_entry.called)
-
- # test that deleting an entry works
- reset()
- event = get_event(fname, "deleted", req_id)
- db.HandleEvent(event)
- self.assertNotIn(relpath, db.entries)
-
- # test that changing a file that doesn't exist works
- reset()
- event = get_event(fname, "changed", req_id)
- db.HandleEvent(event)
- mock_add_entry.assert_called_with(relpath, event)
- self.assertFalse(mock_add_monitor.called)
- db.entries[relpath] = MagicMock()
-
- # test that deleting a directory works. this is a little
- # strange because the _parent_ directory has to handle the
- # deletion
- reset()
- event = get_event('quux', "deleted", 1)
- db.HandleEvent(event)
- for key in db.entries.keys():
- self.assertFalse(key.startswith('quux'))
-
- # test bad events
- for fname in self.badevents:
- reset()
- event = get_event(fname, "created", 1)
- db.HandleEvent(event)
- self.assertFalse(mock_add_entry.called)
- self.assertFalse(mock_add_monitor.called)
-
- # test ignored events
- for fname in self.ignore:
- reset()
- event = get_event(fname, "created", 1)
- db.HandleEvent(event)
- self.assertFalse(mock_isdir.called,
- msg="Failed to ignore %s" % fname)
- self.assertFalse(mock_add_entry.called,
- msg="Failed to ignore %s" % fname)
- self.assertFalse(mock_add_monitor.called,
- msg="Failed to ignore %s" % fname)
-
-
-class TestXMLFileBacked(TestFileBacked):
- test_obj = XMLFileBacked
- path = os.path.join(datastore, "test", "test1.xml")
-
- def get_obj(self, path=None, fam=None, should_monitor=False):
- if path is None:
- path = self.path
- return self.test_obj(path, fam=fam, should_monitor=should_monitor)
-
- def test__init(self):
- fam = Mock()
- xfb = self.get_obj()
- self.assertIsNone(xfb.fam)
-
- xfb = self.get_obj(fam=fam)
- self.assertFalse(fam.AddMonitor.called)
-
- fam.reset_mock()
- xfb = self.get_obj(fam=fam, should_monitor=True)
- fam.AddMonitor.assert_called_with(self.path, xfb)
-
- @patch("os.path.exists")
- @patch("lxml.etree.parse")
- def test_follow_xincludes(self, mock_parse, mock_exists):
- xfb = self.get_obj()
- xfb.add_monitor = Mock()
-
- def reset():
- xfb.add_monitor.reset_mock()
- mock_parse.reset_mock()
- mock_exists.reset_mock()
- xfb.extras = []
-
- mock_exists.return_value = True
- xdata = dict()
- mock_parse.side_effect = lambda p: xdata[p]
-
- # basic functionality
- xdata['/test/test2.xml'] = lxml.etree.Element("Test").getroottree()
- xfb._follow_xincludes(xdata=xdata['/test/test2.xml'])
- self.assertFalse(xfb.add_monitor.called)
-
- if (not hasattr(self.test_obj, "xdata") or
- not isinstance(self.test_obj.xdata, property)):
- # if xdata is settable, test that method of getting data
- # to _follow_xincludes
- reset()
- xfb.xdata = xdata['/test/test2.xml'].getroot()
- xfb._follow_xincludes()
- self.assertFalse(xfb.add_monitor.called)
- xfb.xdata = None
-
- reset()
- xfb._follow_xincludes(fname="/test/test2.xml")
- self.assertFalse(xfb.add_monitor.called)
-
- # test one level of xinclude
- xdata[self.path] = lxml.etree.Element("Test").getroottree()
- lxml.etree.SubElement(xdata[self.path].getroot(),
- Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test2.xml")
- reset()
- xfb._follow_xincludes(fname=self.path)
- xfb.add_monitor.assert_called_with("/test/test2.xml")
- self.assertItemsEqual(mock_parse.call_args_list,
- [call(f) for f in xdata.keys()])
- mock_exists.assert_called_with("/test/test2.xml")
-
- reset()
- xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path])
- xfb.add_monitor.assert_called_with("/test/test2.xml")
- self.assertItemsEqual(mock_parse.call_args_list,
- [call(f) for f in xdata.keys()
- if f != self.path])
- mock_exists.assert_called_with("/test/test2.xml")
-
- # test two-deep level of xinclude, with some files in another
- # directory
- xdata["/test/test3.xml"] = \
- lxml.etree.Element("Test").getroottree()
- lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
- Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test_dir/test4.xml")
- xdata["/test/test_dir/test4.xml"] = \
- lxml.etree.Element("Test").getroottree()
- lxml.etree.SubElement(xdata["/test/test_dir/test4.xml"].getroot(),
- Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test_dir/test5.xml")
- xdata['/test/test_dir/test5.xml'] = \
- lxml.etree.Element("Test").getroottree()
- xdata['/test/test_dir/test6.xml'] = \
- lxml.etree.Element("Test").getroottree()
- # relative includes
- lxml.etree.SubElement(xdata[self.path].getroot(),
- Bcfg2.Server.XI_NAMESPACE + "include",
- href="test3.xml")
- lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
- Bcfg2.Server.XI_NAMESPACE + "include",
- href="test_dir/test6.xml")
-
- reset()
- xfb._follow_xincludes(fname=self.path)
- self.assertItemsEqual(xfb.add_monitor.call_args_list,
- [call(f) for f in xdata.keys() if f != self.path])
- self.assertItemsEqual(mock_parse.call_args_list,
- [call(f) for f in xdata.keys()])
- self.assertItemsEqual(mock_exists.call_args_list,
- [call(f) for f in xdata.keys() if f != self.path])
-
- reset()
- xfb._follow_xincludes(fname=self.path, xdata=xdata[self.path])
- self.assertItemsEqual(xfb.add_monitor.call_args_list,
- [call(f) for f in xdata.keys() if f != self.path])
- self.assertItemsEqual(mock_parse.call_args_list,
- [call(f) for f in xdata.keys() if f != self.path])
- self.assertItemsEqual(mock_exists.call_args_list,
- [call(f) for f in xdata.keys() if f != self.path])
-
- @patch("lxml.etree._ElementTree", FakeElementTree)
- @patch("Bcfg2.Server.Plugin.%s._follow_xincludes" % test_obj.__name__)
- def test_Index(self, mock_follow):
- xfb = self.get_obj()
-
- def reset():
- mock_follow.reset_mock()
- FakeElementTree.xinclude.reset_mock()
- xfb.extras = []
- xfb.xdata = None
-
- # syntax error
- xfb.data = "<"
- self.assertRaises(PluginInitError, xfb.Index)
-
- # no xinclude
- reset()
- xdata = lxml.etree.Element("Test", name="test")
- children = [lxml.etree.SubElement(xdata, "Foo"),
- lxml.etree.SubElement(xdata, "Bar", name="bar")]
- xfb.data = tostring(xdata)
- xfb.Index()
- mock_follow.assert_any_call()
- try:
- self.assertEqual(xfb.xdata.base, self.path)
- except AttributeError:
- # python 2.4 and/or lxml 2.0 don't store the base_url in
- # .base -- no idea where it's stored.
- pass
- self.assertItemsEqual([tostring(e) for e in xfb.entries],
- [tostring(e) for e in children])
-
- # with xincludes
- reset()
- mock_follow.side_effect = \
- lambda: xfb.extras.extend(["/test/test2.xml",
- "/test/test_dir/test3.xml"])
- children.extend([
- lxml.etree.SubElement(xdata,
- Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test2.xml"),
- lxml.etree.SubElement(xdata,
- Bcfg2.Server.XI_NAMESPACE + "include",
- href="/test/test_dir/test3.xml")])
- test2 = lxml.etree.Element("Test", name="test2")
- lxml.etree.SubElement(test2, "Baz")
- test3 = lxml.etree.Element("Test", name="test3")
- replacements = {"/test/test2.xml": test2,
- "/test/test_dir/test3.xml": test3}
- def xinclude():
- for el in xfb.xdata.findall('//%sinclude' %
- Bcfg2.Server.XI_NAMESPACE):
- xfb.xdata.replace(el, replacements[el.get("href")])
- FakeElementTree.xinclude.side_effect = xinclude
-
- xfb.data = tostring(xdata)
- xfb.Index()
- mock_follow.assert_any_call()
- FakeElementTree.xinclude.assert_any_call
- try:
- self.assertEqual(xfb.xdata.base, self.path)
- except AttributeError:
- pass
- self.assertItemsEqual([tostring(e) for e in xfb.entries],
- [tostring(e) for e in children])
-
- def test_add_monitor(self):
- xfb = self.get_obj()
- xfb.add_monitor("/test/test2.xml")
- self.assertIn("/test/test2.xml", xfb.extras)
-
- fam = Mock()
- xfb = self.get_obj(fam=fam)
- fam.reset_mock()
- xfb.add_monitor("/test/test3.xml")
- self.assertFalse(fam.AddMonitor.called)
- self.assertIn("/test/test3.xml", xfb.extras)
-
- fam.reset_mock()
- xfb = self.get_obj(fam=fam, should_monitor=True)
- xfb.add_monitor("/test/test4.xml")
- fam.AddMonitor.assert_called_with("/test/test4.xml", xfb)
- self.assertIn("/test/test4.xml", xfb.extras)
-
-
-class TestStructFile(TestXMLFileBacked):
- test_obj = StructFile
-
- def _get_test_data(self):
- """ build a very complex set of test data """
- # top-level group and client elements
- groups = dict()
- # group and client elements that are descendents of other group or
- # client elements
- subgroups = dict()
- # children of elements in `groups' that should be included in
- # match results
- children = dict()
- # children of elements in `subgroups' that should be included in
- # match results
- subchildren = dict()
- # top-level tags that are not group elements
- standalone = []
- xdata = lxml.etree.Element("Test", name="test")
- groups[0] = lxml.etree.SubElement(xdata, "Group", name="group1",
- include="true")
- children[0] = [lxml.etree.SubElement(groups[0], "Child", name="c1"),
- lxml.etree.SubElement(groups[0], "Child", name="c2")]
- subgroups[0] = [lxml.etree.SubElement(groups[0], "Group",
- name="subgroup1", include="true"),
- lxml.etree.SubElement(groups[0],
- "Client", name="client1",
- include="false")]
- subchildren[0] = \
- [lxml.etree.SubElement(subgroups[0][0], "Child", name="sc1"),
- lxml.etree.SubElement(subgroups[0][0], "Child", name="sc2",
- attr="some attr"),
- lxml.etree.SubElement(subgroups[0][0], "Child", name="sc3")]
- lxml.etree.SubElement(subchildren[0][-1], "SubChild", name="subchild")
- lxml.etree.SubElement(subgroups[0][1], "Child", name="sc4")
-
- groups[1] = lxml.etree.SubElement(xdata, "Group", name="group2",
- include="false")
- children[1] = []
- subgroups[1] = []
- subchildren[1] = []
- lxml.etree.SubElement(groups[1], "Child", name="c3")
- lxml.etree.SubElement(groups[1], "Child", name="c4")
-
- standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s1"))
-
- groups[2] = lxml.etree.SubElement(xdata, "Client", name="client2",
- include="false")
- children[2] = []
- subgroups[2] = []
- subchildren[2] = []
- lxml.etree.SubElement(groups[2], "Child", name="c5")
- lxml.etree.SubElement(groups[2], "Child", name="c6")
-
- standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s2",
- attr="some attr"))
-
- groups[3] = lxml.etree.SubElement(xdata, "Client", name="client3",
- include="true")
- children[3] = [lxml.etree.SubElement(groups[3], "Child", name="c7",
- attr="some_attr"),
- lxml.etree.SubElement(groups[3], "Child", name="c8")]
- subgroups[3] = []
- subchildren[3] = []
- lxml.etree.SubElement(children[3][-1], "SubChild", name="subchild")
-
- standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s3"))
- lxml.etree.SubElement(standalone[-1], "SubStandalone", name="sub1")
-
- children[4] = standalone
- return (xdata, groups, subgroups, children, subchildren, standalone)
-
- def test_include_element(self):
- sf = self.get_obj()
- metadata = Mock()
- metadata.groups = ["group1", "group2"]
- metadata.hostname = "foo.example.com"
-
- inc = lambda tag, **attrs: \
- sf._include_element(lxml.etree.Element(tag, **attrs), metadata)
-
- self.assertFalse(sf._include_element(lxml.etree.Comment("test"),
- metadata))
-
- self.assertFalse(inc("Group", name="group3"))
- self.assertFalse(inc("Group", name="group2", negate="true"))
- self.assertFalse(inc("Group", name="group2", negate="tRuE"))
- self.assertTrue(inc("Group", name="group2"))
- self.assertTrue(inc("Group", name="group2", negate="false"))
- self.assertTrue(inc("Group", name="group2", negate="faLSe"))
- self.assertTrue(inc("Group", name="group3", negate="true"))
- self.assertTrue(inc("Group", name="group3", negate="tRUe"))
-
- self.assertFalse(inc("Client", name="bogus.example.com"))
- self.assertFalse(inc("Client", name="foo.example.com", negate="true"))
- self.assertFalse(inc("Client", name="foo.example.com", negate="tRuE"))
- self.assertTrue(inc("Client", name="foo.example.com"))
- self.assertTrue(inc("Client", name="foo.example.com", negate="false"))
- self.assertTrue(inc("Client", name="foo.example.com", negate="faLSe"))
- self.assertTrue(inc("Client", name="bogus.example.com", negate="true"))
- self.assertTrue(inc("Client", name="bogus.example.com", negate="tRUe"))
-
- self.assertTrue(inc("Other"))
-
- @patch("Bcfg2.Server.Plugin.%s._include_element" % test_obj.__name__)
- def test__match(self, mock_include):
- sf = self.get_obj()
- metadata = Mock()
-
- (xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
-
- mock_include.side_effect = \
- lambda x, _: (x.tag not in ['Client', 'Group'] or
- x.get("include") == "true")
-
- for i, group in groups.items():
- actual = sf._match(group, metadata)
- expected = children[i] + subchildren[i]
- self.assertEqual(len(actual), len(expected))
- # easiest way to compare the values is actually to make
- # them into an XML document and let assertXMLEqual compare
- # them
- xactual = lxml.etree.Element("Container")
- xactual.extend(actual)
- xexpected = lxml.etree.Element("Container")
- xexpected.extend(expected)
- self.assertXMLEqual(xactual, xexpected)
-
- for el in standalone:
- self.assertXMLEqual(el, sf._match(el, metadata)[0])
-
- @patch("Bcfg2.Server.Plugin.%s._match" % test_obj.__name__)
- def test_Match(self, mock_match):
- sf = self.get_obj()
- metadata = Mock()
-
- (xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
- sf.entries.extend(copy.deepcopy(xdata).getchildren())
-
- def match_rv(el, _):
- if el.tag not in ['Client', 'Group']:
- return [el]
- elif x.get("include") == "true":
- return el.getchildren()
- else:
- return []
- mock_match.side_effect = match_rv
- actual = sf.Match(metadata)
- expected = reduce(lambda x, y: x + y,
- list(children.values()) + list(subgroups.values()))
- self.assertEqual(len(actual), len(expected))
- # easiest way to compare the values is actually to make
- # them into an XML document and let assertXMLEqual compare
- # them
- xactual = lxml.etree.Element("Container")
- xactual.extend(actual)
- xexpected = lxml.etree.Element("Container")
- xexpected.extend(expected)
- self.assertXMLEqual(xactual, xexpected)
-
- @patch("Bcfg2.Server.Plugin.%s._include_element" % test_obj.__name__)
- def test__xml_match(self, mock_include):
- sf = self.get_obj()
- metadata = Mock()
-
- (xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
-
- mock_include.side_effect = \
- lambda x, _: (x.tag not in ['Client', 'Group'] or
- x.get("include") == "true")
-
- actual = copy.deepcopy(xdata)
- for el in actual.getchildren():
- sf._xml_match(el, metadata)
- expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib))
- expected.text = xdata.text
- expected.extend(reduce(lambda x, y: x + y,
- list(children.values()) + list(subchildren.values())))
- expected.extend(standalone)
- self.assertXMLEqual(actual, expected)
-
- @patch("Bcfg2.Server.Plugin.%s._xml_match" % test_obj.__name__)
- def test_Match(self, mock_xml_match):
- sf = self.get_obj()
- metadata = Mock()
-
- (sf.xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
-
- sf.XMLMatch(metadata)
- actual = []
- for call in mock_xml_match.call_args_list:
- actual.append(call[0][0])
- self.assertEqual(call[0][1], metadata)
- expected = list(groups.values()) + standalone
- # easiest way to compare the values is actually to make
- # them into an XML document and let assertXMLEqual compare
- # them
- xactual = lxml.etree.Element("Container")
- xactual.extend(actual)
- xexpected = lxml.etree.Element("Container")
- xexpected.extend(expected)
- self.assertXMLEqual(xactual, xexpected)
-
-
-class TestINode(Bcfg2TestCase):
- test_obj = INode
-
- # INode.__init__ and INode._load_children() call each other
- # recursively, which makes this class kind of a nightmare to test.
- # we have to first patch INode._load_children so that we can
- # create an INode object with no children loaded, then we unpatch
- # INode._load_children and patch INode.__init__ so that child
- # objects aren't actually created. but in order to test things
- # atomically, we do this umpteen times in order to test with
- # different data. this convenience method makes this a little
- # easier. fun fun fun.
- @patch("Bcfg2.Server.Plugin.%s._load_children" % test_obj.__name__, Mock())
- def _get_inode(self, data, idict):
- return self.test_obj(data, idict)
-
- def test_raw_predicates(self):
- metadata = Mock()
- metadata.groups = ["group1", "group2"]
- metadata.hostname = "foo.example.com"
- entry = None
-
- parent_predicate = lambda m, e: True
- pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(self.test_obj.raw['Client'] % dict(name="bar.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(self.test_obj.raw['Group'] % dict(name="group3"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- pred = eval(self.test_obj.nraw['Client'] % dict(name="foo.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
-
- pred = eval(self.test_obj.nraw['Group'] % dict(name="group1"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
-
- parent_predicate = lambda m, e: False
- pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- self.assertItemsEqual(self.test_obj.containers,
- self.test_obj.raw.keys())
- self.assertItemsEqual(self.test_obj.containers,
- self.test_obj.nraw.keys())
-
- @patch("Bcfg2.Server.Plugin.INode._load_children")
- def test__init(self, mock_load_children):
- data = lxml.etree.Element("Bogus")
- # called with no parent, should not raise an exception; it's a
- # top-level tag in an XML file and so is not expected to be a
- # proper predicate
- INode(data, dict())
- self.assertRaises(PluginExecutionError,
- INode, data, dict(), Mock())
-
- data = lxml.etree.Element("Client", name="foo.example.com")
- idict = dict()
- inode = INode(data, idict)
- mock_load_children.assert_called_with(data, idict)
- self.assertTrue(inode.predicate(Mock(), Mock()))
-
- parent = Mock()
- parent.predicate = lambda m, e: True
- metadata = Mock()
- metadata.groups = ["group1", "group2"]
- metadata.hostname = "foo.example.com"
- entry = None
-
- # test setting predicate with parent object
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertTrue(inode.predicate(metadata, entry))
-
- # test negation
- data = lxml.etree.Element("Client", name="foo.example.com",
- negate="true")
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertFalse(inode.predicate(metadata, entry))
-
- # test failure of a matching predicate (client names do not match)
- data = lxml.etree.Element("Client", name="foo.example.com")
- metadata.hostname = "bar.example.com"
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertFalse(inode.predicate(metadata, entry))
-
- # test that parent predicate is AND'ed in correctly
- parent.predicate = lambda m, e: False
- metadata.hostname = "foo.example.com"
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertFalse(inode.predicate(metadata, entry))
-
- def test_load_children(self):
- data = lxml.etree.Element("Parent")
- child1 = lxml.etree.SubElement(data, "Client", name="foo.example.com")
- child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
- idict = dict()
-
- inode = self._get_inode(data, idict)
-
- @patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
- def inner(mock_init):
- mock_init.return_value = None
- inode._load_children(data, idict)
- self.assertItemsEqual(mock_init.call_args_list,
- [call(child1, idict, inode),
- call(child2, idict, inode)])
- self.assertEqual(idict, dict())
- self.assertItemsEqual(inode.contents, dict())
-
- inner()
-
- data = lxml.etree.Element("Parent")
- child1 = lxml.etree.SubElement(data, "Data", name="child1",
- attr="some attr")
- child1.text = "text"
- subchild1 = lxml.etree.SubElement(child1, "SubChild", name="subchild")
- child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
- idict = dict()
-
- inode = self._get_inode(data, idict)
-
- @patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
- def inner2(mock_init):
- mock_init.return_value = None
- inode._load_children(data, idict)
- mock_init.assert_called_with(child2, idict, inode)
- tag = child1.tag
- name = child1.get("name")
- self.assertEqual(idict, dict(Data=[name]))
- self.assertIn(tag, inode.contents)
- self.assertIn(name, inode.contents[tag])
- self.assertItemsEqual(inode.contents[tag][name],
- dict(name=name,
- attr=child1.get('attr'),
- __text__=child1.text,
- __children__=[subchild1]))
-
- inner2()
-
- # test ignore. no ignore is set on INode by default, so we
- # have to set one
- old_ignore = copy.copy(self.test_obj.ignore)
- self.test_obj.ignore.append("Data")
- idict = dict()
-
- inode = self._get_inode(data, idict)
-
- @patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
- def inner3(mock_init):
- mock_init.return_value = None
- inode._load_children(data, idict)
- mock_init.assert_called_with(child2, idict, inode)
- self.assertEqual(idict, dict())
- self.assertItemsEqual(inode.contents, dict())
-
- inner3()
- self.test_obj.ignore = old_ignore
-
- def test_Match(self):
- idata = lxml.etree.Element("Parent")
- contents = lxml.etree.SubElement(idata, "Data", name="contents",
- attr="some attr")
- child = lxml.etree.SubElement(idata, "Group", name="bar", negate="true")
-
- inode = INode(idata, dict())
- inode.predicate = Mock()
- inode.predicate.return_value = False
-
- metadata = Mock()
- metadata.groups = ['foo']
- data = dict()
- entry = child
-
- inode.Match(metadata, data, entry=child)
- self.assertEqual(data, dict())
- inode.predicate.assert_called_with(metadata, child)
-
- inode.predicate.reset_mock()
- inode.Match(metadata, data)
- self.assertEqual(data, dict())
- # can't easily compare XML args without the original
- # object, and we're testing that Match() works without an
- # XML object passed in, so...
- self.assertEqual(inode.predicate.call_args[0][0],
- metadata)
- self.assertXMLEqual(inode.predicate.call_args[0][1],
- lxml.etree.Element("None"))
-
- inode.predicate.reset_mock()
- inode.predicate.return_value = True
- inode.Match(metadata, data, entry=child)
- self.assertEqual(data, inode.contents)
- inode.predicate.assert_called_with(metadata, child)
-
-
-class TestInfoNode(TestINode):
- __test__ = True
- test_obj = InfoNode
-
- def test_raw_predicates(self):
- TestINode.test_raw_predicates(self)
- metadata = Mock()
- entry = lxml.etree.Element("Path", name="/tmp/foo",
- realname="/tmp/bar")
-
- parent_predicate = lambda m, d: True
- pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
-
- parent_predicate = lambda m, d: False
- pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
-
-class TestXMLSrc(TestXMLFileBacked):
- test_obj = XMLSrc
-
- def test_node_interface(self):
- # ensure that the node object has the necessary interface
- self.assertTrue(hasattr(self.test_obj.__node__, "Match"))
-
- @patch("%s.open" % builtins)
- def test_HandleEvent(self, mock_open):
- xdata = lxml.etree.Element("Test")
- lxml.etree.SubElement(xdata, "Path", name="path", attr="whatever")
-
- xsrc = self.get_obj("/test/foo.xml")
- xsrc.__node__ = Mock()
- mock_open.return_value.read.return_value = tostring(xdata)
-
- if xsrc.__priority_required__:
- # test with no priority at all
- self.assertRaises(PluginExecutionError,
- xsrc.HandleEvent, Mock())
-
- # test with bogus priority
- xdata.set("priority", "cow")
- mock_open.return_value.read.return_value = tostring(xdata)
- self.assertRaises(PluginExecutionError,
- xsrc.HandleEvent, Mock())
-
- # assign a priority to use in future tests
- xdata.set("priority", "10")
- mock_open.return_value.read.return_value = tostring(xdata)
-
- mock_open.reset_mock()
- xsrc = self.get_obj("/test/foo.xml")
- xsrc.__node__ = Mock()
- xsrc.HandleEvent(Mock())
- mock_open.assert_called_with("/test/foo.xml")
- mock_open.return_value.read.assert_any_call()
- self.assertXMLEqual(xsrc.__node__.call_args[0][0], xdata)
- self.assertEqual(xsrc.__node__.call_args[0][1], dict())
- self.assertEqual(xsrc.pnode, xsrc.__node__.return_value)
- self.assertEqual(xsrc.cache, None)
-
- @patch("Bcfg2.Server.Plugin.XMLSrc.HandleEvent")
- def test_Cache(self, mock_HandleEvent):
- xsrc = self.get_obj("/test/foo.xml")
- metadata = Mock()
- xsrc.Cache(metadata)
- mock_HandleEvent.assert_any_call()
-
- xsrc.pnode = Mock()
- xsrc.Cache(metadata)
- xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
- self.assertEqual(xsrc.cache[0], metadata)
-
- xsrc.pnode.reset_mock()
- xsrc.Cache(metadata)
- self.assertFalse(xsrc.pnode.Mock.called)
- self.assertEqual(xsrc.cache[0], metadata)
-
- xsrc.cache = ("bogus")
- xsrc.Cache(metadata)
- xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
- self.assertEqual(xsrc.cache[0], metadata)
-
-
-class TestInfoXML(TestXMLSrc):
- test_obj = InfoXML
-
-
-class TestXMLDirectoryBacked(TestDirectoryBacked):
- test_obj = XMLDirectoryBacked
- testfiles = ['foo.xml', 'bar/baz.xml', 'plugh.plugh.xml']
- badpaths = ["foo", "foo.txt", "foo.xsd", "xml"]
-
-
-class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
- test_obj = PrioDir
-
- @patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__,
- Mock())
- def get_obj(self, core=None):
- if core is None:
- core = Mock()
- return self.test_obj(core, datastore)
-
- def test_HandleEvent(self):
- TestXMLDirectoryBacked.test_HandleEvent(self)
-
- @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent", Mock())
- def inner():
- pd = self.get_obj()
- test1 = Mock()
- test1.items = dict(Path=["/etc/foo.conf", "/etc/bar.conf"])
- test2 = Mock()
- test2.items = dict(Path=["/etc/baz.conf"],
- Package=["quux", "xyzzy"])
- pd.entries = {"/test1.xml": test1,
- "/test2.xml": test2}
- pd.HandleEvent(Mock())
- self.assertItemsEqual(pd.Entries,
- dict(Path={"/etc/foo.conf": pd.BindEntry,
- "/etc/bar.conf": pd.BindEntry,
- "/etc/baz.conf": pd.BindEntry},
- Package={"quux": pd.BindEntry,
- "xyzzy": pd.BindEntry}))
-
- inner()
-
- def test__matches(self):
- pd = self.get_obj()
- self.assertTrue(pd._matches(lxml.etree.Element("Test",
- name="/etc/foo.conf"),
- Mock(),
- {"/etc/foo.conf": pd.BindEntry,
- "/etc/bar.conf": pd.BindEntry}))
- self.assertFalse(pd._matches(lxml.etree.Element("Test",
- name="/etc/baz.conf"),
- Mock(),
- {"/etc/foo.conf": pd.BindEntry,
- "/etc/bar.conf": pd.BindEntry}))
-
- def test_BindEntry(self):
- pd = self.get_obj()
- pd.get_attrs = Mock(return_value=dict(test1="test1", test2="test2"))
- entry = lxml.etree.Element("Path", name="/etc/foo.conf", test1="bogus")
- metadata = Mock()
- pd.BindEntry(entry, metadata)
- pd.get_attrs.assert_called_with(entry, metadata)
- self.assertItemsEqual(entry.attrib,
- dict(name="/etc/foo.conf",
- test1="test1", test2="test2"))
-
- def test_get_attrs(self):
- pd = self.get_obj()
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- children = [lxml.etree.Element("Child")]
- metadata = Mock()
- pd.entries = dict()
-
- def reset():
- metadata.reset_mock()
- for src in pd.entries.values():
- src.reset_mock()
- src.cache = None
-
- # test with no matches
- self.assertRaises(PluginExecutionError,
- pd.get_attrs, entry, metadata)
-
- def add_entry(name, data, prio=10):
- path = os.path.join(pd.data, name)
- pd.entries[path] = Mock()
- pd.entries[path].priority = prio
- def do_Cache(metadata):
- pd.entries[path].cache = (metadata, data)
- pd.entries[path].Cache.side_effect = do_Cache
-
- add_entry('test1.xml',
- dict(Path={'/etc/foo.conf': dict(attr="attr1",
- __children__=children),
- '/etc/bar.conf': dict()}))
- add_entry('test2.xml',
- dict(Path={'/etc/bar.conf': dict(__text__="text",
- attr="attr1")},
- Package={'quux': dict(),
- 'xyzzy': dict()}),
- prio=20)
- add_entry('test3.xml',
- dict(Path={'/etc/baz.conf': dict()},
- Package={'xyzzy': dict()}),
- prio=20)
-
- # test with exactly one match, __children__
- reset()
- self.assertItemsEqual(pd.get_attrs(entry, metadata),
- dict(attr="attr1"))
- for src in pd.entries.values():
- src.Cache.assert_called_with(metadata)
- self.assertEqual(len(entry.getchildren()), 1)
- self.assertXMLEqual(entry.getchildren()[0], children[0])
-
- # test with multiple matches with different priorities, __text__
- reset()
- entry = lxml.etree.Element("Path", name="/etc/bar.conf")
- self.assertItemsEqual(pd.get_attrs(entry, metadata),
- dict(attr="attr1"))
- for src in pd.entries.values():
- src.Cache.assert_called_with(metadata)
- self.assertEqual(entry.text, "text")
-
- # test with multiple matches with identical priorities
- reset()
- entry = lxml.etree.Element("Package", name="xyzzy")
- self.assertRaises(PluginExecutionError,
- pd.get_attrs, entry, metadata)
-
-
-class TestSpecificityError(Bcfg2TestCase):
- """ placeholder for future tests """
- pass
-
-
-class TestSpecificity(Bcfg2TestCase):
- test_obj = Specificity
-
- def get_obj(self, **kwargs):
- return self.test_obj(**kwargs)
-
- def test_matches(self):
- metadata = Mock()
- metadata.hostname = "foo.example.com"
- metadata.groups = ["group1", "group2"]
- self.assertTrue(self.get_obj(all=True).matches(metadata))
- self.assertTrue(self.get_obj(group="group1").matches(metadata))
- self.assertTrue(self.get_obj(hostname="foo.example.com").matches(metadata))
- self.assertFalse(self.get_obj().matches(metadata))
- self.assertFalse(self.get_obj(group="group3").matches(metadata))
- self.assertFalse(self.get_obj(hostname="bar.example.com").matches(metadata))
-
- def test__cmp(self):
- specs = [self.get_obj(all=True),
- self.get_obj(group="group1", prio=10),
- self.get_obj(group="group1", prio=20),
- self.get_obj(hostname="foo.example.com")]
-
- for i in range(len(specs)):
- for j in range(len(specs)):
- if i == j:
- self.assertEqual(0, specs[i].__cmp__(specs[j]))
- self.assertEqual(0, specs[j].__cmp__(specs[i]))
- elif i > j:
- self.assertEqual(-1, specs[i].__cmp__(specs[j]))
- self.assertEqual(1, specs[j].__cmp__(specs[i]))
- elif i < j:
- self.assertEqual(1, specs[i].__cmp__(specs[j]))
- self.assertEqual(-1, specs[j].__cmp__(specs[i]))
-
- def test_cmp(self):
- """ test __lt__/__gt__/__eq__ """
- specs = [self.get_obj(all=True),
- self.get_obj(group="group1", prio=10),
- self.get_obj(group="group1", prio=20),
- self.get_obj(hostname="foo.example.com")]
-
- for i in range(len(specs)):
- for j in range(len(specs)):
- if i < j:
- self.assertGreater(specs[i], specs[j])
- self.assertLess(specs[j], specs[i])
- self.assertGreaterEqual(specs[i], specs[j])
- self.assertLessEqual(specs[j], specs[i])
- elif i == j:
- self.assertEqual(specs[i], specs[j])
- self.assertEqual(specs[j], specs[i])
- self.assertLessEqual(specs[i], specs[j])
- self.assertGreaterEqual(specs[j], specs[i])
- elif i > j:
- self.assertLess(specs[i], specs[j])
- self.assertGreater(specs[j], specs[i])
- self.assertLessEqual(specs[i], specs[j])
- self.assertGreaterEqual(specs[j], specs[i])
-
-
-class TestSpecificData(Bcfg2TestCase):
- test_obj = SpecificData
- path = os.path.join(datastore, "test.txt")
-
- def get_obj(self, name=None, specific=None, encoding=None):
- if name is None:
- name = self.path
- if specific is None:
- specific = Mock()
- return self.test_obj(name, specific, encoding)
-
- @patch("%s.open" % builtins)
- def test_handle_event(self, mock_open):
- event = Mock()
- event.code2str.return_value = 'deleted'
- sd = self.get_obj()
- sd.handle_event(event)
- self.assertFalse(mock_open.called)
- if hasattr(sd, 'data'):
- self.assertIsNone(sd.data)
- else:
- self.assertFalse(hasattr(sd, 'data'))
-
- event = Mock()
- mock_open.return_value.read.return_value = "test"
- sd.handle_event(event)
- mock_open.assert_called_with(self.path)
- mock_open.return_value.read.assert_any_call()
- self.assertEqual(sd.data, "test")
-
-
-class TestEntrySet(TestDebuggable):
- test_obj = EntrySet
- # filenames that should be matched successfully by the EntrySet
- # 'specific' regex. these are filenames alone -- a specificity
- # will be added to these
- basenames = ["test", "test.py", "test with spaces.txt",
- "test.multiple.dots.py", "test_underscores.and.dots",
- "really_misleading.G10_test",
- "name$with*regex(special){chars}",
- "misleading.H_hostname.test.com"]
- # filenames that do not match any of the basenames (or the
- # basename regex, if applicable)
- bogus_names = ["bogus"]
- # filenames that should be ignored
- ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx",
- "test.txt.genshi_include", "test.G_foo.genshi_include"]
-
-
- def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(),
- encoding=None):
- return self.test_obj(basename, path, entry_type, encoding)
-
- def test__init(self):
- for basename in self.basenames:
- eset = self.get_obj(basename=basename)
- self.assertIsInstance(eset.specific, re_type)
- self.assertTrue(eset.specific.match(os.path.join(datastore,
- basename)))
- ppath = os.path.join(datastore, "Plugin", basename)
- self.assertTrue(eset.specific.match(ppath))
- self.assertTrue(eset.specific.match(ppath + ".G20_foo"))
- self.assertTrue(eset.specific.match(ppath + ".G1_foo"))
- self.assertTrue(eset.specific.match(ppath + ".G32768_foo"))
- # a group named '_'
- self.assertTrue(eset.specific.match(ppath + ".G10__"))
- self.assertTrue(eset.specific.match(ppath + ".H_hostname"))
- self.assertTrue(eset.specific.match(ppath + ".H_fqdn.subdomain.example.com"))
- self.assertTrue(eset.specific.match(ppath + ".G20_group_with_underscores"))
-
- self.assertFalse(eset.specific.match(ppath + ".G20_group with spaces"))
- self.assertFalse(eset.specific.match(ppath + ".G_foo"))
- self.assertFalse(eset.specific.match(ppath + ".G_"))
- self.assertFalse(eset.specific.match(ppath + ".G20_"))
- self.assertFalse(eset.specific.match(ppath + ".H_"))
-
- for bogus in self.bogus_names:
- self.assertFalse(eset.specific.match(os.path.join(datastore,
- "Plugin",
- bogus)))
-
- for ignore in self.ignore:
- self.assertTrue(eset.ignore.match(ignore))
-
- self.assertFalse(eset.ignore.match(basename))
- self.assertFalse(eset.ignore.match(basename + ".G20_foo"))
- self.assertFalse(eset.ignore.match(basename + ".G1_foo"))
- self.assertFalse(eset.ignore.match(basename + ".G32768_foo"))
- self.assertFalse(eset.ignore.match(basename + ".G10__"))
- self.assertFalse(eset.ignore.match(basename + ".H_hostname"))
- self.assertFalse(eset.ignore.match(basename + ".H_fqdn.subdomain.example.com"))
- self.assertFalse(eset.ignore.match(basename + ".G20_group_with_underscores"))
-
- def test_get_matching(self):
- items = {0: Mock(), 1: Mock(), 2: Mock(), 3: Mock(), 4: Mock(),
- 5: Mock()}
- items[0].specific.matches.return_value = False
- items[1].specific.matches.return_value = True
- items[2].specific.matches.return_value = False
- items[3].specific.matches.return_value = False
- items[4].specific.matches.return_value = True
- items[5].specific.matches.return_value = True
- metadata = Mock()
- eset = self.get_obj()
- eset.entries = items
- self.assertItemsEqual(eset.get_matching(metadata),
- [items[1], items[4], items[5]])
- for i in items.values():
- i.specific.matches.assert_called_with(metadata)
-
- @patch("Bcfg2.Server.Plugin.%s.get_matching" % test_obj.__name__)
- def test_best_matching(self, mock_get_matching):
- eset = self.get_obj()
- metadata = Mock()
- matching = []
-
- def reset():
- mock_get_matching.reset_mock()
- metadata.reset_mock()
- for m in matching:
- m.reset_mock()
-
- def specific(all=False, group=False, prio=None, hostname=False):
- spec = Mock()
- spec.specific = Specificity(all=all, group=group, prio=prio,
- hostname=hostname)
- return spec
-
- self.assertRaises(PluginExecutionError,
- eset.best_matching, metadata, matching=[])
-
- reset()
- mock_get_matching.return_value = matching
- self.assertRaises(PluginExecutionError,
- eset.best_matching, metadata)
- mock_get_matching.assert_called_with(metadata)
-
- # test with a single file for all
- reset()
- expected = specific(all=True)
- matching.append(expected)
- mock_get_matching.return_value = matching
- self.assertEqual(eset.best_matching(metadata), expected)
- mock_get_matching.assert_called_with(metadata)
-
- # test with a single group-specific file
- reset()
- expected = specific(group=True, prio=10)
- matching.append(expected)
- mock_get_matching.return_value = matching
- self.assertEqual(eset.best_matching(metadata), expected)
- mock_get_matching.assert_called_with(metadata)
-
- # test with multiple group-specific files
- reset()
- expected = specific(group=True, prio=20)
- matching.append(expected)
- mock_get_matching.return_value = matching
- self.assertEqual(eset.best_matching(metadata), expected)
- mock_get_matching.assert_called_with(metadata)
-
- # test with host-specific file
- reset()
- expected = specific(hostname=True)
- matching.append(expected)
- mock_get_matching.return_value = matching
- self.assertEqual(eset.best_matching(metadata), expected)
- mock_get_matching.assert_called_with(metadata)
-
- @patch("Bcfg2.Server.Plugin.%s.entry_init" % test_obj.__name__)
- @patch("Bcfg2.Server.Plugin.%s.reset_metadata" % test_obj.__name__)
- @patch("Bcfg2.Server.Plugin.%s.update_metadata" % test_obj.__name__)
- def test_handle_event(self, mock_update_md, mock_reset_md, mock_init):
- def reset():
- mock_update_md.reset_mock()
- mock_reset_md.reset_mock()
- mock_init.reset_mock()
-
- eset = self.get_obj()
- for fname in ["info", "info.xml", ":info"]:
- for evt in ["exists", "created", "changed"]:
- reset()
- event = Mock()
- event.code2str.return_value = evt
- event.filename = fname
- eset.handle_event(event)
- mock_update_md.assert_called_with(event)
- self.assertFalse(mock_init.called)
- self.assertFalse(mock_reset_md.called)
-
- reset()
- event = Mock()
- event.code2str.return_value = "deleted"
- event.filename = fname
- eset.handle_event(event)
- mock_reset_md.assert_called_with(event)
- self.assertFalse(mock_init.called)
- self.assertFalse(mock_update_md.called)
-
- for evt in ["exists", "created", "changed"]:
- reset()
- event = Mock()
- event.code2str.return_value = evt
- event.filename = "test.txt"
- eset.handle_event(event)
- mock_init.assert_called_with(event)
- self.assertFalse(mock_reset_md.called)
- self.assertFalse(mock_update_md.called)
-
- reset()
- entry = Mock()
- eset.entries["test.txt"] = entry
- event = Mock()
- event.code2str.return_value = "changed"
- event.filename = "test.txt"
- eset.handle_event(event)
- entry.handle_event.assert_called_with(event)
- self.assertFalse(mock_init.called)
- self.assertFalse(mock_reset_md.called)
- self.assertFalse(mock_update_md.called)
-
- reset()
- entry = Mock()
- eset.entries["test.txt"] = entry
- event = Mock()
- event.code2str.return_value = "deleted"
- event.filename = "test.txt"
- eset.handle_event(event)
- self.assertNotIn("test.txt", eset.entries)
-
- @patch("Bcfg2.Server.Plugin.%s.specificity_from_filename" %
- test_obj.__name__)
- def test_entry_init(self, mock_spec):
- eset = self.get_obj()
-
- def reset():
- eset.entry_type.reset_mock()
- mock_spec.reset_mock()
-
- event = Mock()
- event.code2str.return_value = "created"
- event.filename = "test.txt"
- eset.entry_init(event)
- mock_spec.assert_called_with("test.txt", specific=None)
- eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"),
- mock_spec.return_value, None)
- eset.entry_type.return_value.handle_event.assert_called_with(event)
- self.assertIn("test.txt", eset.entries)
-
- # test duplicate add
- reset()
- eset.entry_init(event)
- self.assertFalse(mock_spec.called)
- self.assertFalse(eset.entry_type.called)
- eset.entries["test.txt"].handle_event.assert_called_with(event)
-
- # test keyword args
- etype = Mock()
- specific = Mock()
- event = Mock()
- event.code2str.return_value = "created"
- event.filename = "test2.txt"
- eset.entry_init(event, entry_type=etype, specific=specific)
- mock_spec.assert_called_with("test2.txt", specific=specific)
- etype.assert_called_with(os.path.join(eset.path, "test2.txt"),
- mock_spec.return_value, None)
- etype.return_value.handle_event.assert_called_with(event)
- self.assertIn("test2.txt", eset.entries)
-
- # test specificity error
- event = Mock()
- event.code2str.return_value = "created"
- event.filename = "test3.txt"
- mock_spec.side_effect = SpecificityError
- eset.entry_init(event)
- mock_spec.assert_called_with("test3.txt", specific=None)
- self.assertFalse(eset.entry_type.called)
-
- @patch("Bcfg2.Server.Plugin.Specificity")
- def test_specificity_from_filename(self, mock_spec):
- def test(eset, fname, **kwargs):
- mock_spec.reset_mock()
- if "specific" in kwargs:
- specific = kwargs['specific']
- del kwargs['specific']
- else:
- specific = None
- self.assertEqual(eset.specificity_from_filename(fname,
- specific=specific),
- mock_spec.return_value)
- mock_spec.assert_called_with(**kwargs)
-
- def fails(eset, fname, specific=None):
- mock_spec.reset_mock()
- self.assertRaises(SpecificityError,
- eset.specificity_from_filename, fname,
- specific=specific)
-
- for basename in self.basenames:
- eset = self.get_obj(basename=basename)
- ppath = os.path.join(datastore, "Plugin", basename)
- test(eset, ppath, all=True)
- test(eset, ppath + ".G20_foo", group="foo", prio=20)
- test(eset, ppath + ".G1_foo", group="foo", prio=1)
- test(eset, ppath + ".G32768_foo", group="foo", prio=32768)
- test(eset, ppath + ".G10__", group="_", prio=10)
- test(eset, ppath + ".H_hostname", hostname="hostname")
- test(eset, ppath + ".H_fqdn.subdomain.example.com",
- hostname="fqdn.subdomain.example.com")
- test(eset, ppath + ".G20_group_with_underscores",
- group="group_with_underscores", prio=20)
-
- for bogus in self.bogus_names:
- fails(eset, bogus)
- fails(eset, ppath + ".G_group with spaces")
- fails(eset, ppath + ".G_foo")
- fails(eset, ppath + ".G_")
- fails(eset, ppath + ".G20_")
- fails(eset, ppath + ".H_")
-
- @patch("%s.open" % builtins)
- @patch("Bcfg2.Server.Plugin.InfoXML")
- def test_update_metadata(self, mock_InfoXML, mock_open):
- eset = self.get_obj()
-
- # add info.xml
- event = Mock()
- event.filename = "info.xml"
- eset.update_metadata(event)
- mock_InfoXML.assert_called_with(os.path.join(eset.path, "info.xml"))
- mock_InfoXML.return_value.HandleEvent.assert_called_with(event)
- self.assertEqual(eset.infoxml, mock_InfoXML.return_value)
-
- # modify info.xml
- mock_InfoXML.reset_mock()
- eset.update_metadata(event)
- self.assertFalse(mock_InfoXML.called)
- eset.infoxml.HandleEvent.assert_called_with(event)
-
- for fname in [':info', 'info']:
- event = Mock()
- event.filename = fname
-
- idata = ["owner:owner",
- "group: GROUP",
- "perms: 775",
- "important: true",
- "bogus: line"]
- mock_open.return_value.readlines.return_value = idata
- eset.update_metadata(event)
- expected = default_file_metadata.copy()
- expected['owner'] = 'owner'
- expected['group'] = 'GROUP'
- expected['perms'] = '0775'
- expected['important'] = 'true'
- self.assertItemsEqual(eset.metadata,
- expected)
-
- def test_reset_metadata(self):
- eset = self.get_obj()
-
- # test info.xml
- event = Mock()
- event.filename = "info.xml"
- eset.infoxml = Mock()
- eset.reset_metadata(event)
- self.assertIsNone(eset.infoxml)
-
- for fname in [':info', 'info']:
- event = Mock()
- event.filename = fname
- eset.metadata = Mock()
- eset.reset_metadata(event)
- self.assertItemsEqual(eset.metadata, default_file_metadata)
-
- @patch("Bcfg2.Server.Plugin.bind_info")
- def test_bind_info_to_entry(self, mock_bind_info):
- eset = self.get_obj()
- entry = Mock()
- metadata = Mock()
- eset.bind_info_to_entry(entry, metadata)
- mock_bind_info.assert_called_with(entry, metadata,
- infoxml=eset.infoxml,
- default=eset.metadata)
-
- @patch("Bcfg2.Server.Plugin.%s.best_matching" % test_obj.__name__)
- @patch("Bcfg2.Server.Plugin.%s.bind_info_to_entry" % test_obj.__name__)
- def test_bind_entry(self, mock_bind_info, mock_best_matching):
- eset = self.get_obj()
- entry = Mock()
- metadata = Mock()
- eset.bind_entry(entry, metadata)
- mock_bind_info.assert_called_with(entry, metadata)
- mock_best_matching.assert_called_with(metadata)
- mock_best_matching.return_value.bind_entry.assert_called_with(entry,
- metadata)
-
-
-class TestGroupSpool(TestPlugin, TestGenerator):
- test_obj = GroupSpool
-
- @patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
- def get_obj(self, core=None):
- return TestPlugin.get_obj(self, core=core)
-
- @patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
- def test__init(self, mock_Add):
- core = Mock()
- gs = self.test_obj(core, datastore)
- mock_Add.assert_called_with('')
- self.assertItemsEqual(gs.Entries, {gs.entry_type: {}})
-
- @patch("os.path.isdir")
- @patch("os.path.isfile")
- @patch("Bcfg2.Server.Plugin.%s.event_id" % test_obj.__name__)
- @patch("Bcfg2.Server.Plugin.%s.event_path" % test_obj.__name__)
- @patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
- def test_add_entry(self, mock_Add, mock_event_path, mock_event_id,
- mock_isfile, mock_isdir):
- gs = self.get_obj()
- gs.es_cls = Mock()
- gs.es_child_cls = Mock()
-
- def reset():
- gs.es_cls.reset_mock()
- gs.es_child_cls.reset_mock()
- mock_Add.reset_mock()
- mock_event_path.reset_mock()
- mock_event_id.reset_mock()
- mock_isfile.reset_mock()
- mock_isdir.reset_mock()
-
- # directory
- event = Mock()
- event.filename = "foo"
- basedir = "test"
- epath = os.path.join(gs.data, basedir, event.filename)
- ident = os.path.join(basedir, event.filename)
- mock_event_path.return_value = epath
- mock_event_id.return_value = ident
- mock_isdir.return_value = True
- mock_isfile.return_value = False
- gs.add_entry(event)
- mock_Add.assert_called_with(os.path.join("/" + basedir, event.filename))
- self.assertNotIn(ident, gs.entries)
- mock_isdir.assert_called_with(epath)
-
- # file that is not in self.entries
- reset()
- event = Mock()
- event.filename = "foo"
- basedir = "test/foo/"
- epath = os.path.join(gs.data, basedir, event.filename)
- ident = basedir[:-1]
- mock_event_path.return_value = epath
- mock_event_id.return_value = ident
- mock_isdir.return_value = False
- mock_isfile.return_value = True
- gs.add_entry(event)
- self.assertFalse(mock_Add.called)
- gs.es_cls.assert_called_with(gs.filename_pattern,
- gs.data + ident,
- gs.es_child_cls,
- gs.encoding)
- self.assertIn(ident, gs.entries)
- self.assertEqual(gs.entries[ident], gs.es_cls.return_value)
- self.assertIn(ident, gs.Entries[gs.entry_type])
- self.assertEqual(gs.Entries[gs.entry_type][ident],
- gs.es_cls.return_value.bind_entry)
- gs.entries[ident].handle_event.assert_called_with(event)
- mock_isfile.assert_called_with(epath)
-
- # file that is in self.entries
- reset()
- gs.add_entry(event)
- self.assertFalse(mock_Add.called)
- self.assertFalse(gs.es_cls.called)
- gs.entries[ident].handle_event.assert_called_with(event)
-
- def test_event_path(self):
- gs = self.get_obj()
- gs.handles[1] = "/var/lib/foo/"
- gs.handles[2] = "/etc/foo/"
- gs.handles[3] = "/usr/share/foo/"
- event = Mock()
- event.filename = "foo"
- for i in range(1, 4):
- event.requestID = i
- self.assertEqual(gs.event_path(event),
- os.path.join(datastore, gs.name,
- gs.handles[event.requestID].lstrip('/'),
- event.filename))
-
- @patch("os.path.isdir")
- @patch("Bcfg2.Server.Plugin.%s.event_path" % test_obj.__name__)
- def test_event_id(self, mock_event_path, mock_isdir):
- gs = self.get_obj()
-
- def reset():
- mock_event_path.reset_mock()
- mock_isdir.reset_mock()
-
- gs.handles[1] = "/var/lib/foo/"
- gs.handles[2] = "/etc/foo/"
- gs.handles[3] = "/usr/share/foo/"
- event = Mock()
- event.filename = "foo"
- for i in range(1, 4):
- event.requestID = i
- reset()
- mock_isdir.return_value = True
- self.assertEqual(gs.event_id(event),
- os.path.join(gs.handles[event.requestID].lstrip('/'),
- event.filename))
- mock_isdir.assert_called_with(mock_event_path.return_value)
-
- reset()
- mock_isdir.return_value = False
- self.assertEqual(gs.event_id(event),
- gs.handles[event.requestID].rstrip('/'))
- mock_isdir.assert_called_with(mock_event_path.return_value)
-
- def test_toggle_debug(self):
- gs = self.get_obj()
- gs.entries = {"/foo": Mock(),
- "/bar": Mock(),
- "/baz/quux": Mock()}
-
- @patch("Bcfg2.Server.Plugin.Plugin.toggle_debug")
- def inner(mock_debug):
- gs.toggle_debug()
- mock_debug.assert_called_with(gs)
- for entry in gs.entries.values():
- entry.toggle_debug.assert_any_call()
-
- inner()
-
- TestPlugin.test_toggle_debug(self)
-
- def test_HandleEvent(self):
- gs = self.get_obj()
- gs.entries = {"/foo": Mock(),
- "/bar": Mock(),
- "/baz": Mock(),
- "/baz/quux": Mock()}
- for path in gs.entries.keys():
- gs.Entries[gs.entry_type] = {path: Mock()}
- gs.handles = {1: "/foo/",
- 2: "/bar/",
- 3: "/baz/",
- 4: "/baz/quux"}
-
- gs.add_entry = Mock()
- gs.event_id = Mock()
-
- def reset():
- gs.add_entry.reset_mock()
- gs.event_id.reset_mock()
- for entry in gs.entries.values():
- entry.reset_mock()
-
- # test event creation, changing entry that doesn't exist
- for evt in ["exists", "created", "changed"]:
- reset()
- event = Mock()
- event.filename = "foo"
- event.requestID = 1
- event.code2str.return_value = evt
- gs.HandleEvent(event)
- gs.event_id.assert_called_with(event)
- gs.add_entry.assert_called_with(event)
-
- # test deleting entry, changing entry that does exist
- for evt in ["changed", "deleted"]:
- reset()
- event = Mock()
- event.filename = "quux"
- event.requestID = 4
- event.code2str.return_value = evt
- gs.event_id.return_value = "/baz/quux"
- gs.HandleEvent(event)
- gs.event_id.assert_called_with(event)
- self.assertIn(gs.event_id.return_value, gs.entries)
- gs.entries[gs.event_id.return_value].handle_event.assert_called_with(event)
- self.assertFalse(gs.add_entry.called)
-
- # test deleting directory
- reset()
- event = Mock()
- event.filename = "quux"
- event.requestID = 3
- event.code2str.return_value = "deleted"
- gs.event_id.return_value = "/baz/quux"
- gs.HandleEvent(event)
- gs.event_id.assert_called_with(event)
- self.assertNotIn("/baz/quux", gs.entries)
- self.assertNotIn("/baz/quux", gs.Entries[gs.entry_type])
-
-
-
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
deleted file mode 100644
index 2ff0af78e..000000000
--- a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
+++ /dev/null
@@ -1,1470 +0,0 @@
-import os
-import sys
-import copy
-import time
-import socket
-import lxml.etree
-import Bcfg2.Server
-import Bcfg2.Server.Plugin
-from Bcfg2.Server.Plugins.Metadata import *
-from mock import Mock, patch
-
-# add all parent testsuite directories to sys.path to allow (most)
-# relative imports in python 2.4
-path = os.path.dirname(__file__)
-while path != "/":
- if os.path.basename(path).lower().startswith("test"):
- sys.path.append(path)
- if os.path.basename(path) == "testsuite":
- break
- path = os.path.dirname(path)
-from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
- skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
- patchIf, datastore
-from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \
- TestStatistics, TestDatabaseBacked
-
-def get_clients_test_tree():
- return lxml.etree.XML('''
-<Clients>
- <Client name="client1" address="1.2.3.1" auth="cert"
- location="floating" password="password2" profile="group1"/>
- <Client name="client2" address="1.2.3.2" secure="true" profile="group2"/>
- <Client name="client3" address="1.2.3.3" uuid="uuid1" profile="group1"
- password="password2">
- <Alias name="alias1"/>
- </Client>
- <Client name="client4" profile="group1">
- <Alias name="alias2" address="1.2.3.2"/>
- <Alias name="alias3"/>
- </Client>
- <Client name="client5" profile="group1"/>
- <Client name="client6" profile="group1" auth="bootstrap"/>
- <Client name="client7" profile="group1" auth="cert" address="1.2.3.4"/>
- <Client name="client8" profile="group1" auth="cert+password"
- address="1.2.3.5"/>
- <Client name="client9" profile="group2" secure="true" password="password3"/>
- <Client name="client10" profile="group1" floating="true"/>
-</Clients>''').getroottree()
-
-def get_groups_test_tree():
- return lxml.etree.XML('''
-<Groups xmlns:xi="http://www.w3.org/2001/XInclude">
- <Client name="client8">
- <Group name="group8"/>
- </Client>
- <Client name="client9">
- <Group name="group8"/>
- </Client>
-
- <Group name="group1" default="true" profile="true" public="true"
- category="category1"/>
- <Group name="group2" profile="true" public="true" category="category1">
- <Bundle name="bundle1"/>
- <Bundle name="bundle2"/>
- <Group name="group1"/>
- <Group name="group4"/>
- </Group>
- <Group name="group3" category="category2" public="false"/>
- <Group name="group4" category="category1">
- <Group name="group1"/>
- <Group name="group6"/>
- </Group>
- <Group name="group5"/>
- <Group name="group7">
- <Bundle name="bundle3"/>
- </Group>
- <Group name="group8">
- <Group name="group9"/>
- <Client name="client9">
- <Group name="group11"/>
- <Group name="group9" negate="true"/>
- </Client>
- <Group name="group1">
- <Group name="group10"/>
- </Group>
- </Group>
-</Groups>''').getroottree()
-
-
-def get_metadata_object(core=None, watch_clients=False, use_db=False):
- if core is None:
- core = Mock()
- core.setup.cfp.getboolean = Mock(return_value=use_db)
- return Metadata(core, datastore, watch_clients=watch_clients)
-
-
-class TestMetadataDB(DBModelTestCase):
- if has_django:
- models = [MetadataClientModel]
-
-
-if has_django or can_skip:
- class TestClientVersions(Bcfg2TestCase):
- test_clients = dict(client1="1.2.0",
- client2="1.2.2",
- client3="1.3.0pre1",
- client4="1.1.0",
- client5=None,
- client6=None)
-
- @skipUnless(has_django, "Django not found")
- def setUp(self):
- syncdb(TestMetadataDB)
- for client, version in self.test_clients.items():
- MetadataClientModel(hostname=client, version=version).save()
-
- def test__contains(self):
- v = ClientVersions()
- self.assertIn("client1", v)
- self.assertIn("client5", v)
- self.assertNotIn("client__contains", v)
-
- def test_keys(self):
- v = ClientVersions()
- self.assertItemsEqual(self.test_clients.keys(), v.keys())
-
- def test__setitem(self):
- v = ClientVersions()
-
- # test setting version of existing client
- v["client1"] = "1.2.3"
- self.assertIn("client1", v)
- self.assertEqual(v['client1'], "1.2.3")
- client = MetadataClientModel.objects.get(hostname="client1")
- self.assertEqual(client.version, "1.2.3")
-
- # test adding new client
- new = "client__setitem"
- v[new] = "1.3.0"
- self.assertIn(new, v)
- self.assertEqual(v[new], "1.3.0")
- client = MetadataClientModel.objects.get(hostname=new)
- self.assertEqual(client.version, "1.3.0")
-
- # test adding new client with no version
- new2 = "client__setitem_2"
- v[new2] = None
- self.assertIn(new2, v)
- self.assertEqual(v[new2], None)
- client = MetadataClientModel.objects.get(hostname=new2)
- self.assertEqual(client.version, None)
-
- def test__getitem(self):
- v = ClientVersions()
-
- # test getting existing client
- self.assertEqual(v['client2'], "1.2.2")
- self.assertIsNone(v['client5'])
-
- # test exception on nonexistent client
- expected = KeyError
- try:
- v['clients__getitem']
- except expected:
- pass
- except:
- err = sys.exc_info()[1]
- self.assertFalse(True, "%s raised instead of %s" %
- (err.__class__.__name__,
- expected.__class__.__name__))
- else:
- self.assertFalse(True,
- "%s not raised" % expected.__class__.__name__)
-
- def test__len(self):
- v = ClientVersions()
- self.assertEqual(len(v), MetadataClientModel.objects.count())
-
- def test__iter(self):
- v = ClientVersions()
- self.assertItemsEqual([h for h in iter(v)], v.keys())
-
- def test__delitem(self):
- v = ClientVersions()
-
- # test adding new client
- new = "client__delitem"
- v[new] = "1.3.0"
-
- del v[new]
- self.assertIn(new, v)
- self.assertIsNone(v[new])
-
-
-class TestXMLMetadataConfig(TestXMLFileBacked):
- test_obj = XMLMetadataConfig
-
- def get_obj(self, basefile="clients.xml", core=None, watch_clients=False):
- self.metadata = get_metadata_object(core=core,
- watch_clients=watch_clients)
- return XMLMetadataConfig(self.metadata, watch_clients, basefile)
-
- def test__init(self):
- xmc = self.get_obj()
- self.assertEqual(self.metadata.core.fam, xmc.fam)
- self.assertFalse(xmc.fam.AddMonitor.called)
-
- def test_xdata(self):
- config = self.get_obj()
- expected = Bcfg2.Server.Plugin.MetadataRuntimeError
- try:
- config.xdata
- except expected:
- pass
- except:
- err = sys.exc_info()[1]
- self.assertFalse(True, "%s raised instead of %s" %
- (err.__class__.__name__,
- expected.__class__.__name__))
- else:
- self.assertFalse(True,
- "%s not raised" % expected.__class__.__name__)
- pass
-
- config.data = "<test/>"
- self.assertEqual(config.xdata, "<test/>")
-
- def test_base_xdata(self):
- config = self.get_obj()
- # we can't use assertRaises here because base_xdata is a property
- expected = Bcfg2.Server.Plugin.MetadataRuntimeError
- try:
- config.base_xdata
- except expected:
- pass
- except:
- err = sys.exc_info()[1]
- self.assertFalse(True, "%s raised instead of %s" %
- (err.__class__.__name__,
- expected.__class__.__name__))
- else:
- self.assertFalse(True,
- "%s not raised" % expected.__class__.__name__)
- pass
-
- config.basedata = "<test/>"
- self.assertEqual(config.base_xdata, "<test/>")
-
- def test_add_monitor(self):
- core = Mock()
- config = self.get_obj(core=core)
-
- fname = "test.xml"
- fpath = os.path.join(self.metadata.data, fname)
-
- config.extras = []
- config.add_monitor(fpath)
- self.assertFalse(core.fam.AddMonitor.called)
- self.assertEqual(config.extras, [fpath])
-
- config = self.get_obj(core=core, watch_clients=True)
- config.add_monitor(fpath)
- core.fam.AddMonitor.assert_called_with(fpath, config.metadata)
- self.assertItemsEqual(config.extras, [fpath])
-
- def test_Index(self):
- # Index() isn't used on XMLMetadataConfig objects
- pass
-
- @patch("lxml.etree.parse")
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig._follow_xincludes")
- def test_load_xml(self, mock_follow, mock_parse):
- config = self.get_obj("clients.xml")
-
- def reset():
- mock_parse.reset_mock()
- mock_follow.reset_mock()
- config.data = None
- config.basedata = None
-
- reset()
- config.load_xml()
- mock_follow.assert_called_with(xdata=mock_parse.return_value)
- mock_parse.assert_called_with(os.path.join(config.basedir,
- "clients.xml"),
- parser=Bcfg2.Server.XMLParser)
- self.assertFalse(mock_parse.return_value.xinclude.called)
- self.assertEqual(config.data, mock_parse.return_value)
- self.assertIsNotNone(config.basedata)
-
- reset()
- mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None,
- None)
- config.load_xml()
- mock_parse.assert_called_with(os.path.join(config.basedir,
- "clients.xml"),
- parser=Bcfg2.Server.XMLParser)
- self.assertIsNone(config.data)
- self.assertIsNone(config.basedata)
-
- reset()
- mock_parse.side_effect = None
- def follow_xincludes(xdata=None):
- config.extras = [Mock(), Mock()]
- mock_follow.side_effect = follow_xincludes
- config.load_xml()
- mock_follow.assert_called_with(xdata=mock_parse.return_value)
- mock_parse.assert_called_with(os.path.join(config.basedir,
- "clients.xml"),
- parser=Bcfg2.Server.XMLParser)
- mock_parse.return_value.xinclude.assert_any_call()
- self.assertEqual(config.data, mock_parse.return_value)
- self.assertIsNotNone(config.basedata)
-
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml")
- def test_write(self, mock_write_xml):
- config = self.get_obj("clients.xml")
- config.basedata = "<test/>"
- config.write()
- mock_write_xml.assert_called_with(os.path.join(self.metadata.data,
- "clients.xml"),
- "<test/>")
-
- @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False))
- @patch('fcntl.lockf', Mock())
- @patch('%s.open' % builtins)
- @patch('os.unlink')
- @patch('os.rename')
- @patch('os.path.islink')
- @patch('os.readlink')
- def test_write_xml(self, mock_readlink, mock_islink, mock_rename,
- mock_unlink, mock_open):
- fname = "clients.xml"
- config = self.get_obj(fname)
- fpath = os.path.join(self.metadata.data, fname)
- tmpfile = "%s.new" % fpath
- linkdest = os.path.join(self.metadata.data, "client-link.xml")
-
- mock_islink.return_value = False
-
- config.write_xml(fpath, get_clients_test_tree())
- mock_open.assert_called_with(tmpfile, "w")
- self.assertTrue(mock_open.return_value.write.called)
- mock_islink.assert_called_with(fpath)
- mock_rename.assert_called_with(tmpfile, fpath)
-
- mock_islink.return_value = True
- mock_readlink.return_value = linkdest
- config.write_xml(fpath, get_clients_test_tree())
- mock_rename.assert_called_with(tmpfile, linkdest)
-
- mock_rename.side_effect = OSError
- self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
- config.write_xml, fpath, get_clients_test_tree())
-
- mock_open.return_value.write.side_effect = IOError
- self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
- config.write_xml, fpath, get_clients_test_tree())
- mock_unlink.assert_called_with(tmpfile)
-
- mock_open.side_effect = IOError
- self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
- config.write_xml, fpath, get_clients_test_tree())
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch('lxml.etree.parse')
- def test_find_xml_for_xpath(self, mock_parse):
- config = self.get_obj("groups.xml")
- config.basedata = get_groups_test_tree()
- xpath = "//Group[@name='group1']"
- self.assertItemsEqual(config.find_xml_for_xpath(xpath),
- dict(filename=os.path.join(self.metadata.data,
- "groups.xml"),
- xmltree=get_groups_test_tree(),
- xquery=get_groups_test_tree().xpath(xpath)))
-
- self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict())
-
- config.extras = [os.path.join(self.metadata.data, p)
- for p in ["foo.xml", "bar.xml", "clients.xml"]]
-
- def parse_side_effect(fname, parser=Bcfg2.Server.XMLParser):
- if fname == os.path.join(self.metadata.data, "clients.xml"):
- return get_clients_test_tree()
- else:
- return lxml.etree.XML("<null/>").getroottree()
-
- mock_parse.side_effect = parse_side_effect
- xpath = "//Client[@secure='true']"
- self.assertItemsEqual(config.find_xml_for_xpath(xpath),
- dict(filename=os.path.join(self.metadata.data,
- "clients.xml"),
- xmltree=get_clients_test_tree(),
- xquery=get_clients_test_tree().xpath(xpath)))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
- def test_HandleEvent(self, mock_load_xml):
- config = self.get_obj("groups.xml")
- evt = Mock()
- evt.filename = os.path.join(self.metadata.data, "groups.xml")
- evt.code2str = Mock(return_value="changed")
- self.assertTrue(config.HandleEvent(evt))
- mock_load_xml.assert_called_with()
-
-
-class TestClientMetadata(Bcfg2TestCase):
- def test_inGroup(self):
- cm = ClientMetadata("client1", "group1", ["group1", "group2"],
- ["bundle1"], [], [], [], None, None, None, None)
- self.assertTrue(cm.inGroup("group1"))
- self.assertFalse(cm.inGroup("group3"))
-
-
-class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked):
- test_obj = Metadata
- use_db = False
-
- def get_obj(self, core=None, watch_clients=False):
- return get_metadata_object(core=core, watch_clients=watch_clients,
- use_db=self.use_db)
-
- @skipUnless(has_django, "Django not found")
- def test__use_db(self):
- # with the way we've set up our metadata tests, it's unweildy
- # to test _use_db. however, given the way get_obj works, if
- # there was a bug in _use_db it'd be almost certain to shake
- # out in the rest of the testing.
- pass
-
- def get_nonexistent_client(self, metadata, prefix="newclient"):
- if metadata is None:
- metadata = self.load_clients_data()
- i = 0
- client_name = "%s%s" % (prefix, i)
- while client_name in metadata.clients:
- i += 1
- client_name = "%s%s" % (prefix, i)
- return client_name
-
- def test__init(self):
- # test with watch_clients=False
- core = Mock()
- metadata = self.get_obj(core=core)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics)
- self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.query, MetadataQuery)
- self.assertEqual(metadata.states, dict())
-
- # test with watch_clients=True
- core.fam = Mock()
- metadata = self.get_obj(core=core, watch_clients=True)
- self.assertEqual(len(metadata.states), 2)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
-
- core.fam.reset_mock()
- core.fam.AddMonitor = Mock(side_effect=IOError)
- self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
- self.get_obj, core=core, watch_clients=True)
-
- @patch('os.makedirs', Mock())
- @patch('%s.open' % builtins)
- def test_init_repo(self, mock_open):
- Metadata.init_repo(datastore,
- groups_xml="groups", clients_xml="clients")
- mock_open.assert_any_call(os.path.join(datastore, "Metadata",
- "groups.xml"), "w")
- mock_open.assert_any_call(os.path.join(datastore, "Metadata",
- "clients.xml"), "w")
-
- def test_search_xdata(self):
- # test finding a node with the proper name
- metadata = self.get_obj()
- tree = get_groups_test_tree()
- res = metadata._search_xdata("Group", "group1", tree)
- self.assertIsInstance(res, lxml.etree._Element)
- self.assertEqual(res.get("name"), "group1")
-
- # test finding a node with the wrong name but correct alias
- metadata = self.get_obj()
- tree = get_clients_test_tree()
- res = metadata._search_xdata("Client", "alias3", tree, alias=True)
- self.assertIsInstance(res, lxml.etree._Element)
- self.assertNotEqual(res.get("name"), "alias3")
-
- # test failure finding a node
- metadata = self.get_obj()
- tree = get_clients_test_tree()
- res = metadata._search_xdata("Client",
- self.get_nonexistent_client(metadata),
- tree, alias=True)
- self.assertIsNone(res)
-
- def search_xdata(self, tag, name, tree, alias=False):
- metadata = self.get_obj()
- res = metadata._search_xdata(tag, name, tree, alias=alias)
- self.assertIsInstance(res, lxml.etree._Element)
- if not alias:
- self.assertEqual(res.get("name"), name)
-
- def test_search_group(self):
- # test finding a group with the proper name
- tree = get_groups_test_tree()
- self.search_xdata("Group", "group1", tree)
-
- def test_search_bundle(self):
- # test finding a bundle with the proper name
- tree = get_groups_test_tree()
- self.search_xdata("Bundle", "bundle1", tree)
-
- def test_search_client(self):
- # test finding a client with the proper name
- tree = get_clients_test_tree()
- self.search_xdata("Client", "client1", tree, alias=True)
- self.search_xdata("Client", "alias1", tree, alias=True)
-
- def test_add_group(self):
- metadata = self.get_obj()
- metadata.groups_xml.write = Mock()
- metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.add_group("test1", dict())
- metadata.groups_xml.write.assert_any_call()
- grp = metadata.search_group("test1", metadata.groups_xml.base_xdata)
- self.assertIsNotNone(grp)
- self.assertEqual(grp.attrib, dict(name='test1'))
-
- # have to call this explicitly -- usually load_xml does this
- # on FAM events
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.add_group("test2", dict(foo='bar'))
- metadata.groups_xml.write.assert_any_call()
- grp = metadata.search_group("test2", metadata.groups_xml.base_xdata)
- self.assertIsNotNone(grp)
- self.assertEqual(grp.attrib, dict(name='test2', foo='bar'))
-
- # have to call this explicitly -- usually load_xml does this
- # on FAM events
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.groups_xml.write.reset_mock()
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.add_group,
- "test1", dict())
- self.assertFalse(metadata.groups_xml.write.called)
-
- def test_update_group(self):
- metadata = self.get_obj()
- metadata.groups_xml.write_xml = Mock()
- metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.update_group("group1", dict(foo="bar"))
- grp = metadata.search_group("group1", metadata.groups_xml.base_xdata)
- self.assertIsNotNone(grp)
- self.assertIn("foo", grp.attrib)
- self.assertEqual(grp.get("foo"), "bar")
- self.assertTrue(metadata.groups_xml.write_xml.called)
-
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.update_group,
- "bogus_group", dict())
-
- def test_remove_group(self):
- metadata = self.get_obj()
- metadata.groups_xml.write_xml = Mock()
- metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.remove_group("group5")
- grp = metadata.search_group("group5", metadata.groups_xml.base_xdata)
- self.assertIsNone(grp)
- self.assertTrue(metadata.groups_xml.write_xml.called)
-
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.remove_group,
- "bogus_group")
-
- def test_add_bundle(self):
- metadata = self.get_obj()
- metadata.groups_xml.write = Mock()
- metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.add_bundle("bundle1")
- metadata.groups_xml.write.assert_any_call()
- bundle = metadata.search_bundle("bundle1",
- metadata.groups_xml.base_xdata)
- self.assertIsNotNone(bundle)
- self.assertEqual(bundle.attrib, dict(name='bundle1'))
-
- # have to call this explicitly -- usually load_xml does this
- # on FAM events
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.groups_xml.write.reset_mock()
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.add_bundle,
- "bundle1")
- self.assertFalse(metadata.groups_xml.write.called)
-
- def test_remove_bundle(self):
- metadata = self.get_obj()
- metadata.groups_xml.write_xml = Mock()
- metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
-
- metadata.remove_bundle("bundle1")
- grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata)
- self.assertIsNone(grp)
- self.assertTrue(metadata.groups_xml.write_xml.called)
-
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.remove_bundle,
- "bogus_bundle")
-
- def test_add_client(self):
- metadata = self.get_obj()
- metadata.clients_xml.write = Mock()
- metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree()
- metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
-
- new1 = self.get_nonexistent_client(metadata)
- metadata.add_client(new1, dict())
- metadata.clients_xml.write.assert_any_call()
- grp = metadata.search_client(new1, metadata.clients_xml.base_xdata)
- self.assertIsNotNone(grp)
- self.assertEqual(grp.attrib, dict(name=new1))
-
- # have to call this explicitly -- usually load_xml does this
- # on FAM events
- metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
- metadata._handle_clients_xml_event(Mock())
-
- new2 = self.get_nonexistent_client(metadata)
- metadata.add_client(new2, dict(foo='bar'))
- metadata.clients_xml.write.assert_any_call()
- grp = metadata.search_client(new2, metadata.clients_xml.base_xdata)
- self.assertIsNotNone(grp)
- self.assertEqual(grp.attrib, dict(name=new2, foo='bar'))
-
- # have to call this explicitly -- usually load_xml does this
- # on FAM events
- metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
-
- metadata.clients_xml.write.reset_mock()
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.add_client,
- new1, dict())
- self.assertFalse(metadata.clients_xml.write.called)
-
- def test_update_client(self):
- metadata = self.get_obj()
- metadata.clients_xml.write_xml = Mock()
- metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree())
- metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
-
- metadata.update_client("client1", dict(foo="bar"))
- grp = metadata.search_client("client1", metadata.clients_xml.base_xdata)
- self.assertIsNotNone(grp)
- self.assertIn("foo", grp.attrib)
- self.assertEqual(grp.get("foo"), "bar")
- self.assertTrue(metadata.clients_xml.write_xml.called)
-
- new = self.get_nonexistent_client(metadata)
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.update_client,
- new, dict())
-
- def load_clients_data(self, metadata=None, xdata=None):
- if metadata is None:
- metadata = self.get_obj()
- metadata.clients_xml.data = \
- xdata or copy.deepcopy(get_clients_test_tree())
- metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
- evt = Mock()
- evt.filename = os.path.join(datastore, "Metadata", "clients.xml")
- evt.code2str = Mock(return_value="changed")
- metadata.HandleEvent(evt)
- return metadata
-
- def test_handle_clients_xml_event(self):
- metadata = self.get_obj()
- metadata.profiles = ["group1", "group2"]
-
- metadata.clients_xml = Mock()
- metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree())
- metadata._handle_clients_xml_event(Mock())
-
- if not self.use_db:
- self.assertItemsEqual(metadata.clients,
- dict([(c.get("name"), c.get("profile"))
- for c in get_clients_test_tree().findall("//Client")]))
- aliases = dict([(a.get("name"), a.getparent().get("name"))
- for a in get_clients_test_tree().findall("//Alias")])
- self.assertItemsEqual(metadata.aliases, aliases)
-
- raliases = dict([(c.get("name"), set())
- for c in get_clients_test_tree().findall("//Client")])
- for alias in get_clients_test_tree().findall("//Alias"):
- raliases[alias.getparent().get("name")].add(alias.get("name"))
- self.assertItemsEqual(metadata.raliases, raliases)
-
- self.assertEqual(metadata.secure,
- [c.get("name")
- for c in get_clients_test_tree().findall("//Client[@secure='true']")])
- self.assertEqual(metadata.floating, ["client1", "client10"])
-
- addresses = dict([(c.get("address"), [])
- for c in get_clients_test_tree().findall("//*[@address]")])
- raddresses = dict()
- for client in get_clients_test_tree().findall("//Client[@address]"):
- addresses[client.get("address")].append(client.get("name"))
- try:
- raddresses[client.get("name")].append(client.get("address"))
- except KeyError:
- raddresses[client.get("name")] = [client.get("address")]
- for alias in get_clients_test_tree().findall("//Alias[@address]"):
- addresses[alias.get("address")].append(alias.getparent().get("name"))
- try:
- raddresses[alias.getparent().get("name")].append(alias.get("address"))
- except KeyError:
- raddresses[alias.getparent().get("name")] = alias.get("address")
-
- self.assertItemsEqual(metadata.addresses, addresses)
- self.assertItemsEqual(metadata.raddresses, raddresses)
- self.assertTrue(metadata.states['clients.xml'])
-
- def load_groups_data(self, metadata=None, xdata=None):
- if metadata is None:
- metadata = self.get_obj()
- metadata.groups_xml.data = \
- xdata or copy.deepcopy(get_groups_test_tree())
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
- evt = Mock()
- evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
- evt.code2str = Mock(return_value="changed")
- metadata.HandleEvent(evt)
- return metadata
-
- def test_handle_groups_xml_event(self):
- metadata = self.get_obj()
- metadata.groups_xml = Mock()
- metadata.groups_xml.xdata = get_groups_test_tree()
- metadata._handle_groups_xml_event(Mock())
-
- self.assertTrue(metadata.states['groups.xml'])
- self.assertTrue(metadata.groups['group1'].is_public)
- self.assertTrue(metadata.groups['group2'].is_public)
- self.assertFalse(metadata.groups['group3'].is_public)
- self.assertFalse(metadata.groups['group1'].is_private)
- self.assertFalse(metadata.groups['group2'].is_private)
- self.assertTrue(metadata.groups['group3'].is_private)
- self.assertTrue(metadata.groups['group1'].is_profile)
- self.assertTrue(metadata.groups['group2'].is_profile)
- self.assertFalse(metadata.groups['group3'].is_profile)
- self.assertItemsEqual(metadata.groups.keys(),
- set(g.get("name")
- for g in get_groups_test_tree().findall("//Group")))
- self.assertEqual(metadata.groups['group1'].category, 'category1')
- self.assertEqual(metadata.groups['group2'].category, 'category1')
- self.assertEqual(metadata.groups['group3'].category, 'category2')
- self.assertEqual(metadata.groups['group4'].category, 'category1')
- self.assertEqual(metadata.default, "group1")
-
- all_groups = []
- negated_groups = []
- for group in get_groups_test_tree().xpath("//Groups/Client//*") + \
- get_groups_test_tree().xpath("//Groups/Group//*"):
- if group.tag == 'Group' and not group.getchildren():
- if group.get("negate", "false").lower() == 'true':
- negated_groups.append(group.get("name"))
- else:
- all_groups.append(group.get("name"))
- self.assertItemsEqual([g.name
- for g in metadata.group_membership.values()],
- all_groups)
- self.assertItemsEqual([g.name
- for g in metadata.negated_groups.values()],
- negated_groups)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_set_profile(self):
- metadata = self.get_obj()
- if 'clients.xml' in metadata.states:
- metadata.states['clients.xml'] = False
- self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
- metadata.set_profile,
- None, None, None)
-
- self.load_groups_data(metadata=metadata)
- self.load_clients_data(metadata=metadata)
-
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.set_profile,
- "client1", "group5", None)
-
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.set_profile,
- "client1", "group3", None)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_set_profile_db(self):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- if metadata._use_db:
- profile = "group1"
- client_name = self.get_nonexistent_client(metadata)
- metadata.set_profile(client_name, profile, None)
- self.assertIn(client_name, metadata.clients)
- self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
- metadata.set_profile,
- client_name, profile, None)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client")
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
- def test_set_profile_xml(self, mock_update_client, mock_add_client):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- if not metadata._use_db:
- metadata.clients_xml.write = Mock()
- metadata.set_profile("client1", "group2", None)
- mock_update_client.assert_called_with("client1",
- dict(profile="group2"))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clientgroups["client1"], ["group2"])
-
- metadata.clients_xml.write.reset_mock()
- new1 = self.get_nonexistent_client(metadata)
- metadata.set_profile(new1, "group1", None)
- mock_add_client.assert_called_with(new1, dict(profile="group1"))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clientgroups[new1], ["group1"])
-
- metadata.clients_xml.write.reset_mock()
- new2 = self.get_nonexistent_client(metadata)
- metadata.session_cache[('1.2.3.6', None)] = (None, new2)
- metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None))
- mock_add_client.assert_called_with(new2,
- dict(uuid='uuid_new',
- profile="group1",
- address='1.2.3.6'))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"])
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("socket.gethostbyaddr")
- def test_resolve_client(self, mock_gethostbyaddr):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
- self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
-
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.resolve_client,
- ('1.2.3.2', None))
- self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1')
-
- metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
- 'client3')
- self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
- self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
- cleanup_cache=True), 'client3')
- self.assertEqual(metadata.session_cache, dict())
-
- mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
- self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
- mock_gethostbyaddr.assert_called_with('1.2.3.6')
-
- mock_gethostbyaddr.reset_mock()
- mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7'])
- self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4')
- mock_gethostbyaddr.assert_called_with('1.2.3.7')
-
- mock_gethostbyaddr.reset_mock()
- mock_gethostbyaddr.return_value = None
- mock_gethostbyaddr.side_effect = socket.herror
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.resolve_client,
- ('1.2.3.8', None))
- mock_gethostbyaddr.assert_called_with('1.2.3.8')
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
- def test_get_initial_metadata(self, mock_clientmetadata):
- metadata = self.get_obj()
- if 'clients.xml' in metadata.states:
- metadata.states['clients.xml'] = False
- self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
- metadata.get_initial_metadata, None)
-
- self.load_groups_data(metadata=metadata)
- self.load_clients_data(metadata=metadata)
-
- # test address, password
- metadata.get_initial_metadata("client1")
- mock_clientmetadata.assert_called_with("client1", "group1",
- set(["group1"]), set(), set(),
- set(["1.2.3.1"]),
- dict(category1='group1'), None,
- 'password2', None,
- metadata.query)
-
- # test address, bundles, category suppression
- metadata.get_initial_metadata("client2")
- mock_clientmetadata.assert_called_with("client2", "group2",
- set(["group2"]),
- set(["bundle1", "bundle2"]),
- set(), set(["1.2.3.2"]),
- dict(category1="group2"),
- None, None, None,
- metadata.query)
-
- # test aliases, address, uuid, password
- imd = metadata.get_initial_metadata("alias1")
- mock_clientmetadata.assert_called_with("client3", "group1",
- set(["group1"]), set(),
- set(['alias1']),
- set(["1.2.3.3"]),
- dict(category1="group1"),
- 'uuid1', 'password2', None,
- metadata.query)
-
- # test new client creation
- new1 = self.get_nonexistent_client(metadata)
- imd = metadata.get_initial_metadata(new1)
- mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]),
- set(), set(), set(),
- dict(category1="group1"), None,
- None, None, metadata.query)
-
- # test nested groups, address, per-client groups
- imd = metadata.get_initial_metadata("client8")
- mock_clientmetadata.assert_called_with("client8", "group1",
- set(["group1", "group8",
- "group9", "group10"]),
- set(),
- set(), set(["1.2.3.5"]),
- dict(category1="group1"),
- None, None, None, metadata.query)
-
- # test setting per-client groups, group negation, nested groups
- imd = metadata.get_initial_metadata("client9")
- mock_clientmetadata.assert_called_with("client9", "group2",
- set(["group2", "group8",
- "group11"]),
- set(["bundle1", "bundle2"]),
- set(), set(),
- dict(category1="group2"), None,
- "password3", None,
- metadata.query)
-
- # test new client with no default profile
- metadata.default = None
- new2 = self.get_nonexistent_client(metadata)
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.get_initial_metadata, new2)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_merge_groups(self):
- metadata = self.get_obj()
- self.load_groups_data(metadata=metadata)
- self.load_clients_data(metadata=metadata)
-
- self.assertEqual(metadata._merge_groups("client1", set(["group1"]),
- categories=dict(group1="category1")),
- (set(["group1"]), dict(group1="category1")))
-
- self.assertEqual(metadata._merge_groups("client8",
- set(["group1", "group8", "group9"]),
- categories=dict(group1="category1")),
- (set(["group1", "group8", "group9", "group10"]),
- dict(group1="category1")))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_get_all_group_names(self):
- metadata = self.load_groups_data()
- self.assertItemsEqual(metadata.get_all_group_names(),
- set([g.get("name")
- for g in get_groups_test_tree().findall("//Group")]))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_get_all_groups_in_category(self):
- metadata = self.load_groups_data()
- self.assertItemsEqual(metadata.get_all_groups_in_category("category1"),
- set([g.get("name")
- for g in get_groups_test_tree().findall("//Group[@category='category1']")]))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_get_client_names_by_profiles(self):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- self.assertItemsEqual(metadata.get_client_names_by_profiles(["group2"]),
- [c.get("name")
- for c in get_clients_test_tree().findall("//Client[@profile='group2']")])
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_get_client_names_by_groups(self):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- # this is not the best test in the world, since we mock
- # core.build_metadata to just build _initial_ metadata, which
- # is not at all the same thing. it turns out that mocking
- # this out without starting a Bcfg2 server is pretty
- # non-trivial, so this works-ish
- metadata.core.build_metadata = Mock()
- metadata.core.build_metadata.side_effect = \
- lambda c: metadata.get_initial_metadata(c)
- self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]),
- [c.get("name")
- for c in get_clients_test_tree().findall("//Client[@profile='group2']")])
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_merge_additional_groups(self):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- imd = metadata.get_initial_metadata("client2")
-
- # test adding a group excluded by categories
- oldgroups = imd.groups
- metadata.merge_additional_groups(imd, ["group4"])
- self.assertEqual(imd.groups, oldgroups)
-
- # test adding a private group
- oldgroups = imd.groups
- metadata.merge_additional_groups(imd, ["group3"])
- self.assertEqual(imd.groups, oldgroups)
-
- # test adding groups with bundles
- oldgroups = imd.groups
- oldbundles = imd.bundles
- metadata.merge_additional_groups(imd, ["group7"])
- self.assertEqual(imd.groups, oldgroups.union(["group7"]))
- self.assertEqual(imd.bundles, oldbundles.union(["bundle3"]))
-
- # test adding multiple groups
- imd = metadata.get_initial_metadata("client2")
- oldgroups = imd.groups
- metadata.merge_additional_groups(imd, ["group6", "group8"])
- self.assertItemsEqual(imd.groups,
- oldgroups.union(["group6", "group8", "group9"]))
-
- # test adding a group that is not defined in groups.xml
- imd = metadata.get_initial_metadata("client2")
- oldgroups = imd.groups
- metadata.merge_additional_groups(imd, ["group6", "newgroup"])
- self.assertItemsEqual(imd.groups,
- oldgroups.union(["group6", "newgroup"]))
-
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_merge_additional_data(self):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- imd = metadata.get_initial_metadata("client1")
-
- # we need to use a unique attribute name for this test. this
- # is probably overkill, but it works
- pattern = "connector%d"
- for i in range(0, 100):
- connector = pattern % i
- if not hasattr(imd, connector):
- break
- self.assertFalse(hasattr(imd, connector),
- "Could not find unique connector name to test "
- "merge_additional_data()")
-
- metadata.merge_additional_data(imd, connector, "test data")
- self.assertEqual(getattr(imd, connector), "test data")
- self.assertIn(connector, imd.connectors)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
- def test_validate_client_address(self, mock_resolve_client):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- self.assertTrue(metadata.validate_client_address("client1",
- (None, None)))
- self.assertTrue(metadata.validate_client_address("client2",
- ("1.2.3.2", None)))
- self.assertFalse(metadata.validate_client_address("client2",
- ("1.2.3.8", None)))
- self.assertTrue(metadata.validate_client_address("client4",
- ("1.2.3.2", None)))
- # this is upper case to ensure that case is folded properly in
- # validate_client_address()
- mock_resolve_client.return_value = "CLIENT4"
- self.assertTrue(metadata.validate_client_address("client4",
- ("1.2.3.7", None)))
- mock_resolve_client.assert_called_with(("1.2.3.7", None))
-
- mock_resolve_client.reset_mock()
- self.assertFalse(metadata.validate_client_address("client5",
- ("1.2.3.5", None)))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
- def test_AuthenticateConnection(self, mock_resolve_client,
- mock_validate_client_address):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- metadata.password = "password1"
-
- cert = dict(subject=[[("commonName", "client1")]])
- mock_validate_client_address.return_value = False
- self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
- "1.2.3.1"))
- mock_validate_client_address.return_value = True
- self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
- "1.2.3.1"))
- # floating cert-auth clients add themselves to the cache
- self.assertIn("1.2.3.1", metadata.session_cache)
- self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1")
-
- cert = dict(subject=[[("commonName", "client7")]])
- self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
- "1.2.3.4"))
- # non-floating cert-auth clients do not add themselves to the cache
- self.assertNotIn("1.2.3.4", metadata.session_cache)
-
- cert = dict(subject=[[("commonName", "client8")]])
-
- mock_resolve_client.return_value = "client5"
- self.assertTrue(metadata.AuthenticateConnection(None, "root",
- "password1", "1.2.3.8"))
-
- mock_resolve_client.side_effect = \
- Bcfg2.Server.Plugin.MetadataConsistencyError
- self.assertFalse(metadata.AuthenticateConnection(None, "root",
- "password1",
- "1.2.3.8"))
-
- # secure mode, no password
- self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None,
- "1.2.3.2"))
-
- self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1',
- "password1", "1.2.3.3"))
- # non-root, non-cert clients populate session cache
- self.assertIn("1.2.3.3", metadata.session_cache)
- self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3")
-
- # use alternate password
- self.assertTrue(metadata.AuthenticateConnection(None, 'client3',
- "password2", "1.2.3.3"))
-
- # test secure mode
- self.assertFalse(metadata.AuthenticateConnection(None, 'client9',
- "password1",
- "1.2.3.9"))
- self.assertTrue(metadata.AuthenticateConnection(None, 'client9',
- "password3", "1.2.3.9"))
-
- self.assertFalse(metadata.AuthenticateConnection(None, "client5",
- "password2",
- "1.2.3.7"))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
- def test_process_statistics(self, mock_update_client):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- md = Mock()
- md.hostname = "client6"
- metadata.process_statistics(md, None)
- mock_update_client.assert_called_with(md.hostname,
- dict(auth='cert'))
-
- mock_update_client.reset_mock()
- md.hostname = "client5"
- metadata.process_statistics(md, None)
- self.assertFalse(mock_update_client.called)
-
- def test_viz(self):
- pass
-
-
-class TestMetadataBase(TestMetadata):
- """ base test object for testing Metadata with database enabled """
- __test__ = False
- use_db = True
-
- @skipUnless(has_django, "Django not found")
- def setUp(self):
- syncdb(TestMetadataDB)
-
- def load_clients_data(self, metadata=None, xdata=None):
- if metadata is None:
- metadata = get_obj()
- for client in get_clients_test_tree().findall("Client"):
- metadata.add_client(client.get("name"))
- return metadata
-
- def get_nonexistent_client(self, _, prefix="newclient"):
- clients = [o.hostname for o in MetadataClientModel.objects.all()]
- i = 0
- client_name = "%s%s" % (prefix, i)
- while client_name in clients:
- i += 1
- client_name = "%s%s" % (prefix, i)
- return client_name
-
- @patch('os.path.exists')
- def test__init(self, mock_exists):
- core = Mock()
- core.fam = Mock()
- mock_exists.return_value = False
- metadata = self.get_obj(core=core, watch_clients=True)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
- core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
-
- mock_exists.return_value = True
- core.fam.reset_mock()
- metadata = self.get_obj(core=core, watch_clients=True)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
-
- def test_add_group(self):
- pass
-
- def test_add_bundle(self):
- pass
-
- def test_add_client(self):
- metadata = self.get_obj()
- hostname = self.get_nonexistent_client(metadata)
- client = metadata.add_client(hostname)
- self.assertIsInstance(client, MetadataClientModel)
- self.assertEqual(client.hostname, hostname)
- self.assertIn(hostname, metadata.clients)
- self.assertIn(hostname, metadata.list_clients())
- self.assertItemsEqual(metadata.clients,
- [c.hostname
- for c in MetadataClientModel.objects.all()])
-
- def test_update_group(self):
- pass
-
- def test_update_bundle(self):
- pass
-
- def test_update_client(self):
- pass
-
- def test_list_clients(self):
- metadata = self.get_obj()
- self.assertItemsEqual(metadata.list_clients(),
- [c.hostname
- for c in MetadataClientModel.objects.all()])
-
- def test_remove_group(self):
- pass
-
- def test_remove_bundle(self):
- pass
-
- def test_remove_client(self):
- metadata = self.get_obj()
- client_name = self.get_nonexistent_client(metadata)
-
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.remove_client,
- client_name)
-
- metadata.add_client(client_name)
- metadata.remove_client(client_name)
- self.assertNotIn(client_name, metadata.clients)
- self.assertNotIn(client_name, metadata.list_clients())
- self.assertItemsEqual(metadata.clients,
- [c.hostname
- for c in MetadataClientModel.objects.all()])
-
- def test_process_statistics(self):
- pass
-
-
-class TestMetadata_NoClientsXML(TestMetadataBase):
- """ test Metadata without a clients.xml. we have to disable or
- override tests that rely on client options """
- # only run these tests if it's possible to skip tests or if we
- # have django. otherwise they'll all get run because our fake
- # skipping decorators for python < 2.7 won't work when they
- # decorate setUp()
- if can_skip or has_django:
- __test__ = True
-
- def load_groups_data(self, metadata=None, xdata=None):
- if metadata is None:
- metadata = self.get_obj()
- if not xdata:
- xdata = copy.deepcopy(get_groups_test_tree())
- for client in get_clients_test_tree().findall("Client"):
- newclient = \
- lxml.etree.SubElement(xdata.getroot(),
- "Client", name=client.get("name"))
- lxml.etree.SubElement(newclient, "Group",
- name=client.get("profile"))
- metadata.groups_xml.data = xdata
- metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
- evt = Mock()
- evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
- evt.code2str = Mock(return_value="changed")
- metadata.HandleEvent(evt)
- return metadata
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
- def test_get_initial_metadata(self, mock_clientmetadata):
- metadata = self.get_obj()
- if 'clients.xml' in metadata.states:
- metadata.states['clients.xml'] = False
- self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
- metadata.get_initial_metadata, None)
-
- self.load_groups_data(metadata=metadata)
- self.load_clients_data(metadata=metadata)
-
- # test basic client metadata
- metadata.get_initial_metadata("client1")
- mock_clientmetadata.assert_called_with("client1", "group1",
- set(["group1"]), set(), set(),
- set(), dict(category1='group1'),
- None, None, None, metadata.query)
-
- # test bundles, category suppression
- metadata.get_initial_metadata("client2")
- mock_clientmetadata.assert_called_with("client2", "group2",
- set(["group2"]),
- set(["bundle1", "bundle2"]),
- set(), set(),
- dict(category1="group2"), None,
- None, None, metadata.query)
-
- # test new client creation
- new1 = self.get_nonexistent_client(metadata)
- imd = metadata.get_initial_metadata(new1)
- mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]),
- set(), set(), set(),
- dict(category1="group1"), None,
- None, None, metadata.query)
-
- # test nested groups, per-client groups
- imd = metadata.get_initial_metadata("client8")
- mock_clientmetadata.assert_called_with("client8", "group1",
- set(["group1", "group8",
- "group9", "group10"]),
- set(), set(), set(),
- dict(category1="group1"), None,
- None, None, metadata.query)
-
- # test per-client groups, group negation, nested groups
- imd = metadata.get_initial_metadata("client9")
- mock_clientmetadata.assert_called_with("client9", "group2",
- set(["group2", "group8",
- "group11"]),
- set(["bundle1", "bundle2"]),
- set(), set(),
- dict(category1="group2"), None,
- None, None, metadata.query)
-
- # test exception on new client with no default profile
- metadata.default = None
- new2 = self.get_nonexistent_client(metadata)
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.get_initial_metadata,
- new2)
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
- def test_validate_client_address(self, mock_resolve_client):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- # this is upper case to ensure that case is folded properly in
- # validate_client_address()
- mock_resolve_client.return_value = "CLIENT4"
- self.assertTrue(metadata.validate_client_address("client4",
- ("1.2.3.7", None)))
- mock_resolve_client.assert_called_with(("1.2.3.7", None))
-
- mock_resolve_client.reset_mock()
- self.assertFalse(metadata.validate_client_address("client5",
- ("1.2.3.5", None)))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
- def test_AuthenticateConnection(self, mock_resolve_client,
- mock_validate_client_address):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- metadata.password = "password1"
-
- cert = dict(subject=[[("commonName", "client1")]])
- mock_validate_client_address.return_value = False
- self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
- "1.2.3.1"))
- mock_validate_client_address.return_value = True
- self.assertTrue(metadata.AuthenticateConnection(cert, "root",
- metadata.password,
- "1.2.3.1"))
-
- cert = dict(subject=[[("commonName", "client8")]])
-
- mock_resolve_client.return_value = "client5"
- self.assertTrue(metadata.AuthenticateConnection(None, "root",
- "password1", "1.2.3.8"))
-
- mock_resolve_client.side_effect = \
- Bcfg2.Server.Plugin.MetadataConsistencyError
- self.assertFalse(metadata.AuthenticateConnection(None, "root",
- "password1",
- "1.2.3.8"))
-
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("socket.gethostbyaddr")
- def test_resolve_client(self, mock_gethostbyaddr):
- metadata = self.load_clients_data(metadata=self.load_groups_data())
- metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
- self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
-
- metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
- 'client3')
- mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3'])
- self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
- cleanup_cache=True), 'client3')
- self.assertEqual(metadata.session_cache, dict())
-
- mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
- self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
- mock_gethostbyaddr.assert_called_with('1.2.3.6')
-
- mock_gethostbyaddr.reset_mock()
- mock_gethostbyaddr.return_value = None
- mock_gethostbyaddr.side_effect = socket.herror
- self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
- metadata.resolve_client,
- ('1.2.3.8', None))
- mock_gethostbyaddr.assert_called_with('1.2.3.8')
-
- def test_handle_clients_xml_event(self):
- pass
-
-
-class TestMetadata_ClientsXML(TestMetadataBase):
- """ test Metadata with a clients.xml. """
- # only run these tests if it's possible to skip tests or if we
- # have django. otherwise they'll all get run because our fake
- # skipping decorators for python < 2.7 won't work when they
- # decorate setUp()
- if can_skip or has_django:
- __test__ = True
-
- def load_clients_data(self, metadata=None, xdata=None):
- if metadata is None:
- metadata = self.get_obj()
- metadata.core.fam = Mock()
- metadata._handle_file("clients.xml")
- metadata = TestMetadata.load_clients_data(self, metadata=metadata,
- xdata=xdata)
- return TestMetadataBase.load_clients_data(self, metadata=metadata,
- xdata=xdata)
-
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
deleted file mode 100644
index 0a971c245..000000000
--- a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
+++ /dev/null
@@ -1,549 +0,0 @@
-import os
-import sys
-import time
-import lxml.etree
-import Bcfg2.Server
-import Bcfg2.Server.Plugin
-from mock import Mock, MagicMock, patch
-
-# add all parent testsuite directories to sys.path to allow (most)
-# relative imports in python 2.4
-path = os.path.dirname(__file__)
-while path != "/":
- if os.path.basename(path).lower().startswith("test"):
- sys.path.append(path)
- if os.path.basename(path) == "testsuite":
- break
- path = os.path.dirname(path)
-from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
- skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
- patchIf, datastore
-from Bcfg2.Server.Plugins.Probes import *
-from TestPlugin import TestEntrySet, TestProbing, TestConnector, \
- TestDatabaseBacked
-
-# test data for JSON and YAML tests
-test_data = dict(a=1, b=[1, 2, 3], c="test")
-
-class FakeList(list):
- pass
-
-
-class TestProbesDB(DBModelTestCase):
- if has_django:
- models = [ProbesGroupsModel, ProbesDataModel]
-
-
-class TestClientProbeDataSet(Bcfg2TestCase):
- def test__init(self):
- ds = ClientProbeDataSet()
- self.assertLessEqual(ds.timestamp, time.time())
- self.assertIsInstance(ds, dict)
- self.assertNotIn("timestamp", ds)
-
- ds = ClientProbeDataSet(timestamp=123)
- self.assertEqual(ds.timestamp, 123)
- self.assertNotIn("timestamp", ds)
-
-class TestProbeData(Bcfg2TestCase):
- def test_str(self):
- # a value that is not valid XML, JSON, or YAML
- val = "'test"
-
- # test string behavior
- data = ProbeData(val)
- self.assertIsInstance(data, str)
- self.assertEqual(data, val)
- # test 1.2.0-1.2.2 broken behavior
- self.assertEqual(data.data, val)
- # test that formatted data accessors return None
- self.assertIsNone(data.xdata)
- self.assertIsNone(data.yaml)
- self.assertIsNone(data.json)
-
- def test_xdata(self):
- xdata = lxml.etree.Element("test")
- lxml.etree.SubElement(xdata, "test2")
- data = ProbeData(lxml.etree.tostring(xdata,
- xml_declaration=False).decode('UTF-8'))
- self.assertIsNotNone(data.xdata)
- self.assertIsNotNone(data.xdata.find("test2"))
-
- @skipUnless(has_json, "JSON libraries not found, skipping JSON tests")
- def test_json(self):
- jdata = json.dumps(test_data)
- data = ProbeData(jdata)
- self.assertIsNotNone(data.json)
- self.assertItemsEqual(test_data, data.json)
-
- @skipUnless(has_yaml, "YAML libraries not found, skipping YAML tests")
- def test_yaml(self):
- jdata = yaml.dump(test_data)
- data = ProbeData(jdata)
- self.assertIsNotNone(data.yaml)
- self.assertItemsEqual(test_data, data.yaml)
-
-
-class TestProbeSet(TestEntrySet):
- test_obj = ProbeSet
- basenames = ["test", "_test", "test-test"]
- ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"]
- bogus_names = ["test.py"]
-
- def get_obj(self, path=datastore, fam=None, encoding=None,
- plugin_name="Probes", basename=None):
- # get_obj() accepts the basename argument, accepted by the
- # parent get_obj() method, and just throws it away, since
- # ProbeSet uses a regex for the "basename"
- if fam is None:
- fam = Mock()
- rv = self.test_obj(path, fam, encoding, plugin_name)
- rv.entry_type = MagicMock()
- return rv
-
- def test__init(self):
- fam = Mock()
- ps = self.get_obj(fam=fam)
- self.assertEqual(ps.plugin_name, "Probes")
- fam.AddMonitor.assert_called_with(datastore, ps)
- TestEntrySet.test__init(self)
-
- def test_HandleEvent(self):
- ps = self.get_obj()
- ps.handle_event = Mock()
-
- # test that events on the data store itself are skipped
- evt = Mock()
- evt.filename = datastore
- ps.HandleEvent(evt)
- self.assertFalse(ps.handle_event.called)
-
- # test that events on probed.xml are skipped
- evt.reset_mock()
- evt.filename = "probed.xml"
- ps.HandleEvent(evt)
- self.assertFalse(ps.handle_event.called)
-
- # test that other events are processed appropriately
- evt.reset_mock()
- evt.filename = "fooprobe"
- ps.HandleEvent(evt)
- ps.handle_event.assert_called_with(evt)
-
- @patch("%s.list" % builtins, FakeList)
- def test_get_probe_data(self):
- ps = self.get_obj()
-
- # build some fairly complex test data for this. in the end,
- # we want the probe data to include only the most specific
- # version of a given probe, and by basename only, not full
- # (specific) name. We don't fully test the specificity stuff,
- # we just check to make sure sort() is called and trust that
- # sort() does the right thing on Specificity objects. (I.e.,
- # trust that Specificity is well-tested. Hah!) We also test
- # to make sure the interpreter is determined correctly.
- ps.get_matching = Mock()
- matching = FakeList()
- matching.sort = Mock()
-
- p1 = Mock()
- p1.specific = Bcfg2.Server.Plugin.Specificity(group=True, prio=10)
- p1.name = "fooprobe.G10_foogroup"
- p1.data = """#!/bin/bash
-group-specific"""
- matching.append(p1)
-
- p2 = Mock()
- p2.specific = Bcfg2.Server.Plugin.Specificity(all=True)
- p2.name = "fooprobe"
- p2.data = "#!/bin/bash"
- matching.append(p2)
-
- p3 = Mock()
- p3.specific = Bcfg2.Server.Plugin.Specificity(all=True)
- p3.name = "barprobe"
- p3.data = "#! /usr/bin/env python"
- matching.append(p3)
-
- p4 = Mock()
- p4.specific = Bcfg2.Server.Plugin.Specificity(all=True)
- p4.name = "bazprobe"
- p4.data = ""
- matching.append(p4)
-
- ps.get_matching.return_value = matching
-
- metadata = Mock()
- pdata = ps.get_probe_data(metadata)
- ps.get_matching.assert_called_with(metadata)
- # we can't create a matching operator.attrgetter object, and I
- # don't feel the need to mock that out -- this is a good
- # enough check
- self.assertTrue(matching.sort.called)
-
- self.assertEqual(len(pdata), 3,
- "Found: %s" % [p.get("name") for p in pdata])
- for probe in pdata:
- if probe.get("name") == "fooprobe":
- self.assertIn("group-specific", probe.text)
- self.assertEqual(probe.get("interpreter"), "/bin/bash")
- elif probe.get("name") == "barprobe":
- self.assertEqual(probe.get("interpreter"),
- "/usr/bin/env python")
- elif probe.get("name") == "bazprobe":
- self.assertIsNotNone(probe.get("interpreter"))
- else:
- assert False, "Strange probe found in get_probe_data() return"
-
-
-class TestProbes(TestProbing, TestConnector, TestDatabaseBacked):
- test_obj = Probes
-
- def get_test_probedata(self):
- test_xdata = lxml.etree.Element("test")
- lxml.etree.SubElement(test_xdata, "test", foo="foo")
- rv = dict()
- rv["foo.example.com"] = ClientProbeDataSet(timestamp=time.time())
- rv["foo.example.com"]["xml"] = \
- ProbeData(lxml.etree.tostring(test_xdata,
- xml_declaration=False).decode('UTF-8'))
- rv["foo.example.com"]["text"] = ProbeData("freeform text")
- rv["foo.example.com"]["multiline"] = ProbeData("""multiple
-lines
-of
-freeform
-text
-""")
- rv["bar.example.com"] = ClientProbeDataSet(timestamp=time.time())
- rv["bar.example.com"]["empty"] = ProbeData("")
- if has_yaml:
- rv["bar.example.com"]["yaml"] = ProbeData(yaml.dump(test_data))
- if has_json:
- rv["bar.example.com"]["json"] = ProbeData(json.dumps(test_data))
- return rv
-
- def get_test_cgroups(self):
- return {"foo.example.com": ["group", "group with spaces",
- "group-with-dashes"],
- "bar.example.com": []}
-
- def get_probes_object(self, use_db=False, load_data=None):
- core = Mock()
- core.setup.cfp.getboolean = Mock()
- core.setup.cfp.getboolean.return_value = use_db
- if load_data is None:
- load_data = MagicMock()
- # we have to patch load_data() in a funny way because
- # different versions of Mock have different scopes for
- # patching. in some versions, a patch applied to
- # get_probes_object() would only apply to that function, while
- # in others it would also apply to the calling function (e.g.,
- # test__init(), which relies on being able to check the calls
- # of load_data(), and thus on load_data() being consistently
- # mocked)
- @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", new=load_data)
- def inner():
- return Probes(core, datastore)
-
- return inner()
-
- def test__init(self):
- mock_load_data = Mock()
- probes = self.get_probes_object(load_data=mock_load_data)
- probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore,
- probes.name),
- probes.probes)
- mock_load_data.assert_any_call()
- self.assertEqual(probes.probedata, ClientProbeDataSet())
- self.assertEqual(probes.cgroups, dict())
-
- @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", Mock())
- def test__use_db(self):
- probes = self.get_probes_object()
- self.assertFalse(probes._use_db)
- probes.core.setup.cfp.getboolean.assert_called_with("probes",
- "use_database",
- default=False)
-
- @skipUnless(has_django, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
- def test_write_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.write_data("test")
- probes._write_data_xml.assert_called_with("test")
- self.assertFalse(probes._write_data_db.called)
-
- @skipUnless(has_django, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
- def test_write_data_db(self):
- probes = self.get_probes_object(use_db=True)
- probes.write_data("test")
- probes._write_data_db.assert_called_with("test")
- self.assertFalse(probes._write_data_xml.called)
-
- @patch("%s.open" % builtins)
- def test__write_data_xml(self, mock_open):
- probes = self.get_probes_object(use_db=False)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
- probes._write_data_xml(None)
-
- mock_open.assert_called_with(os.path.join(datastore, probes.name,
- "probed.xml"), "w")
- data = lxml.etree.XML(mock_open.return_value.write.call_args[0][0])
- self.assertEqual(len(data.xpath("//Client")), 2)
-
- foodata = data.find("Client[@name='foo.example.com']")
- self.assertIsNotNone(foodata)
- self.assertIsNotNone(foodata.get("timestamp"))
- self.assertEqual(len(foodata.findall("Probe")),
- len(probes.probedata['foo.example.com']))
- self.assertEqual(len(foodata.findall("Group")),
- len(probes.cgroups['foo.example.com']))
- xml = foodata.find("Probe[@name='xml']")
- self.assertIsNotNone(xml)
- self.assertIsNotNone(xml.get("value"))
- xdata = lxml.etree.XML(xml.get("value"))
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- text = foodata.find("Probe[@name='text']")
- self.assertIsNotNone(text)
- self.assertIsNotNone(text.get("value"))
- multiline = foodata.find("Probe[@name='multiline']")
- self.assertIsNotNone(multiline)
- self.assertIsNotNone(multiline.get("value"))
- self.assertGreater(len(multiline.get("value").splitlines()), 1)
-
- bardata = data.find("Client[@name='bar.example.com']")
- self.assertIsNotNone(bardata)
- self.assertIsNotNone(bardata.get("timestamp"))
- self.assertEqual(len(bardata.findall("Probe")),
- len(probes.probedata['bar.example.com']))
- self.assertEqual(len(bardata.findall("Group")),
- len(probes.cgroups['bar.example.com']))
- empty = bardata.find("Probe[@name='empty']")
- self.assertIsNotNone(empty)
- self.assertIsNotNone(empty.get("value"))
- self.assertEqual(empty.get("value"), "")
- if has_yaml:
- ydata = bardata.find("Probe[@name='yaml']")
- self.assertIsNotNone(ydata)
- self.assertIsNotNone(ydata.get("value"))
- self.assertItemsEqual(test_data, yaml.load(ydata.get("value")))
- if has_json:
- jdata = bardata.find("Probe[@name='json']")
- self.assertIsNotNone(jdata)
- self.assertIsNotNone(jdata.get("value"))
- self.assertItemsEqual(test_data, json.loads(jdata.get("value")))
-
- @skipUnless(has_django, "Django not found, skipping")
- def test__write_data_db(self):
- syncdb(TestProbesDB)
- probes = self.get_probes_object(use_db=True)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- for cname in ["foo.example.com", "bar.example.com"]:
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
-
- for probe in pdata:
- self.assertEqual(probe.hostname, client.hostname)
- self.assertIsNotNone(probe.data)
- if probe.probe == "xml":
- xdata = lxml.etree.XML(probe.data)
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- elif probe.probe == "text":
- pass
- elif probe.probe == "multiline":
- self.assertGreater(len(probe.data.splitlines()), 1)
- elif probe.probe == "empty":
- self.assertEqual(probe.data, "")
- elif probe.probe == "yaml":
- self.assertItemsEqual(test_data, yaml.load(probe.data))
- elif probe.probe == "json":
- self.assertItemsEqual(test_data, json.loads(probe.data))
- else:
- assert False, "Strange probe found in _write_data_db data"
-
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- # test that old probe data is removed properly
- cname = 'foo.example.com'
- del probes.probedata[cname]['text']
- probes.cgroups[cname].pop()
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- @skipUnless(has_django, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
- def test_load_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.load_data()
- probes._load_data_xml.assert_any_call()
- self.assertFalse(probes._load_data_db.called)
-
- @skipUnless(has_django, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
- def test_load_data_db(self):
- probes = self.get_probes_object(use_db=True)
- probes.load_data()
- probes._load_data_db.assert_any_call()
- self.assertFalse(probes._load_data_xml.called)
-
- @patch("%s.open" % builtins)
- @patch("lxml.etree.parse")
- def test__load_data_xml(self, mock_parse, mock_open):
- probes = self.get_probes_object(use_db=False)
- # to get the value for lxml.etree.parse to parse, we call
- # _write_data_xml, mock the open() call, and grab the data
- # that gets "written" to probed.xml
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
- probes._write_data_xml(None)
- xdata = \
- lxml.etree.XML(str(mock_open.return_value.write.call_args[0][0]))
- mock_parse.return_value = xdata.getroottree()
- probes.probedata = dict()
- probes.cgroups = dict()
-
- probes._load_data_xml()
- mock_parse.assert_called_with(os.path.join(datastore, probes.name,
- 'probed.xml'),
- parser=Bcfg2.Server.XMLParser)
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- self.assertItemsEqual(probes.cgroups, self.get_test_cgroups())
-
- @skipUnless(has_django, "Django not found, skipping")
- def test__load_data_db(self):
- syncdb(TestProbesDB)
- probes = self.get_probes_object(use_db=True)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
- for cname in probes.probedata.keys():
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- probes.probedata = dict()
- probes.cgroups = dict()
- probes._load_data_db()
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- # the db backend does not store groups at all if a client has
- # no groups set, so we can't just use assertItemsEqual here,
- # because loading saved data may _not_ result in the original
- # data if some clients had no groups set.
- test_cgroups = self.get_test_cgroups()
- for cname, groups in test_cgroups.items():
- if cname in probes.cgroups:
- self.assertEqual(groups, probes.cgroups[cname])
- else:
- self.assertEqual(groups, [])
-
- @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data")
- def test_GetProbes(self, mock_get_probe_data):
- TestProbing.test_GetProbes(self)
-
- probes = self.get_probes_object()
- metadata = Mock()
- probes.GetProbes(metadata)
- mock_get_probe_data.assert_called_with(metadata)
-
- @patch("Bcfg2.Server.Plugins.Probes.Probes.write_data")
- @patch("Bcfg2.Server.Plugins.Probes.Probes.ReceiveDataItem")
- def test_ReceiveData(self, mock_ReceiveDataItem, mock_write_data):
- TestProbing.test_ReceiveData(self)
-
- # we use a simple (read: bogus) datalist here to make this
- # easy to test
- datalist = ["a", "b", "c"]
-
- probes = self.get_probes_object()
- client = Mock()
- client.hostname = "foo.example.com"
- probes.ReceiveData(client, datalist)
-
- self.assertItemsEqual(mock_ReceiveDataItem.call_args_list,
- [call(client, "a"), call(client, "b"),
- call(client, "c")])
- mock_write_data.assert_called_with(client)
-
- def test_ReceiveDataItem(self):
- probes = self.get_probes_object()
- for cname, cdata in self.get_test_probedata().items():
- client = Mock()
- client.hostname = cname
- for pname, pdata in cdata.items():
- dataitem = lxml.etree.Element("Probe", name=pname)
- if pname == "text":
- # add some groups to the plaintext test to test
- # group parsing
- data = [pdata]
- for group in self.get_test_cgroups()[cname]:
- data.append("group:%s" % group)
- dataitem.text = "\n".join(data)
- else:
- dataitem.text = str(pdata)
-
- probes.ReceiveDataItem(client, dataitem)
-
- self.assertIn(client.hostname, probes.probedata)
- self.assertIn(pname, probes.probedata[cname])
- self.assertEqual(pdata, probes.probedata[cname][pname])
- self.assertIn(client.hostname, probes.cgroups)
- self.assertEqual(probes.cgroups[cname],
- self.get_test_cgroups()[cname])
-
- def test_get_additional_groups(self):
- TestConnector.test_get_additional_groups(self)
-
- probes = self.get_probes_object()
- test_cgroups = self.get_test_cgroups()
- probes.cgroups = self.get_test_cgroups()
- for cname in test_cgroups.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_cgroups[cname],
- probes.get_additional_groups(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_groups(metadata),
- list())
-
- def test_get_additional_data(self):
- TestConnector.test_get_additional_data(self)
-
- probes = self.get_probes_object()
- test_probedata = self.get_test_probedata()
- probes.probedata = self.get_test_probedata()
- for cname in test_probedata.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_probedata[cname],
- probes.get_additional_data(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_data(metadata),
- ClientProbeDataSet())
-
-
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestSEModules.py b/testsuite/Testlib/TestServer/TestPlugins/TestSEModules.py
deleted file mode 100644
index c319ed663..000000000
--- a/testsuite/Testlib/TestServer/TestPlugins/TestSEModules.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import os
-import sys
-import lxml.etree
-import Bcfg2.Server.Plugin
-from Bcfg2.Compat import b64encode
-from mock import Mock, MagicMock, patch
-from Bcfg2.Server.Plugins.SEModules import *
-
-# add all parent testsuite directories to sys.path to allow (most)
-# relative imports in python 2.4
-path = os.path.dirname(__file__)
-while path != "/":
- if os.path.basename(path).lower().startswith("test"):
- sys.path.append(path)
- if os.path.basename(path) == "testsuite":
- break
- path = os.path.dirname(path)
-from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
- skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
- patchIf, datastore
-from TestPlugin import TestSpecificData, TestGroupSpool
-
-class TestSEModuleData(TestSpecificData):
- test_obj = SEModuleData
- path = os.path.join(datastore, "SEModules", "test.pp", "test.pp")
-
- def test_bind_entry(self):
- data = self.get_obj()
- data.data = "test"
- entry = lxml.etree.Element("test", name=self.path)
- data.bind_entry(entry, Mock())
- self.assertEqual(entry.get("name"), self.path)
- self.assertEqual(entry.get("encoding"), "base64")
- self.assertEqual(entry.text, b64encode(data.data))
-
-
-class TestSEModules(TestGroupSpool):
- test_obj = SEModules
-
- def test__get_module_name(self):
- modules = self.get_obj()
- for mname in ["foo", "foo.pp"]:
- entry = lxml.etree.Element("SELinux", type="module", name=mname)
- self.assertEqual(modules._get_module_name(entry), "/foo.pp")
-
- @patch("Bcfg2.Server.Plugins.SEModules.SEModules._get_module_name")
- def test_HandlesEntry(self, mock_get_name):
- modules = self.get_obj()
- modules.Entries['SELinux']['/foo.pp'] = Mock()
- modules.Entries['SELinux']['/bar.pp'] = Mock()
- for el in [lxml.etree.Element("Path", name="/foo.pp"),
- lxml.etree.Element("SELinux", type="fcontext",
- name="/foo.pp"),
- lxml.etree.Element("SELinux", type="module",
- name="/baz.pp")]:
- mock_get_name.return_value = el.get("name")
- self.assertFalse(modules.HandlesEntry(el, Mock()))
- mock_get_name.assert_called_with(el)
-
- for el in [lxml.etree.Element("SELinux", type="module",
- name="/foo.pp"),
- lxml.etree.Element("SELinux", type="module",
- name="/bar.pp")]:
- mock_get_name.return_value = el.get("name")
- self.assertTrue(modules.HandlesEntry(el, Mock()),
- msg="SEModules fails to handle %s" % el.get("name"))
- mock_get_name.assert_called_with(el)
-
- TestGroupSpool.test_HandlesEntry(self)
-
- @patch("Bcfg2.Server.Plugins.SEModules.SEModules._get_module_name")
- def test_HandlesEntry(self, mock_get_name):
- modules = self.get_obj()
- handler = Mock()
- modules.Entries['SELinux']['/foo.pp'] = handler
- mock_get_name.return_value = "/foo.pp"
-
- entry = lxml.etree.Element("SELinux", type="module", name="foo")
- metadata = Mock()
- self.assertEqual(modules.HandleEntry(entry, metadata),
- handler.return_value)
- mock_get_name.assert_called_with(entry)
- self.assertEqual(entry.get("name"), mock_get_name.return_value)
- handler.assert_called_with(entry, metadata)
-
- TestGroupSpool.test_HandlesEntry(self)
-
- def test_add_entry(self):
- @patch("%s.%s.event_path" %
- (self.test_obj.__module__, self.test_obj.__name__))
- @patch("%s.%s.add_entry" % (self.test_obj.__base__.__module__,
- self.test_obj.__base__.__name__))
- def inner(mock_add_entry, mock_event_path):
- modules = self.get_obj()
-
- evt = Mock()
- evt.filename = "test.pp.G10_foo"
-
- mock_event_path.return_value = os.path.join(datastore,
- self.test_obj.__name__,
- "test.pp",
- "test.pp.G10_foo")
- modules.add_entry(evt)
- self.assertEqual(modules.filename_pattern, "test.pp")
- mock_add_entry.assert_called_with(modules, evt)
- mock_event_path.assert_called_with(evt)
-
- inner()
- TestGroupSpool.test_add_entry(self)
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
deleted file mode 100644
index 556487288..000000000
--- a/testsuite/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
+++ /dev/null
@@ -1,120 +0,0 @@
-import os
-import sys
-import Bcfg2.Server.Plugin
-from mock import Mock, MagicMock, patch
-from Bcfg2.Server.Plugins.TemplateHelper import *
-
-# add all parent testsuite directories to sys.path to allow (most)
-# relative imports in python 2.4
-path = os.path.dirname(__file__)
-while path != "/":
- if os.path.basename(path).lower().startswith("test"):
- sys.path.append(path)
- if os.path.basename(path) == "testsuite":
- break
- path = os.path.dirname(path)
-from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
- skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
- patchIf, datastore
-from TestPlugin import TestDirectoryBacked, TestConnector, TestPlugin, \
- TestFileBacked
-
-
-class TestHelperModule(TestFileBacked):
- test_obj = HelperModule
- path = os.path.join(datastore, "test.py")
-
- def test__init(self):
- hm = self.get_obj()
- self.assertEqual(hm._module_name, "test")
- self.assertEqual(hm._attrs, [])
-
- @patch("imp.load_source")
- def test_Index(self, mock_load_source):
- hm = self.get_obj()
-
- mock_load_source.side_effect = ImportError
- attrs = dir(hm)
- hm.Index()
- mock_load_source.assert_called_with(hm._module_name, hm.name)
- self.assertEqual(attrs, dir(hm))
- self.assertEqual(hm._attrs, [])
-
- mock_load_source.reset()
- mock_load_source.side_effect = None
- # a regular Mock (not a MagicMock) won't automatically create
- # __export__, so this triggers a failure condition in Index
- mock_load_source.return_value = Mock()
- attrs = dir(hm)
- hm.Index()
- mock_load_source.assert_called_with(hm._module_name, hm.name)
- self.assertEqual(attrs, dir(hm))
- self.assertEqual(hm._attrs, [])
-
- # test reserved attributes
- module = Mock()
- module.__export__ = ["_attrs", "Index", "__init__"]
- mock_load_source.reset()
- mock_load_source.return_value = module
- attrs = dir(hm)
- hm.Index()
- mock_load_source.assert_called_with(hm._module_name, hm.name)
- self.assertEqual(attrs, dir(hm))
- self.assertEqual(hm._attrs, [])
-
- # test adding attributes
- module = Mock()
- module.__export__ = ["foo", "bar", "baz", "Index"]
- mock_load_source.reset()
- mock_load_source.return_value = module
- hm.Index()
- mock_load_source.assert_called_with(hm._module_name, hm.name)
- self.assertTrue(hasattr(hm, "foo"))
- self.assertTrue(hasattr(hm, "bar"))
- self.assertTrue(hasattr(hm, "baz"))
- self.assertEqual(hm._attrs, ["foo", "bar", "baz"])
-
- # test removing attributes
- module = Mock()
- module.__export__ = ["foo", "bar", "quux", "Index"]
- mock_load_source.reset()
- mock_load_source.return_value = module
- hm.Index()
- mock_load_source.assert_called_with(hm._module_name, hm.name)
- self.assertTrue(hasattr(hm, "foo"))
- self.assertTrue(hasattr(hm, "bar"))
- self.assertTrue(hasattr(hm, "quux"))
- self.assertFalse(hasattr(hm, "baz"))
- self.assertEqual(hm._attrs, ["foo", "bar", "quux"])
-
-
-
-class TestHelperSet(TestDirectoryBacked):
- test_obj = HelperSet
- testfiles = ['foo.py', 'foo_bar.py', 'foo.bar.py']
- ignore = ['fooo.py~', 'fooo.pyc', 'fooo.pyo']
- badevents = ['foo']
-
-
-class TestTemplateHelper(TestPlugin, TestConnector):
- test_obj = TemplateHelper
-
- def test__init(self):
- TestPlugin.test__init(self)
-
- th = self.get_obj()
- self.assertIsInstance(th.helpers, HelperSet)
-
- def test_get_additional_data(self):
- TestConnector.test_get_additional_data(self)
-
- th = self.get_obj()
- modules = ['foo', 'bar']
- rv = dict()
- for mname in modules:
- module = Mock()
- module._module_name = mname
- rv[mname] = module
- th.helpers.entries['%s.py' % mname] = module
- actual = th.get_additional_data(Mock())
- self.assertItemsEqual(actual, rv)
diff --git a/testsuite/Testlib/TestServer/TestPlugins/__init__.py b/testsuite/Testlib/TestServer/TestPlugins/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/testsuite/Testlib/TestServer/TestPlugins/__init__.py
+++ /dev/null
diff --git a/testsuite/Testlib/TestServer/__init__.py b/testsuite/Testlib/TestServer/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/testsuite/Testlib/TestServer/__init__.py
+++ /dev/null