import os import copy import logging import unittest import lxml.etree from mock import Mock, MagicMock, patch from Bcfg2.Server.Plugin import * import Bcfg2.Server import Bcfg2.Server.Plugins.Metadata datastore = '/' def call(*args, **kwargs): """ the Mock call object is a fairly recent addition, but it's very very useful, so we create our own function to create Mock calls """ return (args, kwargs) class TestFunctions(unittest.TestCase): def test_bind_info(self): entry = lxml.etree.Element("Path", name="/test") metadata = Mock() default = dict(test1="test1", test2="test2") # test without infoxml bind_info(entry, metadata, default=default) self.assertItemsEqual(entry.attrib, dict(test1="test1", test2="test2", name="/test")) # test with bogus infoxml entry = lxml.etree.Element("Path", name="/test") infoxml = Mock() self.assertRaises(PluginExecutionError, bind_info, entry, metadata, infoxml=infoxml) infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry) # test with valid infoxml entry = lxml.etree.Element("Path", name="/test") infoxml.reset_mock() infodata = {None: {"test3": "test3", "test4": "test4"}} def infoxml_rv(metadata, rv, entry=None): rv['Info'] = infodata infoxml.pnode.Match.side_effect = infoxml_rv bind_info(entry, metadata, infoxml=infoxml, default=default) # mock objects don't properly track the called-with value of # arguments whose value is changed by the function, so it # thinks Match() was called with the final value of the mdata # arg, not the initial value. makes this test a little less # worthwhile, TBH. infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata), entry=entry) self.assertItemsEqual(entry.attrib, dict(test1="test1", test2="test2", test3="test3", test4="test4", name="/test")) class TestPluginInitError(unittest.TestCase): """ placeholder for future tests """ pass class TestPluginExecutionError(unittest.TestCase): """ placeholder for future tests """ pass class TestDebuggable(unittest.TestCase): def test__init(self): d = Debuggable() self.assertIsInstance(d.logger, logging.Logger) self.assertFalse(d.debug_flag) @patch("Bcfg2.Server.Plugin.Debuggable.debug_log") def test_toggle_debug(self, mock_debug): d = Debuggable() orig = d.debug_flag d.toggle_debug() self.assertNotEqual(orig, d.debug_flag) self.assertTrue(mock_debug.called) mock_debug.reset_mock() changed = d.debug_flag d.toggle_debug() self.assertNotEqual(changed, d.debug_flag) self.assertEqual(orig, d.debug_flag) self.assertTrue(mock_debug.called) def test_debug_log(self): d = Debuggable() d.logger = Mock() d.debug_flag = False d.debug_log("test") self.assertFalse(d.logger.error.called) d.logger.reset_mock() d.debug_log("test", flag=True) self.assertTrue(d.logger.error.called) d.logger.reset_mock() d.debug_flag = True d.debug_log("test") self.assertTrue(d.logger.error.called) class TestPlugin(unittest.TestCase): def test__init(self): core = Mock() p = Plugin(core, datastore) self.assertEqual(p.data, os.path.join(datastore, p.name)) self.assertEqual(p.core, core) self.assertIsInstance(p, Debuggable) @patch("os.makedirs") def test_init_repo(self, mock_makedirs): Plugin.init_repo(datastore) mock_makedirs.assert_called_with(os.path.join(datastore, Plugin.name)) class TestDatabaseBacked(unittest.TestCase): @unittest.skipUnless(has_django, "Django not found") def test__use_db(self): core = Mock() core.setup.cfp.getboolean.return_value = True db = DatabaseBacked(core, datastore) self.assertTrue(db._use_db) core = Mock() core.setup.cfp.getboolean.return_value = False db = DatabaseBacked(core, datastore) self.assertFalse(db._use_db) Bcfg2.Server.Plugin.has_django = False core = Mock() db = DatabaseBacked(core, datastore) self.assertFalse(db._use_db) core = Mock() core.setup.cfp.getboolean.return_value = True db = DatabaseBacked(core, datastore) self.assertFalse(db._use_db) Bcfg2.Server.Plugin.has_django = True class TestPluginDatabaseModel(unittest.TestCase): """ placeholder for future tests """ pass class TestGenerator(unittest.TestCase): def test_HandleEntry(self): g = Generator() self.assertRaises(NotImplementedError, g.HandleEntry, None, None) class TestStructure(unittest.TestCase): def test_BuildStructures(self): s = Structure() self.assertRaises(NotImplementedError, s.BuildStructures, None) class TestMetadata(unittest.TestCase): def test_get_initial_metadata(self): m = Metadata() self.assertRaises(NotImplementedError, m.get_initial_metadata, None) def test_merge_additional_data(self): m = Metadata() self.assertRaises(NotImplementedError, m.merge_additional_data, None, None, None) def test_merge_additional_groups(self): m = Metadata() self.assertRaises(NotImplementedError, m.merge_additional_groups, None, None) class TestConnector(unittest.TestCase): """ placeholder """ pass class TestProbing(unittest.TestCase): """ placeholder """ pass class TestStatistics(unittest.TestCase): """ placeholder """ pass class TestThreadedStatistics(unittest.TestCase): data = [("foo.example.com", ""), ("bar.example.com", "")] @patch("threading.Thread.start") def test__init(self, mock_start): core = Mock() ts = ThreadedStatistics(core, datastore) mock_start.assert_any_call() @patch("__builtin__.open") @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock()) def test_save(self, mock_open): core = Mock() ts = ThreadedStatistics(core, datastore) queue = Mock() queue.empty = Mock(side_effect=Empty) ts.work_queue = queue mock_open.side_effect = OSError # test that save does _not_ raise an exception even when # everything goes pear-shaped ts.save() queue.empty.assert_any_call() mock_open.assert_called_with(ts.pending_file, 'w') queue.reset_mock() mock_open.reset_mock() queue.data = [] for hostname, xml in self.data: md = Mock() md.hostname = hostname queue.data.append((md, lxml.etree.XML(xml))) queue.empty.side_effect = lambda: len(queue.data) == 0 queue.get_nowait = Mock(side_effect=lambda: queue.data.pop()) mock_open.side_effect = None # oh, the joy of working around different package names in # py3k... with patch("%s.dump" % cPickle.__name__) as mock_dump: ts.save() queue.empty.assert_any_call() queue.get_nowait.assert_any_call() mock_open.assert_called_with(ts.pending_file, 'w') mock_open.return_value.close.assert_any_call() # the order of the queue data gets changed, so we have to # verify this call in an ugly way self.assertItemsEqual(mock_dump.call_args[0][0], self.data) self.assertEqual(mock_dump.call_args[0][1], mock_open.return_value) @patch("os.unlink") @patch("os.path.exists") @patch("__builtin__.open") @patch("lxml.etree.XML") @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock()) def test_load(self, mock_XML, mock_open, mock_exists, mock_unlink): core = Mock() core.terminate.isSet.return_value = False ts = ThreadedStatistics(core, datastore) with patch("%s.load" % cPickle.__name__) as mock_load: ts.work_queue = Mock() ts.work_queue.data = [] def reset(): core.reset_mock() mock_open.reset_mock() mock_exists.reset_mock() mock_unlink.reset_mock() mock_load.reset_mock() mock_XML.reset_mock() ts.work_queue.reset_mock() ts.work_queue.data = [] mock_exists.return_value = False self.assertTrue(ts.load()) mock_exists.assert_called_with(ts.pending_file) reset() mock_exists.return_value = True mock_open.side_effect = OSError self.assertFalse(ts.load()) mock_exists.assert_called_with(ts.pending_file) mock_open.assert_called_with(ts.pending_file, 'r') reset() mock_open.side_effect = None mock_load.return_value = self.data ts.work_queue.put_nowait.side_effect = Full self.assertTrue(ts.load()) mock_exists.assert_called_with(ts.pending_file) mock_open.assert_called_with(ts.pending_file, 'r') mock_open.return_value.close.assert_any_call() mock_load.assert_called_with(mock_open.return_value) reset() core.build_metadata.side_effect = lambda x: x mock_XML.side_effect = lambda x, parser=None: x ts.work_queue.put_nowait.side_effect = None self.assertTrue(ts.load()) mock_exists.assert_called_with(ts.pending_file) mock_open.assert_called_with(ts.pending_file, 'r') mock_open.return_value.close.assert_any_call() mock_load.assert_called_with(mock_open.return_value) self.assertItemsEqual(mock_XML.call_args_list, [call(x, parser=Bcfg2.Server.XMLParser) for h, x in self.data]) self.assertItemsEqual(ts.work_queue.put_nowait.call_args_list, [call((h, x)) for h, x in self.data]) mock_unlink.assert_called_with(ts.pending_file) @patch("threading.Thread.start", Mock()) @patch("Bcfg2.Server.Plugin.ThreadedStatistics.load") @patch("Bcfg2.Server.Plugin.ThreadedStatistics.save") @patch("Bcfg2.Server.Plugin.ThreadedStatistics.handle_statistic") def test_run(self, mock_handle, mock_save, mock_load): core = Mock() ts = ThreadedStatistics(core, datastore) mock_load.return_value = True ts.work_queue = Mock() def reset(): mock_handle.reset_mock() mock_save.reset_mock() mock_load.reset_mock() core.reset_mock() ts.work_queue.reset_mock() ts.work_queue.data = self.data[:] ts.work_queue.get_calls = 0 reset() def get_rv(**kwargs): ts.work_queue.get_calls += 1 try: return ts.work_queue.data.pop() except: raise Empty ts.work_queue.get.side_effect = get_rv def terminate_isset(): # this lets the loop go on a few iterations with an empty # queue to test that it doesn't error out return ts.work_queue.get_calls > 3 core.terminate.isSet.side_effect = terminate_isset ts.run() mock_load.assert_any_call() self.assertGreaterEqual(ts.work_queue.get.call_count, len(self.data)) self.assertItemsEqual(mock_handle.call_args_list, [call(h, x) for h, x in self.data]) mock_save.assert_any_call() @patch("copy.copy", Mock(side_effect=lambda x: x)) @patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock()) def test_process_statistics(self): core = Mock() ts = ThreadedStatistics(core, datastore) ts.work_queue = Mock() ts.process_statistics(*self.data[0]) ts.work_queue.put_nowait.assert_called_with(self.data[0]) ts.work_queue.reset_mock() ts.work_queue.put_nowait.side_effect = Full # test that no exception is thrown ts.process_statistics(*self.data[0]) class TestPullSource(unittest.TestCase): def test_GetCurrentEntry(self): ps = PullSource() self.assertRaises(NotImplementedError, ps.GetCurrentEntry, None, None, None) class TestPullTarget(unittest.TestCase): def test_AcceptChoices(self): pt = PullTarget() self.assertRaises(NotImplementedError, pt.AcceptChoices, None, None) def test_AcceptPullData(self): pt = PullTarget() self.assertRaises(NotImplementedError, pt.AcceptPullData, None, None, None) class TestDecision(unittest.TestCase): """ placeholder for future tests """ pass class TestValidationError(unittest.TestCase): """ placeholder for future tests """ pass class TestStructureValidator(unittest.TestCase): def test_validate_structures(self): sv = StructureValidator() self.assertRaises(NotImplementedError, sv.validate_structures, None, None) class TestGoalValidator(unittest.TestCase): def test_validate_goals(self): gv = GoalValidator() self.assertRaises(NotImplementedError, gv.validate_goals, None, None) class TestVersion(unittest.TestCase): """ placeholder for future tests """ pass class TestClientRunHooks(unittest.TestCase): """ placeholder for future tests """ pass class TestFileBacked(unittest.TestCase): @patch("__builtin__.open") @patch("Bcfg2.Server.Plugin.FileBacked.Index") def test_HandleEvent(self, mock_Index, mock_open): path = "/test" fb = FileBacked(path) def reset(): mock_Index.reset_mock() mock_open.reset_mock() for evt in ["exists", "changed", "created"]: reset() event = Mock() event.code2str.return_value = evt fb.HandleEvent(event) mock_open.assert_called_with(path) mock_open.return_value.read.assert_any_call() mock_Index.assert_any_call() reset() event = Mock() event.code2str.return_value = "endExist" fb.HandleEvent(event) self.assertFalse(mock_open.called) self.assertFalse(mock_Index.called)