import os
import re
import copy
import logging
import lxml.etree
import Bcfg2.Server
from Bcfg2.Bcfg2Py3k import reduce
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugin import *
from ...common import *
class FakeElementTree(lxml.etree._ElementTree):
xinclude = Mock()
class TestFunctions(Bcfg2TestCase):
def test_bind_info(self):
entry = lxml.etree.Element("Path", name="/test")
metadata = Mock()
default = dict(test1="test1", test2="test2")
# test without infoxml
bind_info(entry, metadata, default=default)
self.assertItemsEqual(entry.attrib,
dict(test1="test1",
test2="test2",
name="/test"))
# test with bogus infoxml
entry = lxml.etree.Element("Path", name="/test")
infoxml = Mock()
self.assertRaises(PluginExecutionError,
bind_info,
entry, metadata, infoxml=infoxml)
infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry)
# test with valid infoxml
entry = lxml.etree.Element("Path", name="/test")
infoxml.reset_mock()
infodata = {None: {"test3": "test3", "test4": "test4"}}
def infoxml_rv(metadata, rv, entry=None):
rv['Info'] = infodata
infoxml.pnode.Match.side_effect = infoxml_rv
bind_info(entry, metadata, infoxml=infoxml, default=default)
# mock objects don't properly track the called-with value of
# arguments whose value is changed by the function, so it
# thinks Match() was called with the final value of the mdata
# arg, not the initial value. makes this test a little less
# worthwhile, TBH.
infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata),
entry=entry)
self.assertItemsEqual(entry.attrib,
dict(test1="test1",
test2="test2",
test3="test3",
test4="test4",
name="/test"))
class TestPluginInitError(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestPluginExecutionError(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestDebuggable(Bcfg2TestCase):
test_obj = Debuggable
def get_obj(self):
return self.test_obj()
def test__init(self):
d = self.get_obj()
self.assertIsInstance(d.logger, logging.Logger)
self.assertFalse(d.debug_flag)
@patch("Bcfg2.Server.Plugin.%s.debug_log" % test_obj.__name__)
def test_toggle_debug(self, mock_debug):
d = self.get_obj()
orig = d.debug_flag
d.toggle_debug()
self.assertNotEqual(orig, d.debug_flag)
self.assertTrue(mock_debug.called)
mock_debug.reset_mock()
changed = d.debug_flag
d.toggle_debug()
self.assertNotEqual(changed, d.debug_flag)
self.assertEqual(orig, d.debug_flag)
self.assertTrue(mock_debug.called)
def test_debug_log(self):
d = self.get_obj()
d.logger = Mock()
d.debug_flag = False
d.debug_log("test")
self.assertFalse(d.logger.error.called)
d.logger.reset_mock()
d.debug_log("test", flag=True)
self.assertTrue(d.logger.error.called)
d.logger.reset_mock()
d.debug_flag = True
d.debug_log("test")
self.assertTrue(d.logger.error.called)
class TestPlugin(TestDebuggable):
test_obj = Plugin
def get_obj(self, core=None):
if core is None:
core = Mock()
return self.test_obj(core, datastore)
def test__init(self):
core = Mock()
p = self.get_obj(core=core)
self.assertEqual(p.data, os.path.join(datastore, p.name))
self.assertEqual(p.core, core)
self.assertIsInstance(p, Debuggable)
@patch("os.makedirs")
def test_init_repo(self, mock_makedirs):
self.test_obj.init_repo(datastore)
mock_makedirs.assert_called_with(os.path.join(datastore,
self.test_obj.name))
class TestDatabaseBacked(TestPlugin):
test_obj = DatabaseBacked
@skipUnless(has_django, "Django not found")
def test__use_db(self):
core = Mock()
core.setup.cfp.getboolean.return_value = True
db = self.get_obj(core)
self.assertTrue(db._use_db)
core = Mock()
core.setup.cfp.getboolean.return_value = False
db = self.get_obj(core)
self.assertFalse(db._use_db)
Bcfg2.Server.Plugin.has_django = False
core = Mock()
db = self.get_obj(core)
self.assertFalse(db._use_db)
core = Mock()
core.setup.cfp.getboolean.return_value = True
db = self.get_obj(core)
self.assertFalse(db._use_db)
Bcfg2.Server.Plugin.has_django = True
class TestPluginDatabaseModel(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestGenerator(Bcfg2TestCase):
test_obj = Generator
class TestStructure(Bcfg2TestCase):
test_obj = Structure
def get_obj(self):
return self.test_obj()
def test_BuildStructures(self):
s = self.get_obj()
self.assertRaises(NotImplementedError,
s.BuildStructures, None)
class TestMetadata(Bcfg2TestCase):
test_obj = Metadata
def get_obj(self):
return self.test_obj()
def test_get_initial_metadata(self):
m = self.get_obj()
self.assertRaises(NotImplementedError,
m.get_initial_metadata, None)
def test_merge_additional_data(self):
m = self.get_obj()
self.assertRaises(NotImplementedError,
m.merge_additional_data, None, None, None)
def test_merge_additional_groups(self):
m = self.get_obj()
self.assertRaises(NotImplementedError,
m.merge_additional_groups, None, None)
class TestConnector(Bcfg2TestCase):
""" placeholder """
pass
class TestProbing(Bcfg2TestCase):
""" placeholder """
pass
class TestStatistics(TestPlugin):
test_obj = Statistics
class TestThreadedStatistics(TestStatistics):
test_obj = ThreadedStatistics
data = [("foo.example.com", ""),
("bar.example.com", "")]
@patch("threading.Thread.start")
def test__init(self, mock_start):
core = Mock()
ts = self.get_obj(core)
mock_start.assert_any_call()
@patch("%s.open" % builtins)
@patch("%s.dump" % cPickle.__name__)
@patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
def test_save(self, mock_dump, mock_open):
core = Mock()
ts = self.get_obj(core)
queue = Mock()
queue.empty = Mock(side_effect=Empty)
ts.work_queue = queue
mock_open.side_effect = OSError
# test that save does _not_ raise an exception even when
# everything goes pear-shaped
ts.save()
queue.empty.assert_any_call()
mock_open.assert_called_with(ts.pending_file, 'w')
queue.reset_mock()
mock_open.reset_mock()
queue.data = []
for hostname, xml in self.data:
md = Mock()
md.hostname = hostname
queue.data.append((md, lxml.etree.XML(xml)))
queue.empty.side_effect = lambda: len(queue.data) == 0
queue.get_nowait = Mock(side_effect=lambda: queue.data.pop())
mock_open.side_effect = None
ts.save()
queue.empty.assert_any_call()
queue.get_nowait.assert_any_call()
mock_open.assert_called_with(ts.pending_file, 'w')
mock_open.return_value.close.assert_any_call()
# the order of the queue data gets changed, so we have to
# verify this call in an ugly way
self.assertItemsEqual(mock_dump.call_args[0][0], self.data)
self.assertEqual(mock_dump.call_args[0][1], mock_open.return_value)
@patch("os.unlink")
@patch("os.path.exists")
@patch("%s.open" % builtins)
@patch("lxml.etree.XML")
@patch("%s.load" % cPickle.__name__)
@patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
def test_load(self, mock_load, mock_XML, mock_open, mock_exists,
mock_unlink):
core = Mock()
core.terminate.isSet.return_value = False
ts = self.get_obj(core)
ts.work_queue = Mock()
ts.work_queue.data = []
def reset():
core.reset_mock()
mock_open.reset_mock()
mock_exists.reset_mock()
mock_unlink.reset_mock()
mock_load.reset_mock()
mock_XML.reset_mock()
ts.work_queue.reset_mock()
ts.work_queue.data = []
mock_exists.return_value = False
self.assertTrue(ts.load())
mock_exists.assert_called_with(ts.pending_file)
reset()
mock_exists.return_value = True
mock_open.side_effect = OSError
self.assertFalse(ts.load())
mock_exists.assert_called_with(ts.pending_file)
mock_open.assert_called_with(ts.pending_file, 'r')
reset()
mock_open.side_effect = None
mock_load.return_value = self.data
ts.work_queue.put_nowait.side_effect = Full
self.assertTrue(ts.load())
mock_exists.assert_called_with(ts.pending_file)
mock_open.assert_called_with(ts.pending_file, 'r')
mock_open.return_value.close.assert_any_call()
mock_load.assert_called_with(mock_open.return_value)
reset()
core.build_metadata.side_effect = lambda x: x
mock_XML.side_effect = lambda x, parser=None: x
ts.work_queue.put_nowait.side_effect = None
self.assertTrue(ts.load())
mock_exists.assert_called_with(ts.pending_file)
mock_open.assert_called_with(ts.pending_file, 'r')
mock_open.return_value.close.assert_any_call()
mock_load.assert_called_with(mock_open.return_value)
self.assertItemsEqual(mock_XML.call_args_list,
[call(x, parser=Bcfg2.Server.XMLParser)
for h, x in self.data])
self.assertItemsEqual(ts.work_queue.put_nowait.call_args_list,
[call((h, x)) for h, x in self.data])
mock_unlink.assert_called_with(ts.pending_file)
@patch("threading.Thread.start", Mock())
@patch("Bcfg2.Server.Plugin.ThreadedStatistics.load")
@patch("Bcfg2.Server.Plugin.ThreadedStatistics.save")
@patch("Bcfg2.Server.Plugin.ThreadedStatistics.handle_statistic")
def test_run(self, mock_handle, mock_save, mock_load):
core = Mock()
ts = self.get_obj(core)
mock_load.return_value = True
ts.work_queue = Mock()
def reset():
mock_handle.reset_mock()
mock_save.reset_mock()
mock_load.reset_mock()
core.reset_mock()
ts.work_queue.reset_mock()
ts.work_queue.data = self.data[:]
ts.work_queue.get_calls = 0
reset()
def get_rv(**kwargs):
ts.work_queue.get_calls += 1
try:
return ts.work_queue.data.pop()
except:
raise Empty
ts.work_queue.get.side_effect = get_rv
def terminate_isset():
# this lets the loop go on a few iterations with an empty
# queue to test that it doesn't error out
return ts.work_queue.get_calls > 3
core.terminate.isSet.side_effect = terminate_isset
ts.work_queue.empty.return_value = False
ts.run()
mock_load.assert_any_call()
self.assertGreaterEqual(ts.work_queue.get.call_count, len(self.data))
self.assertItemsEqual(mock_handle.call_args_list,
[call(h, x) for h, x in self.data])
mock_save.assert_any_call()
@patch("copy.copy", Mock(side_effect=lambda x: x))
@patch("Bcfg2.Server.Plugin.ThreadedStatistics.run", Mock())
def test_process_statistics(self):
core = Mock()
ts = self.get_obj(core)
ts.work_queue = Mock()
ts.process_statistics(*self.data[0])
ts.work_queue.put_nowait.assert_called_with(self.data[0])
ts.work_queue.reset_mock()
ts.work_queue.put_nowait.side_effect = Full
# test that no exception is thrown
ts.process_statistics(*self.data[0])
class TestPullSource(Bcfg2TestCase):
def test_GetCurrentEntry(self):
ps = PullSource()
self.assertRaises(NotImplementedError,
ps.GetCurrentEntry, None, None, None)
class TestPullTarget(Bcfg2TestCase):
def test_AcceptChoices(self):
pt = PullTarget()
self.assertRaises(NotImplementedError,
pt.AcceptChoices, None, None)
def test_AcceptPullData(self):
pt = PullTarget()
self.assertRaises(NotImplementedError,
pt.AcceptPullData, None, None, None)
class TestDecision(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestValidationError(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestStructureValidator(Bcfg2TestCase):
def test_validate_structures(self):
sv = StructureValidator()
self.assertRaises(NotImplementedError,
sv.validate_structures, None, None)
class TestGoalValidator(Bcfg2TestCase):
def test_validate_goals(self):
gv = GoalValidator()
self.assertRaises(NotImplementedError,
gv.validate_goals, None, None)
class TestVersion(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestClientRunHooks(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestFileBacked(Bcfg2TestCase):
test_obj = FileBacked
def get_obj(self, path=datastore, fam=None):
return self.test_obj(path, fam=fam)
@patch("%s.open" % builtins)
def test_HandleEvent(self, mock_open):
path = "/test"
fb = self.get_obj(path)
fb.Index = Mock()
def reset():
fb.Index.reset_mock()
mock_open.reset_mock()
for evt in ["exists", "changed", "created"]:
reset()
event = Mock()
event.code2str.return_value = evt
fb.HandleEvent(event)
mock_open.assert_called_with(path)
mock_open.return_value.read.assert_any_call()
fb.Index.assert_any_call()
reset()
event = Mock()
event.code2str.return_value = "endExist"
fb.HandleEvent(event)
self.assertFalse(mock_open.called)
self.assertFalse(fb.Index.called)
class TestDirectoryBacked(Bcfg2TestCase):
test_obj = DirectoryBacked
testpaths = {1: '',
2: '/foo',
3: '/foo/bar',
4: '/foo/bar/baz',
5: 'quux',
6: 'xyzzy/',
7: 'xyzzy/plugh/'}
def test_child_interface(self):
# ensure that the child object has the correct interface
self.assertTrue(hasattr(self.test_obj.__child__, "HandleEvent"))
@patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__,
Mock())
def get_obj(self, fam=None):
if fam is None:
fam = Mock()
return self.test_obj(os.path.join(datastore, self.test_obj.__name__),
fam)
@patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__)
def test__init(self, mock_add_monitor):
db = self.test_obj(datastore, Mock())
mock_add_monitor.assert_called_with('')
def test__getitem(self):
db = self.get_obj()
db.entries.update(dict(a=1, b=2, c=3))
self.assertEqual(db['a'], 1)
self.assertEqual(db['b'], 2)
with self.assertRaises(KeyError):
db['d']
def test__iter(self):
db = self.get_obj()
db.entries.update(dict(a=1, b=2, c=3))
self.assertEqual([i for i in db],
[i for i in db.entries.items()])
@patch("os.path.isdir")
def test_add_directory_monitor(self, mock_isdir):
db = self.get_obj()
db.fam = Mock()
db.fam.rv = 0
def reset():
db.fam.rv += 1
db.fam.AddMonitor.return_value = db.fam.rv
db.fam.reset_mock()
mock_isdir.reset_mock()
mock_isdir.return_value = True
for path in self.testpaths.values():
reset()
db.add_directory_monitor(path)
db.fam.AddMonitor.assert_called_with(os.path.join(db.data, path),
db)
self.assertIn(db.fam.rv, db.handles)
self.assertEqual(db.handles[db.fam.rv], path)
reset()
# test duplicate adds
for path in self.testpaths.values():
reset()
db.add_directory_monitor(path)
self.assertFalse(db.fam.AddMonitor.called)
reset()
mock_isdir.return_value = False
db.add_directory_monitor('bogus')
self.assertFalse(db.fam.AddMonitor.called)
self.assertNotIn(db.fam.rv, db.handles)
def test_add_entry(self):
db = self.get_obj()
db.fam = Mock()
class MockChild(Mock):
def __init__(self, path, fam, **kwargs):
Mock.__init__(self, **kwargs)
self.path = path
self.fam = fam
self.HandleEvent = Mock()
db.__child__ = MockChild
for path in self.testpaths.values():
event = Mock()
db.add_entry(path, event)
self.assertIn(path, db.entries)
self.assertEqual(db.entries[path].path,
os.path.join(db.data, path))
self.assertEqual(db.entries[path].fam, db.fam)
db.entries[path].HandleEvent.assert_called_with(event)
@patch("os.path.isdir")
@patch("Bcfg2.Server.Plugin.%s.add_entry" % test_obj.__name__)
@patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__)
def test_HandleEvent(self, mock_add_monitor, mock_add_entry, mock_isdir):
db = self.get_obj()
# a path with a leading / should never get into
# DirectoryBacked.handles, so strip that test case
for rid, path in self.testpaths.items():
path = path.lstrip('/')
db.handles[rid] = path
def reset():
mock_isdir.reset_mock()
mock_add_entry.reset_mock()
mock_add_monitor.reset_mock()
def get_event(filename, action, requestID):
event = Mock()
event.code2str.return_value = action
event.filename = filename
event.requestID = requestID
return event
# test events on the data directory itself
reset()
mock_isdir.return_value = True
event = get_event(db.data, "exists", 1)
db.HandleEvent(event)
mock_add_monitor.assert_called_with("")
# test events on paths that aren't handled
reset()
mock_isdir.return_value = False
event = get_event('/foo', 'created', max(self.testpaths.keys()) + 1)
db.HandleEvent(event)
self.assertFalse(mock_add_monitor.called)
self.assertFalse(mock_add_entry.called)
for req_id, path in self.testpaths.items():
# a path with a leading / should never get into
# DirectoryBacked.handles, so strip that test case
path = path.lstrip('/')
basepath = os.path.join(datastore, path)
for fname in ['foo', 'bar/baz.txt', 'plugh.py']:
relpath = os.path.join(path, fname)
abspath = os.path.join(basepath, fname)
# test endExist does nothing
reset()
event = get_event(fname, 'endExist', req_id)
db.HandleEvent(event)
self.assertFalse(mock_add_monitor.called)
self.assertFalse(mock_add_entry.called)
mock_isdir.return_value = True
for evt in ["created", "exists", "changed"]:
# test that creating or changing a directory works
reset()
event = get_event(fname, evt, req_id)
db.HandleEvent(event)
mock_add_monitor.assert_called_with(relpath)
self.assertFalse(mock_add_entry.called)
mock_isdir.return_value = False
for evt in ["created", "exists"]:
# test that creating a file works
reset()
event = get_event(fname, evt, req_id)
db.HandleEvent(event)
mock_add_entry.assert_called_with(relpath, event)
self.assertFalse(mock_add_monitor.called)
db.entries[relpath] = MagicMock()
# test that changing a file that already exists works
reset()
event = get_event(fname, "changed", req_id)
db.HandleEvent(event)
db.entries[relpath].HandleEvent.assert_called_with(event)
self.assertFalse(mock_add_monitor.called)
self.assertFalse(mock_add_entry.called)
# test that deleting an entry works
reset()
event = get_event(fname, "deleted", req_id)
db.HandleEvent(event)
self.assertNotIn(relpath, db.entries)
# test that changing a file that doesn't exist works
reset()
event = get_event(fname, "changed", req_id)
db.HandleEvent(event)
mock_add_entry.assert_called_with(relpath, event)
self.assertFalse(mock_add_monitor.called)
db.entries[relpath] = MagicMock()
# test that deleting a directory works. this is a little
# strange because the _parent_ directory has to handle the
# deletion
reset()
event = get_event('quux', "deleted", 1)
db.HandleEvent(event)
for key in db.entries.keys():
self.assertFalse(key.startswith('quux'))
class TestXMLFileBacked(TestFileBacked):
test_obj = XMLFileBacked
def get_obj(self, path=datastore, fam=None, should_monitor=False):
return self.test_obj(path, fam=fam, should_monitor=should_monitor)
def test__init(self):
fam = Mock()
fname = "/test"
xfb = self.get_obj(fname)
self.assertIsNone(xfb.fam)
xfb = self.get_obj(fname, fam=fam)
self.assertFalse(fam.AddMonitor.called)
fam.reset_mock()
xfb = self.get_obj(fname, fam=fam, should_monitor=True)
fam.AddMonitor.assert_called_with(fname, xfb)
@patch("os.path.exists")
@patch("lxml.etree.parse")
def test_follow_xincludes(self, mock_parse, mock_exists):
fname = "/test/test1.xml"
xfb = self.get_obj(fname)
xfb.add_monitor = Mock()
def reset():
xfb.add_monitor.reset_mock()
mock_parse.reset_mock()
mock_exists.reset_mock()
xfb.extras = []
mock_exists.return_value = True
xdata = dict()
mock_parse.side_effect = lambda p: xdata[p]
# basic functionality
xdata['/test/test2.xml'] = lxml.etree.Element("Test").getroottree()
xfb._follow_xincludes(xdata=xdata['/test/test2.xml'])
self.assertFalse(xfb.add_monitor.called)
if (not hasattr(self.test_obj, "xdata") or
not isinstance(self.test_obj.xdata, property)):
# if xdata is settable, test that method of getting data
# to _follow_xincludes
reset()
xfb.xdata = xdata['/test/test2.xml'].getroot()
xfb._follow_xincludes()
self.assertFalse(xfb.add_monitor.called)
xfb.xdata = None
reset()
xfb._follow_xincludes(fname="/test/test2.xml")
self.assertFalse(xfb.add_monitor.called)
# test one level of xinclude
xdata[fname] = lxml.etree.Element("Test").getroottree()
lxml.etree.SubElement(xdata[fname].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
href="/test/test2.xml")
reset()
xfb._follow_xincludes(fname=fname)
xfb.add_monitor.assert_called_with("/test/test2.xml")
self.assertItemsEqual(mock_parse.call_args_list,
[call(f) for f in xdata.keys()])
mock_exists.assert_called_with("/test/test2.xml")
reset()
xfb._follow_xincludes(xdata=xdata[fname])
xfb.add_monitor.assert_called_with("/test/test2.xml")
self.assertItemsEqual(mock_parse.call_args_list,
[call(f) for f in xdata.keys()
if f != fname])
mock_exists.assert_called_with("/test/test2.xml")
# test two-deep level of xinclude, with some files in another
# directory
xdata["/test/test3.xml"] = \
lxml.etree.Element("Test").getroottree()
lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
href="/test/test_dir/test4.xml")
xdata["/test/test_dir/test4.xml"] = \
lxml.etree.Element("Test").getroottree()
lxml.etree.SubElement(xdata["/test/test_dir/test4.xml"].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
href="/test/test_dir/test5.xml")
xdata['/test/test_dir/test5.xml'] = \
lxml.etree.Element("Test").getroottree()
xdata['/test/test_dir/test6.xml'] = \
lxml.etree.Element("Test").getroottree()
# relative includes
lxml.etree.SubElement(xdata[fname].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
href="test3.xml")
lxml.etree.SubElement(xdata["/test/test3.xml"].getroot(),
Bcfg2.Server.XI_NAMESPACE + "include",
href="test_dir/test6.xml")
reset()
xfb._follow_xincludes(fname=fname)
self.assertItemsEqual(xfb.add_monitor.call_args_list,
[call(f) for f in xdata.keys() if f != fname])
self.assertItemsEqual(mock_parse.call_args_list,
[call(f) for f in xdata.keys()])
self.assertItemsEqual(mock_exists.call_args_list,
[call(f) for f in xdata.keys() if f != fname])
reset()
xfb._follow_xincludes(xdata=xdata[fname])
self.assertItemsEqual(xfb.add_monitor.call_args_list,
[call(f) for f in xdata.keys() if f != fname])
self.assertItemsEqual(mock_parse.call_args_list,
[call(f) for f in xdata.keys() if f != fname])
self.assertItemsEqual(mock_exists.call_args_list,
[call(f) for f in xdata.keys() if f != fname])
@patch("lxml.etree._ElementTree", FakeElementTree)
@patch("Bcfg2.Server.Plugin.%s._follow_xincludes" % test_obj.__name__)
def test_Index(self, mock_follow):
fname = "/test/test1.xml"
xfb = self.get_obj(fname)
def reset():
mock_follow.reset_mock()
FakeElementTree.xinclude.reset_mock()
xfb.extras = []
xfb.xdata = None
# syntax error
xfb.data = "<"
self.assertRaises(PluginInitError, xfb.Index)
# no xinclude
reset()
xdata = lxml.etree.Element("Test", name="test")
children = [lxml.etree.SubElement(xdata, "Foo"),
lxml.etree.SubElement(xdata, "Bar", name="bar")]
xfb.data = lxml.etree.tostring(xdata)
xfb.Index()
mock_follow.assert_any_call()
self.assertEqual(xfb.xdata.base, fname)
self.assertItemsEqual([lxml.etree.tostring(e) for e in xfb.entries],
[lxml.etree.tostring(e) for e in children])
# with xincludes
reset()
mock_follow.side_effect = \
lambda: xfb.extras.extend(["/test/test2.xml",
"/test/test_dir/test3.xml"])
children.extend([
lxml.etree.SubElement(xdata,
Bcfg2.Server.XI_NAMESPACE + "include",
href="/test/test2.xml"),
lxml.etree.SubElement(xdata,
Bcfg2.Server.XI_NAMESPACE + "include",
href="/test/test_dir/test3.xml")])
test2 = lxml.etree.Element("Test", name="test2")
lxml.etree.SubElement(test2, "Baz")
test3 = lxml.etree.Element("Test", name="test3")
replacements = {"/test/test2.xml": test2,
"/test/test_dir/test3.xml": test3}
def xinclude():
for el in xfb.xdata.findall('//%sinclude' %
Bcfg2.Server.XI_NAMESPACE):
xfb.xdata.replace(el, replacements[el.get("href")])
FakeElementTree.xinclude.side_effect = xinclude
xfb.data = lxml.etree.tostring(xdata)
xfb.Index()
mock_follow.assert_any_call()
FakeElementTree.xinclude.assert_any_call
self.assertEqual(xfb.xdata.base, fname)
self.assertItemsEqual([lxml.etree.tostring(e) for e in xfb.entries],
[lxml.etree.tostring(e) for e in children])
def test_add_monitor(self):
fname = "/test/test1.xml"
xfb = self.get_obj(fname)
xfb.add_monitor("/test/test2.xml")
self.assertIn("/test/test2.xml", xfb.extras)
fam = Mock()
xfb = self.get_obj(fname, fam=fam)
fam.reset_mock()
xfb.add_monitor("/test/test3.xml")
self.assertFalse(fam.AddMonitor.called)
self.assertIn("/test/test3.xml", xfb.extras)
fam.reset_mock()
xfb = self.get_obj(fname, fam=fam, should_monitor=True)
xfb.add_monitor("/test/test4.xml")
fam.AddMonitor.assert_called_with("/test/test4.xml", xfb)
self.assertIn("/test/test4.xml", xfb.extras)
class TestStructFile(TestXMLFileBacked):
test_obj = StructFile
def _get_test_data(self):
""" build a very complex set of test data """
# top-level group and client elements
groups = dict()
# group and client elements that are descendents of other group or
# client elements
subgroups = dict()
# children of elements in `groups' that should be included in
# match results
children = dict()
# children of elements in `subgroups' that should be included in
# match results
subchildren = dict()
# top-level tags that are not group elements
standalone = []
xdata = lxml.etree.Element("Test", name="test")
groups[0] = lxml.etree.SubElement(xdata, "Group", name="group1",
include="true")
children[0] = [lxml.etree.SubElement(groups[0], "Child", name="c1"),
lxml.etree.SubElement(groups[0], "Child", name="c2")]
subgroups[0] = [lxml.etree.SubElement(groups[0], "Group",
name="subgroup1", include="true"),
lxml.etree.SubElement(groups[0],
"Client", name="client1",
include="false")]
subchildren[0] = \
[lxml.etree.SubElement(subgroups[0][0], "Child", name="sc1"),
lxml.etree.SubElement(subgroups[0][0], "Child", name="sc2",
attr="some attr"),
lxml.etree.SubElement(subgroups[0][0], "Child", name="sc3")]
lxml.etree.SubElement(subchildren[0][-1], "SubChild", name="subchild")
lxml.etree.SubElement(subgroups[0][1], "Child", name="sc4")
groups[1] = lxml.etree.SubElement(xdata, "Group", name="group2",
include="false")
children[1] = []
subgroups[1] = []
subchildren[1] = []
lxml.etree.SubElement(groups[1], "Child", name="c3")
lxml.etree.SubElement(groups[1], "Child", name="c4")
standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s1"))
groups[2] = lxml.etree.SubElement(xdata, "Client", name="client2",
include="false")
children[2] = []
subgroups[2] = []
subchildren[2] = []
lxml.etree.SubElement(groups[2], "Child", name="c5")
lxml.etree.SubElement(groups[2], "Child", name="c6")
standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s2",
attr="some attr"))
groups[3] = lxml.etree.SubElement(xdata, "Client", name="client3",
include="true")
children[3] = [lxml.etree.SubElement(groups[3], "Child", name="c7",
attr="some_attr"),
lxml.etree.SubElement(groups[3], "Child", name="c8")]
subgroups[3] = []
subchildren[3] = []
lxml.etree.SubElement(children[3][-1], "SubChild", name="subchild")
standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s3"))
lxml.etree.SubElement(standalone[-1], "SubStandalone", name="sub1")
children[4] = standalone
return (xdata, groups, subgroups, children, subchildren, standalone)
def test_include_element(self):
sf = self.get_obj("/test/test.xml")
metadata = Mock()
metadata.groups = ["group1", "group2"]
metadata.hostname = "foo.example.com"
inc = lambda tag, **attrs: \
sf._include_element(lxml.etree.Element(tag, **attrs), metadata)
self.assertFalse(sf._include_element(lxml.etree.Comment("test"),
metadata))
self.assertFalse(inc("Group", name="group3"))
self.assertFalse(inc("Group", name="group2", negate="true"))
self.assertFalse(inc("Group", name="group2", negate="tRuE"))
self.assertTrue(inc("Group", name="group2"))
self.assertTrue(inc("Group", name="group2", negate="false"))
self.assertTrue(inc("Group", name="group2", negate="faLSe"))
self.assertTrue(inc("Group", name="group3", negate="true"))
self.assertTrue(inc("Group", name="group3", negate="tRUe"))
self.assertFalse(inc("Client", name="bogus.example.com"))
self.assertFalse(inc("Client", name="foo.example.com", negate="true"))
self.assertFalse(inc("Client", name="foo.example.com", negate="tRuE"))
self.assertTrue(inc("Client", name="foo.example.com"))
self.assertTrue(inc("Client", name="foo.example.com", negate="false"))
self.assertTrue(inc("Client", name="foo.example.com", negate="faLSe"))
self.assertTrue(inc("Client", name="bogus.example.com", negate="true"))
self.assertTrue(inc("Client", name="bogus.example.com", negate="tRUe"))
self.assertTrue(inc("Other"))
@patch("Bcfg2.Server.Plugin.%s._include_element" % test_obj.__name__)
def test__match(self, mock_include):
sf = self.get_obj("/test/test.xml")
metadata = Mock()
(xdata, groups, subgroups, children, subchildren, standalone) = \
self._get_test_data()
mock_include.side_effect = \
lambda x, _: (x.tag not in ['Client', 'Group'] or
x.get("include") == "true")
for i, group in groups.items():
actual = sf._match(group, metadata)
expected = children[i] + subchildren[i]
self.assertEqual(len(actual), len(expected))
# easiest way to compare the values is actually to make
# them into an XML document and let assertXMLEqual compare
# them
xactual = lxml.etree.Element("Container")
xactual.extend(actual)
xexpected = lxml.etree.Element("Container")
xexpected.extend(expected)
self.assertXMLEqual(xactual, xexpected)
for el in standalone:
self.assertXMLEqual(el, sf._match(el, metadata)[0])
@patch("Bcfg2.Server.Plugin.%s._match" % test_obj.__name__)
def test_Match(self, mock_match):
sf = self.get_obj("/test/test.xml")
metadata = Mock()
(xdata, groups, subgroups, children, subchildren, standalone) = \
self._get_test_data()
sf.entries.extend(copy.deepcopy(xdata).getchildren())
def match_rv(el, _):
if el.tag not in ['Client', 'Group']:
return [el]
elif x.get("include") == "true":
return el.getchildren()
else:
return []
mock_match.side_effect = match_rv
actual = sf.Match(metadata)
expected = reduce(lambda x, y: x + y,
list(children.values()) + list(subgroups.values()))
self.assertEqual(len(actual), len(expected))
# easiest way to compare the values is actually to make
# them into an XML document and let assertXMLEqual compare
# them
xactual = lxml.etree.Element("Container")
xactual.extend(actual)
xexpected = lxml.etree.Element("Container")
xexpected.extend(expected)
self.assertXMLEqual(xactual, xexpected)
@patch("Bcfg2.Server.Plugin.%s._include_element" % test_obj.__name__)
def test__xml_match(self, mock_include):
sf = self.get_obj("/test/test.xml")
metadata = Mock()
(xdata, groups, subgroups, children, subchildren, standalone) = \
self._get_test_data()
mock_include.side_effect = \
lambda x, _: (x.tag not in ['Client', 'Group'] or
x.get("include") == "true")
actual = copy.deepcopy(xdata)
for el in actual.getchildren():
sf._xml_match(el, metadata)
expected = lxml.etree.Element(xdata.tag, **xdata.attrib)
expected.text = xdata.text
expected.extend(reduce(lambda x, y: x + y,
list(children.values()) + list(subchildren.values())))
expected.extend(standalone)
self.assertXMLEqual(actual, expected)
@patch("Bcfg2.Server.Plugin.%s._xml_match" % test_obj.__name__)
def test_Match(self, mock_xml_match):
sf = self.get_obj("/test/test.xml")
metadata = Mock()
(sf.xdata, groups, subgroups, children, subchildren, standalone) = \
self._get_test_data()
sf.XMLMatch(metadata)
actual = []
for call in mock_xml_match.call_args_list:
actual.append(call[0][0])
self.assertEqual(call[0][1], metadata)
expected = list(groups.values()) + standalone
# easiest way to compare the values is actually to make
# them into an XML document and let assertXMLEqual compare
# them
xactual = lxml.etree.Element("Container")
xactual.extend(actual)
xexpected = lxml.etree.Element("Container")
xexpected.extend(expected)
self.assertXMLEqual(xactual, xexpected)
class TestINode(Bcfg2TestCase):
test_obj = INode
# INode.__init__ and INode._load_children() call each other
# recursively, which makes this class kind of a nightmare to test.
# we have to first patch INode._load_children so that we can
# create an INode object with no children loaded, then we unpatch
# INode._load_children and patch INode.__init__ so that child
# objects aren't actually created. but in order to test things
# atomically, we do this umpteen times in order to test with
# different data. this convenience method makes this a little
# easier. fun fun fun.
@patch("Bcfg2.Server.Plugin.%s._load_children" % test_obj.__name__, Mock())
def _get_inode(self, data, idict):
return self.test_obj(data, idict)
def test_raw_predicates(self):
metadata = Mock()
metadata.groups = ["group1", "group2"]
metadata.hostname = "foo.example.com"
entry = None
parent_predicate = lambda m, e: True
pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
dict(predicate=parent_predicate))
self.assertTrue(pred(metadata, entry))
pred = eval(self.test_obj.raw['Client'] % dict(name="bar.example.com"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
dict(predicate=parent_predicate))
self.assertTrue(pred(metadata, entry))
pred = eval(self.test_obj.raw['Group'] % dict(name="group3"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.nraw['Client'] % dict(name="foo.example.com"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
dict(predicate=parent_predicate))
self.assertTrue(pred(metadata, entry))
pred = eval(self.test_obj.nraw['Group'] % dict(name="group1"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
dict(predicate=parent_predicate))
self.assertTrue(pred(metadata, entry))
parent_predicate = lambda m, e: False
pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
self.assertItemsEqual(self.test_obj.containers,
self.test_obj.raw.keys())
self.assertItemsEqual(self.test_obj.containers,
self.test_obj.nraw.keys())
@patch("Bcfg2.Server.Plugin.INode._load_children")
def test__init(self, mock_load_children):
data = lxml.etree.Element("Bogus")
# called with no parent, should not raise an exception; it's a
# top-level tag in an XML file and so is not expected to be a
# proper predicate
INode(data, dict())
self.assertRaises(PluginExecutionError,
INode, data, dict(), Mock())
data = lxml.etree.Element("Client", name="foo.example.com")
idict = dict()
inode = INode(data, idict)
mock_load_children.assert_called_with(data, idict)
self.assertTrue(inode.predicate(Mock(), Mock()))
parent = Mock()
parent.predicate = lambda m, e: True
metadata = Mock()
metadata.groups = ["group1", "group2"]
metadata.hostname = "foo.example.com"
entry = None
# test setting predicate with parent object
mock_load_children.reset_mock()
inode = INode(data, idict, parent=parent)
mock_load_children.assert_called_with(data, idict)
self.assertTrue(inode.predicate(metadata, entry))
# test negation
data = lxml.etree.Element("Client", name="foo.example.com",
negate="true")
mock_load_children.reset_mock()
inode = INode(data, idict, parent=parent)
mock_load_children.assert_called_with(data, idict)
self.assertFalse(inode.predicate(metadata, entry))
# test failure of a matching predicate (client names do not match)
data = lxml.etree.Element("Client", name="foo.example.com")
metadata.hostname = "bar.example.com"
mock_load_children.reset_mock()
inode = INode(data, idict, parent=parent)
mock_load_children.assert_called_with(data, idict)
self.assertFalse(inode.predicate(metadata, entry))
# test that parent predicate is AND'ed in correctly
parent.predicate = lambda m, e: False
metadata.hostname = "foo.example.com"
mock_load_children.reset_mock()
inode = INode(data, idict, parent=parent)
mock_load_children.assert_called_with(data, idict)
self.assertFalse(inode.predicate(metadata, entry))
def test_load_children(self):
data = lxml.etree.Element("Parent")
child1 = lxml.etree.SubElement(data, "Client", name="foo.example.com")
child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
idict = dict()
inode = self._get_inode(data, idict)
@patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
def inner(mock_init):
mock_init.return_value = None
inode._load_children(data, idict)
self.assertItemsEqual(mock_init.call_args_list,
[call(child1, idict, inode),
call(child2, idict, inode)])
self.assertEqual(idict, dict())
self.assertItemsEqual(inode.contents, dict())
inner()
data = lxml.etree.Element("Parent")
child1 = lxml.etree.SubElement(data, "Data", name="child1",
attr="some attr")
child1.text = "text"
subchild1 = lxml.etree.SubElement(child1, "SubChild", name="subchild")
child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
idict = dict()
inode = self._get_inode(data, idict)
@patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
def inner2(mock_init):
mock_init.return_value = None
inode._load_children(data, idict)
mock_init.assert_called_with(child2, idict, inode)
tag = child1.tag
name = child1.get("name")
self.assertEqual(idict, dict(Data=[name]))
self.assertIn(tag, inode.contents)
self.assertIn(name, inode.contents[tag])
self.assertItemsEqual(inode.contents[tag][name],
dict(name=name,
attr=child1.get('attr'),
__text__=child1.text,
__children__=[subchild1]))
inner2()
# test ignore. no ignore is set on INode by default, so we
# have to set one
old_ignore = copy.copy(self.test_obj.ignore)
self.test_obj.ignore.append("Data")
idict = dict()
inode = self._get_inode(data, idict)
@patch("Bcfg2.Server.Plugin.%s.__init__" % inode.__class__.__name__)
def inner3(mock_init):
mock_init.return_value = None
inode._load_children(data, idict)
mock_init.assert_called_with(child2, idict, inode)
self.assertEqual(idict, dict())
self.assertItemsEqual(inode.contents, dict())
inner3()
self.test_obj.ignore = old_ignore
def test_Match(self):
idata = lxml.etree.Element("Parent")
contents = lxml.etree.SubElement(idata, "Data", name="contents",
attr="some attr")
child = lxml.etree.SubElement(idata, "Group", name="bar", negate="true")
inode = INode(idata, dict())
inode.predicate = Mock()
inode.predicate.return_value = False
metadata = Mock()
metadata.groups = ['foo']
data = dict()
entry = child
inode.Match(metadata, data, entry=child)
self.assertEqual(data, dict())
inode.predicate.assert_called_with(metadata, child)
inode.predicate.reset_mock()
inode.Match(metadata, data)
self.assertEqual(data, dict())
# can't easily compare XML args without the original
# object, and we're testing that Match() works without an
# XML object passed in, so...
self.assertEqual(inode.predicate.call_args[0][0],
metadata)
self.assertXMLEqual(inode.predicate.call_args[0][1],
lxml.etree.Element("None"))
inode.predicate.reset_mock()
inode.predicate.return_value = True
inode.Match(metadata, data, entry=child)
self.assertEqual(data, inode.contents)
inode.predicate.assert_called_with(metadata, child)
class TestInfoNode(TestINode):
__test__ = True
test_obj = InfoNode
def test_raw_predicates(self):
TestINode.test_raw_predicates(self)
metadata = Mock()
entry = lxml.etree.Element("Path", name="/tmp/foo",
realname="/tmp/bar")
parent_predicate = lambda m, d: True
pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
dict(predicate=parent_predicate))
self.assertTrue(pred(metadata, entry))
pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
dict(predicate=parent_predicate))
self.assertTrue(pred(metadata, entry))
pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
dict(predicate=parent_predicate))
self.assertTrue(pred(metadata, entry))
parent_predicate = lambda m, d: False
pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
dict(predicate=parent_predicate))
self.assertFalse(pred(metadata, entry))
class TestXMLSrc(TestXMLFileBacked):
test_obj = XMLSrc
def test_node_interface(self):
# ensure that the node object has the necessary interface
self.assertTrue(hasattr(self.test_obj.__node__, "Match"))
@patch("%s.open" % builtins)
def test_HandleEvent(self, mock_open):
xdata = lxml.etree.Element("Test")
lxml.etree.SubElement(xdata, "Path", name="path", attr="whatever")
xsrc = self.get_obj("/test/foo.xml")
xsrc.__node__ = Mock()
mock_open.return_value.read.return_value = lxml.etree.tostring(xdata)
self.assertRaises(PluginExecutionError,
xsrc.HandleEvent, Mock())
xdata.set("priority", "cow")
mock_open.return_value.read.return_value = lxml.etree.tostring(xdata)
self.assertRaises(PluginExecutionError,
xsrc.HandleEvent, Mock())
xdata.set("priority", "10")
mock_open.return_value.read.return_value = lxml.etree.tostring(xdata)
mock_open.reset_mock()
xsrc = self.get_obj("/test/foo.xml")
xsrc.__node__ = Mock()
xsrc.HandleEvent(Mock())
mock_open.assert_called_with("/test/foo.xml")
mock_open.return_value.read.assert_any_call()
self.assertXMLEqual(xsrc.__node__.call_args[0][0], xdata)
self.assertEqual(xsrc.__node__.call_args[0][1], dict())
self.assertEqual(xsrc.pnode, xsrc.__node__.return_value)
self.assertEqual(xsrc.cache, None)
@patch("Bcfg2.Server.Plugin.XMLSrc.HandleEvent")
def test_Cache(self, mock_HandleEvent):
xsrc = self.get_obj("/test/foo.xml")
metadata = Mock()
xsrc.Cache(metadata)
mock_HandleEvent.assert_any_call()
xsrc.pnode = Mock()
xsrc.Cache(metadata)
xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
self.assertEqual(xsrc.cache[0], metadata)
xsrc.pnode.reset_mock()
xsrc.Cache(metadata)
self.assertFalse(xsrc.pnode.Mock.called)
self.assertEqual(xsrc.cache[0], metadata)
xsrc.cache = ("bogus")
xsrc.Cache(metadata)
xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
self.assertEqual(xsrc.cache[0], metadata)
class TestInfoXML(TestXMLSrc):
test_obj = InfoXML
class TestXMLDirectoryBacked(TestDirectoryBacked):
test_obj = XMLDirectoryBacked
class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
test_obj = PrioDir
@patch("Bcfg2.Server.Plugin.%s.add_directory_monitor" % test_obj.__name__,
Mock())
def get_obj(self, core=None):
if core is None:
core = Mock()
return self.test_obj(core, datastore)
def test_HandleEvent(self):
TestXMLDirectoryBacked.test_HandleEvent(self)
@patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent", Mock())
def inner():
pd = self.get_obj()
test1 = Mock()
test1.items = dict(Path=["/etc/foo.conf", "/etc/bar.conf"])
test2 = Mock()
test2.items = dict(Path=["/etc/baz.conf"],
Package=["quux", "xyzzy"])
pd.entries = {"/test1.xml": test1,
"/test2.xml": test2}
pd.HandleEvent(Mock())
self.assertItemsEqual(pd.Entries,
dict(Path={"/etc/foo.conf": pd.BindEntry,
"/etc/bar.conf": pd.BindEntry,
"/etc/baz.conf": pd.BindEntry},
Package={"quux": pd.BindEntry,
"xyzzy": pd.BindEntry}))
inner()
def test__matches(self):
pd = self.get_obj()
self.assertTrue(pd._matches(lxml.etree.Element("Test",
name="/etc/foo.conf"),
Mock(),
{"/etc/foo.conf": pd.BindEntry,
"/etc/bar.conf": pd.BindEntry}))
self.assertFalse(pd._matches(lxml.etree.Element("Test",
name="/etc/baz.conf"),
Mock(),
{"/etc/foo.conf": pd.BindEntry,
"/etc/bar.conf": pd.BindEntry}))
def test_BindEntry(self):
pd = self.get_obj()
pd.get_attrs = Mock(return_value=dict(test1="test1", test2="test2"))
entry = lxml.etree.Element("Path", name="/etc/foo.conf", test1="bogus")
metadata = Mock()
pd.BindEntry(entry, metadata)
pd.get_attrs.assert_called_with(entry, metadata)
self.assertItemsEqual(entry.attrib,
dict(name="/etc/foo.conf",
test1="test1", test2="test2"))
def test_get_attrs(self):
pd = self.get_obj()
entry = lxml.etree.Element("Path", name="/etc/foo.conf")
children = [lxml.etree.Element("Child")]
metadata = Mock()
pd.entries = dict()
def reset():
metadata.reset_mock()
for src in pd.entries.values():
src.reset_mock()
src.cache = None
# test with no matches
self.assertRaises(PluginExecutionError,
pd.get_attrs, entry, metadata)
def add_entry(name, data, prio=10):
path = os.path.join(pd.data, name)
pd.entries[path] = Mock()
pd.entries[path].priority = prio
def do_Cache(metadata):
pd.entries[path].cache = (metadata, data)
pd.entries[path].Cache.side_effect = do_Cache
add_entry('test1.xml',
dict(Path={'/etc/foo.conf': dict(attr="attr1",
__children__=children),
'/etc/bar.conf': dict()}))
add_entry('test2.xml',
dict(Path={'/etc/bar.conf': dict(__text__="text",
attr="attr1")},
Package={'quux': dict(),
'xyzzy': dict()}),
prio=20)
add_entry('test3.xml',
dict(Path={'/etc/baz.conf': dict()},
Package={'xyzzy': dict()}),
prio=20)
# test with exactly one match, __children__
reset()
self.assertItemsEqual(pd.get_attrs(entry, metadata),
dict(attr="attr1"))
for src in pd.entries.values():
src.Cache.assert_called_with(metadata)
self.assertEqual(len(entry.getchildren()), 1)
self.assertXMLEqual(entry.getchildren()[0], children[0])
# test with multiple matches with different priorities, __text__
reset()
entry = lxml.etree.Element("Path", name="/etc/bar.conf")
self.assertItemsEqual(pd.get_attrs(entry, metadata),
dict(attr="attr1"))
for src in pd.entries.values():
src.Cache.assert_called_with(metadata)
self.assertEqual(entry.text, "text")
# test with multiple matches with identical priorities
reset()
entry = lxml.etree.Element("Package", name="xyzzy")
self.assertRaises(PluginExecutionError,
pd.get_attrs, entry, metadata)
class TestSpecificityError(Bcfg2TestCase):
""" placeholder for future tests """
pass
class TestSpecificity(Bcfg2TestCase):
test_obj = Specificity
def get_obj(self, **kwargs):
return self.test_obj(**kwargs)
def test_matches(self):
metadata = Mock()
metadata.hostname = "foo.example.com"
metadata.groups = ["group1", "group2"]
self.assertTrue(self.get_obj(all=True).matches(metadata))
self.assertTrue(self.get_obj(group="group1").matches(metadata))
self.assertTrue(self.get_obj(hostname="foo.example.com").matches(metadata))
self.assertFalse(self.get_obj().matches(metadata))
self.assertFalse(self.get_obj(group="group3").matches(metadata))
self.assertFalse(self.get_obj(hostname="bar.example.com").matches(metadata))
def test__cmp(self):
specs = [self.get_obj(all=True),
self.get_obj(group="group1", prio=10),
self.get_obj(group="group1", prio=20),
self.get_obj(hostname="foo.example.com")]
for i in range(len(specs)):
for j in range(len(specs)):
if i == j:
self.assertEqual(0, specs[i].__cmp__(specs[j]))
self.assertEqual(0, specs[j].__cmp__(specs[i]))
elif i > j:
self.assertEqual(-1, specs[i].__cmp__(specs[j]))
self.assertEqual(1, specs[j].__cmp__(specs[i]))
elif i < j:
self.assertEqual(1, specs[i].__cmp__(specs[j]))
self.assertEqual(-1, specs[j].__cmp__(specs[i]))
def test_cmp(self):
""" test __lt__/__gt__/__eq__ """
specs = [self.get_obj(all=True),
self.get_obj(group="group1", prio=10),
self.get_obj(group="group1", prio=20),
self.get_obj(hostname="foo.example.com")]
for i in range(len(specs)):
for j in range(len(specs)):
if i < j:
self.assertGreater(specs[i], specs[j])
self.assertLess(specs[j], specs[i])
self.assertGreaterEqual(specs[i], specs[j])
self.assertLessEqual(specs[j], specs[i])
elif i == j:
self.assertEqual(specs[i], specs[j])
self.assertEqual(specs[j], specs[i])
self.assertLessEqual(specs[i], specs[j])
self.assertGreaterEqual(specs[j], specs[i])
elif i > j:
self.assertLess(specs[i], specs[j])
self.assertGreater(specs[j], specs[i])
self.assertLessEqual(specs[i], specs[j])
self.assertGreaterEqual(specs[j], specs[i])
class TestSpecificData(Bcfg2TestCase):
test_obj = SpecificData
def get_obj(self, name="/test.txt", specific=None, encoding=None):
if specific is None:
specific = Mock()
return self.test_obj(name, specific, encoding)
@patch("%s.open" % builtins)
def test_handle_event(self, mock_open):
event = Mock()
event.code2str.return_value = 'deleted'
sd = self.get_obj()
sd.handle_event(event)
self.assertFalse(mock_open.called)
if hasattr(sd, 'data'):
self.assertIsNone(sd.data)
else:
self.assertFalse(hasattr(sd, 'data'))
event = Mock()
mock_open.return_value.read.return_value = "test"
sd.handle_event(event)
mock_open.assert_called_with("/test.txt")
mock_open.return_value.read.assert_any_call()
self.assertEqual(sd.data, "test")
class TestEntrySet(TestDebuggable):
test_obj = EntrySet
# filenames that should be matched successfully by the EntrySet
# 'specific' regex. these are filenames alone -- a specificity
# will be added to these
basenames = ["test", "test.py", "test with spaces.txt",
"test.multiple.dots.py", "test_underscores.and.dots",
"really_misleading.G10_test",
"name$with*regex(special){chars}",
"misleading.H_hostname.test.com"]
# filenames that do not match any of the basenames (or the
# basename regex, if applicable)
bogus_names = ["bogus"]
# filenames that should be ignored
ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx",
"test.txt.genshi_include", "test.G_foo.genshi_include"]
def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(),
encoding=None):
return self.test_obj(basename, path, entry_type, encoding)
def test__init(self):
for basename in self.basenames:
eset = self.get_obj(basename=basename)
self.assertIsInstance(eset.specific, re._pattern_type)
self.assertTrue(eset.specific.match(os.path.join(datastore,
basename)))
ppath = os.path.join(datastore, "Plugin", basename)
self.assertTrue(eset.specific.match(ppath))
self.assertTrue(eset.specific.match(ppath + ".G20_foo"))
self.assertTrue(eset.specific.match(ppath + ".G1_foo"))
self.assertTrue(eset.specific.match(ppath + ".G32768_foo"))
# a group named '_'
self.assertTrue(eset.specific.match(ppath + ".G10__"))
self.assertTrue(eset.specific.match(ppath + ".H_hostname"))
self.assertTrue(eset.specific.match(ppath + ".H_fqdn.subdomain.example.com"))
self.assertTrue(eset.specific.match(ppath + ".G20_group_with_underscores"))
self.assertFalse(eset.specific.match(ppath + ".G20_group with spaces"))
self.assertFalse(eset.specific.match(ppath + ".G_foo"))
self.assertFalse(eset.specific.match(ppath + ".G_"))
self.assertFalse(eset.specific.match(ppath + ".G20_"))
self.assertFalse(eset.specific.match(ppath + ".H_"))
for bogus in self.bogus_names:
self.assertFalse(eset.specific.match(os.path.join(datastore,
"Plugin",
bogus)))
for ignore in self.ignore:
self.assertTrue(eset.ignore.match(ignore))
self.assertFalse(eset.ignore.match(basename))
self.assertFalse(eset.ignore.match(basename + ".G20_foo"))
self.assertFalse(eset.ignore.match(basename + ".G1_foo"))
self.assertFalse(eset.ignore.match(basename + ".G32768_foo"))
self.assertFalse(eset.ignore.match(basename + ".G10__"))
self.assertFalse(eset.ignore.match(basename + ".H_hostname"))
self.assertFalse(eset.ignore.match(basename + ".H_fqdn.subdomain.example.com"))
self.assertFalse(eset.ignore.match(basename + ".G20_group_with_underscores"))
def test_get_matching(self):
items = {0: Mock(), 1: Mock(), 2: Mock(), 3: Mock(), 4: Mock(),
5: Mock()}
items[0].specific.matches.return_value = False
items[1].specific.matches.return_value = True
items[2].specific.matches.return_value = False
items[3].specific.matches.return_value = False
items[4].specific.matches.return_value = True
items[5].specific.matches.return_value = True
metadata = Mock()
eset = self.get_obj()
eset.entries = items
self.assertItemsEqual(eset.get_matching(metadata),
[items[1], items[4], items[5]])
for i in items.values():
i.specific.matches.assert_called_with(metadata)
@patch("Bcfg2.Server.Plugin.%s.get_matching" % test_obj.__name__)
def test_best_matching(self, mock_get_matching):
eset = self.get_obj()
metadata = Mock()
matching = []
def reset():
mock_get_matching.reset_mock()
metadata.reset_mock()
for m in matching:
m.reset_mock()
def specific(all=False, group=False, prio=None, hostname=False):
spec = Mock()
spec.specific = Specificity(all=all, group=group, prio=prio,
hostname=hostname)
return spec
self.assertRaises(PluginExecutionError,
eset.best_matching, metadata, matching=[])
reset()
mock_get_matching.return_value = matching
self.assertRaises(PluginExecutionError,
eset.best_matching, metadata)
mock_get_matching.assert_called_with(metadata)
# test with a single file for all
reset()
expected = specific(all=True)
matching.append(expected)
mock_get_matching.return_value = matching
self.assertEqual(eset.best_matching(metadata), expected)
mock_get_matching.assert_called_with(metadata)
# test with a single group-specific file
reset()
expected = specific(group=True, prio=10)
matching.append(expected)
mock_get_matching.return_value = matching
self.assertEqual(eset.best_matching(metadata), expected)
mock_get_matching.assert_called_with(metadata)
# test with multiple group-specific files
reset()
expected = specific(group=True, prio=20)
matching.append(expected)
mock_get_matching.return_value = matching
self.assertEqual(eset.best_matching(metadata), expected)
mock_get_matching.assert_called_with(metadata)
# test with host-specific file
reset()
expected = specific(hostname=True)
matching.append(expected)
mock_get_matching.return_value = matching
self.assertEqual(eset.best_matching(metadata), expected)
mock_get_matching.assert_called_with(metadata)
@patch("Bcfg2.Server.Plugin.%s.entry_init" % test_obj.__name__)
@patch("Bcfg2.Server.Plugin.%s.reset_metadata" % test_obj.__name__)
@patch("Bcfg2.Server.Plugin.%s.update_metadata" % test_obj.__name__)
def test_handle_event(self, mock_update_md, mock_reset_md, mock_init):
def reset():
mock_update_md.reset_mock()
mock_reset_md.reset_mock()
mock_init.reset_mock()
eset = self.get_obj()
for fname in ["info", "info.xml", ":info"]:
for evt in ["exists", "created", "changed"]:
reset()
event = Mock()
event.code2str.return_value = evt
event.filename = fname
eset.handle_event(event)
mock_update_md.assert_called_with(event)
self.assertFalse(mock_init.called)
self.assertFalse(mock_reset_md.called)
reset()
event = Mock()
event.code2str.return_value = "deleted"
event.filename = fname
eset.handle_event(event)
mock_reset_md.assert_called_with(event)
self.assertFalse(mock_init.called)
self.assertFalse(mock_update_md.called)
for evt in ["exists", "created", "changed"]:
reset()
event = Mock()
event.code2str.return_value = evt
event.filename = "test.txt"
eset.handle_event(event)
mock_init.assert_called_with(event)
self.assertFalse(mock_reset_md.called)
self.assertFalse(mock_update_md.called)
reset()
entry = Mock()
eset.entries["test.txt"] = entry
event = Mock()
event.code2str.return_value = "changed"
event.filename = "test.txt"
eset.handle_event(event)
entry.handle_event.assert_called_with(event)
self.assertFalse(mock_init.called)
self.assertFalse(mock_reset_md.called)
self.assertFalse(mock_update_md.called)
reset()
entry = Mock()
eset.entries["test.txt"] = entry
event = Mock()
event.code2str.return_value = "deleted"
event.filename = "test.txt"
eset.handle_event(event)
self.assertNotIn("test.txt", eset.entries)
@patch("Bcfg2.Server.Plugin.%s.specificity_from_filename" %
test_obj.__name__)
def test_entry_init(self, mock_spec):
eset = self.get_obj()
def reset():
eset.entry_type.reset_mock()
mock_spec.reset_mock()
event = Mock()
event.code2str.return_value = "created"
event.filename = "test.txt"
eset.entry_init(event)
mock_spec.assert_called_with("test.txt", specific=None)
eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"),
mock_spec.return_value, None)
eset.entry_type.return_value.handle_event.assert_called_with(event)
self.assertIn("test.txt", eset.entries)
# test duplicate add
reset()
eset.entry_init(event)
self.assertFalse(mock_spec.called)
self.assertFalse(eset.entry_type.called)
eset.entries["test.txt"].handle_event.assert_called_with(event)
# test keyword args
etype = Mock()
specific = Mock()
event = Mock()
event.code2str.return_value = "created"
event.filename = "test2.txt"
eset.entry_init(event, entry_type=etype, specific=specific)
mock_spec.assert_called_with("test2.txt", specific=specific)
etype.assert_called_with(os.path.join(eset.path, "test2.txt"),
mock_spec.return_value, None)
etype.return_value.handle_event.assert_called_with(event)
self.assertIn("test2.txt", eset.entries)
# test specificity error
event = Mock()
event.code2str.return_value = "created"
event.filename = "test3.txt"
mock_spec.side_effect = SpecificityError
eset.entry_init(event)
mock_spec.assert_called_with("test3.txt", specific=None)
self.assertFalse(eset.entry_type.called)
@patch("Bcfg2.Server.Plugin.Specificity")
def test_specificity_from_filename(self, mock_spec):
def test(eset, fname, **kwargs):
mock_spec.reset_mock()
if "specific" in kwargs:
specific = kwargs['specific']
del kwargs['specific']
else:
specific = None
self.assertEqual(eset.specificity_from_filename(fname,
specific=specific),
mock_spec.return_value)
mock_spec.assert_called_with(**kwargs)
def fails(eset, fname, specific=None):
mock_spec.reset_mock()
self.assertRaises(SpecificityError,
eset.specificity_from_filename, fname,
specific=specific)
for basename in self.basenames:
eset = self.get_obj(basename=basename)
ppath = os.path.join(datastore, "Plugin", basename)
test(eset, ppath, all=True)
test(eset, ppath + ".G20_foo", group="foo", prio=20)
test(eset, ppath + ".G1_foo", group="foo", prio=1)
test(eset, ppath + ".G32768_foo", group="foo", prio=32768)
test(eset, ppath + ".G10__", group="_", prio=10)
test(eset, ppath + ".H_hostname", hostname="hostname")
test(eset, ppath + ".H_fqdn.subdomain.example.com",
hostname="fqdn.subdomain.example.com")
test(eset, ppath + ".G20_group_with_underscores",
group="group_with_underscores", prio=20)
for bogus in self.bogus_names:
fails(eset, bogus)
fails(eset, ppath + ".G_group with spaces")
fails(eset, ppath + ".G_foo")
fails(eset, ppath + ".G_")
fails(eset, ppath + ".G20_")
fails(eset, ppath + ".H_")
@patch("%s.open" % builtins)
@patch("Bcfg2.Server.Plugin.InfoXML")
def test_update_metadata(self, mock_InfoXML, mock_open):
eset = self.get_obj()
# add info.xml
event = Mock()
event.filename = "info.xml"
eset.update_metadata(event)
mock_InfoXML.assert_called_with(os.path.join(eset.path, "info.xml"),
True)
mock_InfoXML.return_value.HandleEvent.assert_called_with(event)
self.assertEqual(eset.infoxml, mock_InfoXML.return_value)
# modify info.xml
mock_InfoXML.reset_mock()
eset.update_metadata(event)
self.assertFalse(mock_InfoXML.called)
eset.infoxml.HandleEvent.assert_called_with(event)
for fname in [':info', 'info']:
event = Mock()
event.filename = fname
idata = ["owner:owner",
"group: GROUP",
"perms: 775",
"important: true",
"bogus: line"]
mock_open.return_value.readlines.return_value = idata
eset.update_metadata(event)
expected = default_file_metadata.copy()
expected['owner'] = 'owner'
expected['group'] = 'GROUP'
expected['perms'] = '0775'
expected['important'] = 'true'
self.assertItemsEqual(eset.metadata,
expected)
def test_reset_metadata(self):
eset = self.get_obj()
# test info.xml
event = Mock()
event.filename = "info.xml"
eset.infoxml = Mock()
eset.reset_metadata(event)
self.assertIsNone(eset.infoxml)
for fname in [':info', 'info']:
event = Mock()
event.filename = fname
eset.metadata = Mock()
eset.reset_metadata(event)
self.assertItemsEqual(eset.metadata, default_file_metadata)
@patch("Bcfg2.Server.Plugin.bind_info")
def test_bind_info_to_entry(self, mock_bind_info):
eset = self.get_obj()
entry = Mock()
metadata = Mock()
eset.bind_info_to_entry(entry, metadata)
mock_bind_info.assert_called_with(entry, metadata,
infoxml=eset.infoxml,
default=eset.metadata)
@patch("Bcfg2.Server.Plugin.%s.best_matching" % test_obj.__name__)
@patch("Bcfg2.Server.Plugin.%s.bind_info_to_entry" % test_obj.__name__)
def test_bind_entry(self, mock_bind_info, mock_best_matching):
eset = self.get_obj()
entry = Mock()
metadata = Mock()
eset.bind_entry(entry, metadata)
mock_bind_info.assert_called_with(entry, metadata)
mock_best_matching.assert_called_with(metadata)
mock_best_matching.return_value.bind_entry.assert_called_with(entry,
metadata)
class TestGroupSpool(TestPlugin, TestGenerator):
test_obj = GroupSpool
@patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
def get_obj(self, core=None):
return TestPlugin.get_obj(self, core=core)
@patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
def test__init(self, mock_Add):
core = Mock()
gs = self.test_obj(core, datastore)
mock_Add.assert_called_with('')
self.assertItemsEqual(gs.Entries, {gs.entry_type: {}})
@patch("os.path.isdir")
@patch("os.path.isfile")
@patch("Bcfg2.Server.Plugin.%s.event_id" % test_obj.__name__)
@patch("Bcfg2.Server.Plugin.%s.event_path" % test_obj.__name__)
@patch("Bcfg2.Server.Plugin.%s.AddDirectoryMonitor" % test_obj.__name__)
def test_add_entry(self, mock_Add, mock_event_path, mock_event_id,
mock_isfile, mock_isdir):
gs = self.get_obj()
gs.es_cls = Mock()
gs.es_child_cls = Mock()
def reset():
gs.es_cls.reset_mock()
gs.es_child_cls.reset_mock()
mock_Add.reset_mock()
mock_event_path.reset_mock()
mock_event_id.reset_mock()
mock_isfile.reset_mock()
mock_isdir.reset_mock()
# directory
event = Mock()
event.filename = "foo"
basedir = "test"
epath = os.path.join(gs.data, basedir, event.filename)
ident = os.path.join(basedir, event.filename)
mock_event_path.return_value = epath
mock_event_id.return_value = ident
mock_isdir.return_value = True
mock_isfile.return_value = False
gs.add_entry(event)
mock_Add.assert_called_with(os.path.join("/" + basedir, event.filename))
self.assertNotIn(ident, gs.entries)
mock_isdir.assert_called_with(epath)
# file that is not in self.entries
reset()
event = Mock()
event.filename = "foo"
basedir = "test/foo/"
epath = os.path.join(gs.data, basedir, event.filename)
ident = basedir[:-1]
mock_event_path.return_value = epath
mock_event_id.return_value = ident
mock_isdir.return_value = False
mock_isfile.return_value = True
gs.add_entry(event)
self.assertFalse(mock_Add.called)
gs.es_cls.assert_called_with(gs.filename_pattern,
gs.data + ident,
gs.es_child_cls,
gs.encoding)
self.assertIn(ident, gs.entries)
self.assertEqual(gs.entries[ident], gs.es_cls.return_value)
self.assertIn(ident, gs.Entries[gs.entry_type])
self.assertEqual(gs.Entries[gs.entry_type][ident],
gs.es_cls.return_value.bind_entry)
gs.entries[ident].handle_event.assert_called_with(event)
mock_isfile.assert_called_with(epath)
# file that is in self.entries
reset()
gs.add_entry(event)
self.assertFalse(mock_Add.called)
self.assertFalse(gs.es_cls.called)
gs.entries[ident].handle_event.assert_called_with(event)
def test_event_path(self):
gs = self.get_obj()
gs.handles[1] = "/var/lib/foo/"
gs.handles[2] = "/etc/foo/"
gs.handles[3] = "/usr/share/foo/"
event = Mock()
event.filename = "foo"
for i in range(1, 4):
event.requestID = i
self.assertEqual(gs.event_path(event),
os.path.join(datastore, gs.name,
gs.handles[event.requestID].lstrip('/'),
event.filename))
@patch("os.path.isdir")
@patch("Bcfg2.Server.Plugin.%s.event_path" % test_obj.__name__)
def test_event_id(self, mock_event_path, mock_isdir):
gs = self.get_obj()
def reset():
mock_event_path.reset_mock()
mock_isdir.reset_mock()
gs.handles[1] = "/var/lib/foo/"
gs.handles[2] = "/etc/foo/"
gs.handles[3] = "/usr/share/foo/"
event = Mock()
event.filename = "foo"
for i in range(1, 4):
event.requestID = i
reset()
mock_isdir.return_value = True
self.assertEqual(gs.event_id(event),
os.path.join(gs.handles[event.requestID].lstrip('/'),
event.filename))
mock_isdir.assert_called_with(mock_event_path.return_value)
reset()
mock_isdir.return_value = False
self.assertEqual(gs.event_id(event),
gs.handles[event.requestID].rstrip('/'))
mock_isdir.assert_called_with(mock_event_path.return_value)
def test_toggle_debug(self):
gs = self.get_obj()
gs.entries = {"/foo": Mock(),
"/bar": Mock(),
"/baz/quux": Mock()}
@patch("Bcfg2.Server.Plugin.Plugin.toggle_debug")
def inner(mock_debug):
gs.toggle_debug()
mock_debug.assert_called_with(gs)
for entry in gs.entries.values():
entry.toggle_debug.assert_any_call()
inner()
TestPlugin.test_toggle_debug(self)
@patch("Bcfg2.Server.Plugin.%s.event_id" % test_obj.__name__)
@patch("Bcfg2.Server.Plugin.%s.add_entry" % test_obj.__name__)
def test_HandleEvent(self, mock_add_entry, mock_event_id):
gs = self.get_obj()
gs.entries = {"/foo": Mock(),
"/bar": Mock(),
"/baz": Mock(),
"/baz/quux": Mock()}
for path in gs.entries.keys():
gs.Entries[gs.entry_type] = {path: Mock()}
gs.handles = {1: "/foo/",
2: "/bar/",
3: "/baz/",
4: "/baz/quux"}
def reset():
mock_add_entry.reset_mock()
mock_event_id.reset_mock()
for entry in gs.entries.values():
entry.reset_mock()
# test event creation, changing entry that doesn't exist
for evt in ["exists", "created", "changed"]:
reset()
event = Mock()
event.filename = "foo"
event.code2str.return_value = evt
gs.HandleEvent(event)
mock_event_id.assert_called_with(event)
mock_add_entry.assert_called_with(event)
# test deleting entry, changing entry that does exist
for evt in ["changed", "deleted"]:
reset()
event = Mock()
event.filename = "quux"
event.requestID = 4
event.code2str.return_value = evt
mock_event_id.return_value = "/baz/quux"
gs.HandleEvent(event)
mock_event_id.assert_called_with(event)
self.assertIn(mock_event_id.return_value, gs.entries)
gs.entries[mock_event_id.return_value].handle_event.assert_called_with(event)
self.assertFalse(mock_add_entry.called)
# test deleting directory
reset()
event = Mock()
event.filename = "quux"
event.requestID = 3
event.code2str.return_value = "deleted"
mock_event_id.return_value = "/baz/quux"
gs.HandleEvent(event)
mock_event_id.assert_called_with(event)
self.assertNotIn("/baz/quux", gs.entries)
self.assertNotIn("/baz/quux", gs.Entries[gs.entry_type])