import os
import copy
import time
import socket
import unittest
import lxml.etree
from mock import Mock, patch
import Bcfg2.Server.Plugin
from Bcfg2.Server.Plugins.Metadata import *
XI_NAMESPACE = "http://www.w3.org/2001/XInclude"
XI = "{%s}" % XI_NAMESPACE
clients_test_tree = lxml.etree.XML('''
''').getroottree()
groups_test_tree = lxml.etree.XML('''
''').getroottree()
datastore = "/"
def get_metadata_object(core=None, watch_clients=False):
if core is None:
core = Mock()
metadata = Metadata(core, datastore, watch_clients=watch_clients)
#metadata.logger = Mock()
return metadata
class TestXMLMetadataConfig(unittest.TestCase):
def get_config_object(self, basefile="clients.xml", core=None,
watch_clients=False):
self.metadata = get_metadata_object(core=core,
watch_clients=watch_clients)
return XMLMetadataConfig(self.metadata, watch_clients, basefile)
def test_xdata(self):
config = self.get_config_object()
# we can't use assertRaises here because xdata is a property
try:
config.xdata
except MetadataRuntimeError:
pass
except:
assert False
config.data = ""
self.assertEqual(config.xdata, "")
def test_base_xdata(self):
config = self.get_config_object()
# we can't use assertRaises here because base_xdata is a property
try:
config.base_xdata
except MetadataRuntimeError:
pass
except:
assert False
config.basedata = ""
self.assertEqual(config.base_xdata, "")
def test_add_monitor(self):
core = Mock()
config = self.get_config_object(core=core)
fname = "test.xml"
fpath = os.path.join(self.metadata.data, "test.xml")
config.extras = []
config.add_monitor(fpath, fname)
self.assertFalse(core.fam.AddMonitor.called)
self.assertEqual(config.extras, [])
config = self.get_config_object(core=core, watch_clients=True)
config.add_monitor(fpath, fname)
core.fam.AddMonitor.assert_called_with(fpath, self.metadata)
self.assertItemsEqual(config.extras, [fname])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.add_monitor")
@patch("lxml.etree.parse")
def test_load_xml(self, mock_parse, mock_add_monitor):
config = self.get_config_object("clients.xml")
mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None,
None)
config.load_xml()
self.assertIsNone(config.data)
self.assertIsNone(config.basedata)
config.data = None
config.basedata = None
mock_parse.side_effect = None
mock_parse.return_value.findall = Mock(return_value=[])
config.load_xml()
self.assertIsNotNone(config.data)
self.assertIsNotNone(config.basedata)
config.data = None
config.basedata = None
def side_effect(*args):
def second_call(*args):
return []
mock_parse.return_value.findall.side_effect = second_call
return [lxml.etree.Element(XI + "include", href="more.xml"),
lxml.etree.Element(XI + "include", href="evenmore.xml")]
mock_parse.return_value.findall = Mock(side_effect=side_effect)
config.load_xml()
mock_add_monitor.assert_any_call("more.xml")
mock_add_monitor.assert_any_call("evenmore.xml")
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml")
def test_write(self, mock_write_xml):
config = self.get_config_object("clients.xml")
config.basedata = ""
config.write()
mock_write_xml.assert_called_with(os.path.join(self.metadata.data,
"clients.xml"),
"")
@patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False))
@patch('fcntl.lockf', Mock())
@patch('__builtin__.open')
@patch('os.unlink')
@patch('os.rename')
@patch('os.path.islink')
@patch('os.readlink')
def test_write_xml(self, mock_readlink, mock_islink, mock_rename,
mock_unlink, mock_open):
fname = "clients.xml"
config = self.get_config_object(fname)
fpath = os.path.join(self.metadata.data, fname)
tmpfile = "%s.new" % fpath
linkdest = os.path.join(self.metadata.data, "client-link.xml")
mock_islink.return_value = False
config.write_xml(fpath, clients_test_tree)
mock_open.assert_called_with(tmpfile, "w")
self.assertTrue(mock_open.return_value.write.called)
mock_islink.assert_called_with(fpath)
mock_rename.assert_called_with(tmpfile, fpath)
mock_islink.return_value = True
mock_readlink.return_value = linkdest
config.write_xml(fpath, clients_test_tree)
mock_rename.assert_called_with(tmpfile, linkdest)
mock_rename.side_effect = OSError
self.assertRaises(MetadataRuntimeError,
config.write_xml, fpath, clients_test_tree)
mock_open.return_value.write.side_effect = IOError
self.assertRaises(MetadataRuntimeError,
config.write_xml, fpath, clients_test_tree)
mock_unlink.assert_called_with(tmpfile)
mock_open.side_effect = IOError
self.assertRaises(MetadataRuntimeError,
config.write_xml, fpath, clients_test_tree)
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch('lxml.etree.parse')
def test_find_xml_for_xpath(self, mock_parse):
config = self.get_config_object("groups.xml")
config.basedata = groups_test_tree
xpath = "//Group[@name='group1']"
self.assertItemsEqual(config.find_xml_for_xpath(xpath),
dict(filename=os.path.join(self.metadata.data,
"groups.xml"),
xmltree=groups_test_tree,
xquery=groups_test_tree.xpath(xpath)))
self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict())
config.extras = ["foo.xml", "bar.xml", "clients.xml"]
def parse_side_effect(fname):
if fname == os.path.join(self.metadata.data, "clients.xml"):
return clients_test_tree
else:
return lxml.etree.XML("").getroottree()
mock_parse.side_effect = parse_side_effect
xpath = "//Client[@secure='true']"
self.assertItemsEqual(config.find_xml_for_xpath(xpath),
dict(filename=os.path.join(self.metadata.data,
"clients.xml"),
xmltree=clients_test_tree,
xquery=clients_test_tree.xpath(xpath)))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
def test_HandleEvent(self, mock_load_xml):
config = self.get_config_object("groups.xml")
evt = Mock()
evt.filename = os.path.join(self.metadata.data, "groups.xml")
evt.code2str = Mock(return_value="changed")
self.assertTrue(config.HandleEvent(evt))
mock_load_xml.assert_called_with()
class TestClientMetadata(unittest.TestCase):
def test_inGroup(self):
cm = ClientMetadata("client1", "group1", ["group1", "group2"],
["bundle1"], [], [], [], None, None, None)
self.assertTrue(cm.inGroup("group1"))
self.assertFalse(cm.inGroup("group3"))
class TestMetadata(unittest.TestCase):
def test__init_no_fam(self):
# test with watch_clients=False
core = Mock()
metadata = get_metadata_object(core=core)
self.check_metadata_object(metadata)
self.assertEqual(metadata.states, dict())
def test__init_with_fam(self):
# test with watch_clients=True
core = Mock()
core.fam = Mock()
metadata = get_metadata_object(core=core, watch_clients=True)
self.assertEqual(len(metadata.states), 2)
core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
"groups.xml"),
metadata)
core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
"clients.xml"),
metadata)
core.fam.reset_mock()
core.fam.AddMonitor = Mock(side_effect=IOError)
self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
get_metadata_object,
core=core, watch_clients=True)
def check_metadata_object(self, metadata):
self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics)
self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
self.assertIsInstance(metadata.query, MetadataQuery)
@patch('os.makedirs', Mock())
@patch('__builtin__.open')
def test_init_repo(self, mock_open):
groups = "groups %s"
os_selection = "os"
clients = "clients %s"
Metadata.init_repo(datastore, groups, os_selection, clients)
mock_open.assert_any_call(os.path.join(datastore, "Metadata",
"groups.xml"), "w")
mock_open.assert_any_call(os.path.join(datastore, "Metadata",
"clients.xml"), "w")
@patch('lxml.etree.parse')
def test_get_groups(self, mock_parse):
metadata = get_metadata_object()
mock_parse.return_value = groups_test_tree
groups = metadata.get_groups()
mock_parse.assert_called_with(os.path.join(datastore, "Metadata",
"groups.xml"))
self.assertIsInstance(groups, lxml.etree._Element)
def test_search_xdata_name(self):
# test finding a node with the proper name
metadata = get_metadata_object()
tree = groups_test_tree
res = metadata._search_xdata("Group", "group1", tree)
self.assertIsInstance(res, lxml.etree._Element)
self.assertEqual(res.get("name"), "group1")
def test_search_xdata_alias(self):
# test finding a node with the wrong name but correct alias
metadata = get_metadata_object()
tree = clients_test_tree
res = metadata._search_xdata("Client", "alias3", tree, alias=True)
self.assertIsInstance(res, lxml.etree._Element)
self.assertNotEqual(res.get("name"), "alias3")
def test_search_xdata_not_found(self):
# test failure finding a node
metadata = get_metadata_object()
tree = clients_test_tree
res = metadata._search_xdata("Client", "bogus_client", tree, alias=True)
self.assertIsNone(res)
def search_xdata(self, tag, name, tree, alias=False):
metadata = get_metadata_object()
res = metadata._search_xdata(tag, name, tree, alias=alias)
self.assertIsInstance(res, lxml.etree._Element)
if not alias:
self.assertEqual(res.get("name"), name)
def test_search_group(self):
# test finding a group with the proper name
tree = groups_test_tree
self.search_xdata("Group", "group1", tree)
def test_search_bundle(self):
# test finding a bundle with the proper name
tree = groups_test_tree
self.search_xdata("Bundle", "bundle1", tree)
def test_search_client(self):
# test finding a client with the proper name
tree = clients_test_tree
self.search_xdata("Client", "client1", tree, alias=True)
self.search_xdata("Client", "alias1", tree, alias=True)
def test_add_group(self):
metadata = get_metadata_object()
metadata.groups_xml.write = Mock()
metadata.groups_xml.data = lxml.etree.XML('').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.add_group("test1", dict())
metadata.groups_xml.write.assert_any_call()
grp = metadata.search_group("test1", metadata.groups_xml.base_xdata)
self.assertIsNotNone(grp)
self.assertEqual(grp.attrib, dict(name='test1'))
# have to call this explicitly -- usually load_xml does this
# on FAM events
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.add_group("test2", dict(foo='bar'))
metadata.groups_xml.write.assert_any_call()
grp = metadata.search_group("test2", metadata.groups_xml.base_xdata)
self.assertIsNotNone(grp)
self.assertEqual(grp.attrib, dict(name='test2', foo='bar'))
# have to call this explicitly -- usually load_xml does this
# on FAM events
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.groups_xml.write.reset_mock()
self.assertRaises(MetadataConsistencyError,
metadata.add_group,
"test1", dict())
self.assertFalse(metadata.groups_xml.write.called)
def test_update_group(self):
metadata = get_metadata_object()
metadata.groups_xml.write_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.update_group("group1", dict(foo="bar"))
grp = metadata.search_group("group1", metadata.groups_xml.base_xdata)
self.assertIsNotNone(grp)
self.assertIn("foo", grp.attrib)
self.assertEqual(grp.get("foo"), "bar")
self.assertTrue(metadata.groups_xml.write_xml.called)
self.assertRaises(MetadataConsistencyError,
metadata.update_group,
"bogus_group", dict())
def test_remove_group(self):
metadata = get_metadata_object()
metadata.groups_xml.write_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.remove_group("group5")
grp = metadata.search_group("group5", metadata.groups_xml.base_xdata)
self.assertIsNone(grp)
self.assertTrue(metadata.groups_xml.write_xml.called)
self.assertRaises(MetadataConsistencyError,
metadata.remove_group,
"bogus_group")
def test_add_bundle(self):
metadata = get_metadata_object()
metadata.groups_xml.write = Mock()
metadata.groups_xml.data = lxml.etree.XML('').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.add_bundle("bundle1")
metadata.groups_xml.write.assert_any_call()
bundle = metadata.search_bundle("bundle1",
metadata.groups_xml.base_xdata)
self.assertIsNotNone(bundle)
self.assertEqual(bundle.attrib, dict(name='bundle1'))
# have to call this explicitly -- usually load_xml does this
# on FAM events
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.groups_xml.write.reset_mock()
self.assertRaises(MetadataConsistencyError,
metadata.add_bundle,
"bundle1")
self.assertFalse(metadata.groups_xml.write.called)
def test_remove_bundle(self):
metadata = get_metadata_object()
metadata.groups_xml.write_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.remove_bundle("bundle1")
grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata)
self.assertIsNone(grp)
self.assertTrue(metadata.groups_xml.write_xml.called)
self.assertRaises(MetadataConsistencyError,
metadata.remove_bundle,
"bogus_bundle")
def test_add_client(self):
metadata = get_metadata_object()
metadata.clients_xml.write = Mock()
metadata.clients_xml.data = lxml.etree.XML('').getroottree()
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
metadata.add_client("test1", dict())
metadata.clients_xml.write.assert_any_call()
grp = metadata.search_client("test1", metadata.clients_xml.base_xdata)
self.assertIsNotNone(grp)
self.assertEqual(grp.attrib, dict(name='test1'))
# have to call this explicitly -- usually load_xml does this
# on FAM events
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
metadata.add_client("test2", dict(foo='bar'))
metadata.clients_xml.write.assert_any_call()
grp = metadata.search_client("test2", metadata.clients_xml.base_xdata)
self.assertIsNotNone(grp)
self.assertEqual(grp.attrib, dict(name='test2', foo='bar'))
# have to call this explicitly -- usually load_xml does this
# on FAM events
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
metadata.clients_xml.write.reset_mock()
self.assertRaises(MetadataConsistencyError,
metadata.add_client,
"test1", dict())
self.assertFalse(metadata.clients_xml.write.called)
def test_update_client(self):
metadata = get_metadata_object()
metadata.clients_xml.write_xml = Mock()
metadata.clients_xml.data = copy.deepcopy(clients_test_tree)
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
metadata.update_client("client1", dict(foo="bar"))
grp = metadata.search_client("client1", metadata.clients_xml.base_xdata)
self.assertIsNotNone(grp)
self.assertIn("foo", grp.attrib)
self.assertEqual(grp.get("foo"), "bar")
self.assertTrue(metadata.clients_xml.write_xml.called)
self.assertRaises(MetadataConsistencyError,
metadata.update_client,
"bogus_client", dict())
def load_clients_data(self, metadata=None, xdata=None):
if metadata is None:
metadata = get_metadata_object()
metadata.clients_xml.data = xdata or copy.deepcopy(clients_test_tree)
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
evt = Mock()
evt.filename = os.path.join(datastore, "Metadata", "clients.xml")
evt.code2str = Mock(return_value="changed")
metadata.HandleEvent(evt)
return metadata
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
def test_clients_xml_event(self, mock_load_xml):
metadata = get_metadata_object()
metadata.profiles = ["group1", "group2"]
self.load_clients_data(metadata=metadata)
mock_load_xml.assert_any_call()
self.assertItemsEqual(metadata.clients,
dict([(c.get("name"), c.get("profile"))
for c in clients_test_tree.findall("//Client")]))
aliases = dict([(a.get("name"), a.getparent().get("name"))
for a in clients_test_tree.findall("//Alias")])
self.assertItemsEqual(metadata.aliases, aliases)
raliases = dict([(c.get("name"), set())
for c in clients_test_tree.findall("//Client")])
for alias in clients_test_tree.findall("//Alias"):
raliases[alias.getparent().get("name")].add(alias.get("name"))
self.assertItemsEqual(metadata.raliases, raliases)
self.assertEqual(metadata.bad_clients, dict())
self.assertEqual(metadata.secure,
[c.get("name")
for c in clients_test_tree.findall("//Client[@secure='true']")])
self.assertEqual(metadata.floating, ["client1"])
addresses = dict([(c.get("address"), [])
for c in clients_test_tree.findall("//*[@address]")])
raddresses = dict()
for client in clients_test_tree.findall("//Client[@address]"):
addresses[client.get("address")].append(client.get("name"))
try:
raddresses[client.get("name")].append(client.get("address"))
except KeyError:
raddresses[client.get("name")] = [client.get("address")]
for alias in clients_test_tree.findall("//Alias[@address]"):
addresses[alias.get("address")].append(alias.getparent().get("name"))
try:
raddresses[alias.getparent().get("name")].append(alias.get("address"))
except KeyError:
raddresses[alias.getparent().get("name")] = alias.get("address")
self.assertItemsEqual(metadata.addresses, addresses)
self.assertItemsEqual(metadata.raddresses, raddresses)
self.assertTrue(metadata.states['clients.xml'])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_clients_xml_event_bad_clients(self):
metadata = get_metadata_object()
metadata.profiles = ["group2"]
self.load_clients_data(metadata=metadata)
clients = dict()
badclients = dict()
for client in clients_test_tree.findall("//Client"):
if client.get("profile") in metadata.profiles:
clients[client.get("name")] = client.get("profile")
else:
badclients[client.get("name")] = client.get("profile")
self.assertItemsEqual(metadata.clients, clients)
self.assertItemsEqual(metadata.bad_clients, badclients)
def load_groups_data(self, metadata=None, xdata=None):
if metadata is None:
metadata = get_metadata_object()
metadata.groups_xml.data = xdata or copy.deepcopy(groups_test_tree)
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
evt = Mock()
evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
evt.code2str = Mock(return_value="changed")
metadata.HandleEvent(evt)
return metadata
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
def test_groups_xml_event(self, mock_load_xml):
dup_data = copy.deepcopy(groups_test_tree)
lxml.etree.SubElement(dup_data.getroot(),
"Group", name="group1")
metadata = self.load_groups_data(xdata=dup_data)
mock_load_xml.assert_any_call()
self.assertEqual(metadata.public, ["group1", "group2"])
self.assertEqual(metadata.private, ["group3"])
self.assertEqual(metadata.profiles, ["group1", "group2"])
self.assertItemsEqual(metadata.groups.keys(),
[g.get("name")
for g in groups_test_tree.findall("/Group")])
self.assertEqual(metadata.categories,
dict(group1="category1",
group2="category1",
group3="category2",
group4="category1"))
self.assertEqual(metadata.default, "group1")
self.assertTrue(metadata.states['groups.xml'])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client")
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
def test_set_profile(self, mock_update_client, mock_add_client):
metadata = get_metadata_object()
metadata.states['clients.xml'] = False
self.assertRaises(MetadataRuntimeError,
metadata.set_profile,
None, None, None)
self.load_groups_data(metadata=metadata)
self.load_clients_data(metadata=metadata)
self.assertRaises(MetadataConsistencyError,
metadata.set_profile,
"client1", "group5", None)
metadata.clients_xml.write = Mock()
metadata.set_profile("client1", "group2", None)
mock_update_client.assert_called_with("client1", dict(profile="group2"))
metadata.clients_xml.write.assert_any_call()
self.assertEqual(metadata.clients["client1"], "group2")
metadata.clients_xml.write.reset_mock()
metadata.set_profile("client_new", "group1", None)
mock_add_client.assert_called_with("client_new", dict(profile="group1"))
metadata.clients_xml.write.assert_any_call()
self.assertEqual(metadata.clients["client_new"], "group1")
metadata.session_cache[('1.2.3.6', None)] = (None, 'client_new2')
metadata.clients_xml.write.reset_mock()
metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None))
mock_add_client.assert_called_with("client_new2",
dict(uuid='uuid_new',
profile="group1",
address='1.2.3.6'))
metadata.clients_xml.write.assert_any_call()
self.assertEqual(metadata.clients["uuid_new"], "group1")
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("socket.gethostbyaddr")
def test_resolve_client(self, mock_gethostbyaddr):
metadata = self.load_clients_data(metadata=self.load_groups_data())
metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
self.assertRaises(MetadataConsistencyError,
metadata.resolve_client,
('1.2.3.2', None))
self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1')
metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
'client3')
self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
cleanup_cache=True), 'client3')
self.assertEqual(metadata.session_cache, dict())
mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
mock_gethostbyaddr.assert_called_with('1.2.3.6')
mock_gethostbyaddr.reset_mock()
mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7'])
self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4')
mock_gethostbyaddr.assert_called_with('1.2.3.7')
mock_gethostbyaddr.reset_mock()
mock_gethostbyaddr.return_value = None
mock_gethostbyaddr.side_effect = socket.herror
self.assertRaises(MetadataConsistencyError,
metadata.resolve_client,
('1.2.3.8', None))
mock_gethostbyaddr.assert_called_with('1.2.3.8')
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
def test_get_initial_metadata(self, mock_clientmetadata):
metadata = get_metadata_object()
metadata.states['clients.xml'] = False
self.assertRaises(MetadataRuntimeError,
metadata.get_initial_metadata, None)
self.load_groups_data(metadata=metadata)
self.load_clients_data(metadata=metadata)
metadata.get_initial_metadata("client1")
self.assertEqual(mock_clientmetadata.call_args[0][:9],
("client1", "group1", set(["group1"]), set(), set(),
set(["1.2.3.1"]), dict(), None, 'password2'))
metadata.get_initial_metadata("client2")
self.assertEqual(mock_clientmetadata.call_args[0][:9],
("client2", "group2", set(["group1", "group2"]),
set(["bundle1", "bundle2"]), set(),
set(["1.2.3.2"]), dict(category1="group1"),
None, None))
imd = metadata.get_initial_metadata("alias1")
self.assertEqual(mock_clientmetadata.call_args[0][:9],
("client3", "group1", set(["group1"]), set(),
set(['alias1']), set(["1.2.3.3"]), dict(), 'uuid1',
'password2'))
imd = metadata.get_initial_metadata("client_new")
self.assertEqual(mock_clientmetadata.call_args[0][:9],
("client_new", "group1", set(["group1"]), set(),
set(), set(), dict(), None, None))
metadata.default = None
self.assertRaises(MetadataConsistencyError,
metadata.get_initial_metadata,
"client_new2")
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_all_group_names(self):
metadata = self.load_groups_data()
self.assertItemsEqual(metadata.get_all_group_names(),
set([g.get("name")
for g in groups_test_tree.findall("//Group")]))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_all_groups_in_category(self):
metadata = self.load_groups_data()
self.assertItemsEqual(metadata.get_all_groups_in_category("category1"),
set([g.get("name")
for g in groups_test_tree.findall("//Group[@category='category1']")]))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_client_names_by_profiles(self):
metadata = self.load_clients_data(metadata=self.load_groups_data())
self.assertItemsEqual(metadata.get_client_names_by_profiles("group2"),
[c.get("name")
for c in clients_test_tree.findall("//Client[@profile='group2']")])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_client_names_by_groups(self):
metadata = self.load_clients_data(metadata=self.load_groups_data())
# this is not the best test in the world, since we mock
# core.build_metadata to just build _initial_ metadata, which
# is not at all the same thing. it turns out that mocking
# this out without starting a Bcfg2 server is pretty
# non-trivial, so this works-ish
metadata.core.build_metadata = Mock()
metadata.core.build_metadata.side_effect = \
lambda c: metadata.get_initial_metadata(c)
self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]),
[c.get("name")
for c in clients_test_tree.findall("//Client[@profile='group2']")])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_merge_additional_groups(self):
metadata = self.load_clients_data(metadata=self.load_groups_data())
imd = metadata.get_initial_metadata("client2")
# test adding a group excluded by categories
oldgroups = imd.groups
metadata.merge_additional_groups(imd, ["group4"])
self.assertEqual(imd.groups, oldgroups)
# test adding a private group
oldgroups = imd.groups
metadata.merge_additional_groups(imd, ["group3"])
self.assertEqual(imd.groups, oldgroups)
# test adding groups with bundles
oldgroups = imd.groups
oldbundles = imd.bundles
metadata.merge_additional_groups(imd, ["group7"])
self.assertEqual(imd.groups, oldgroups.union(["group7"]))
self.assertEqual(imd.bundles, oldbundles.union(["bundle3"]))
# test adding multiple groups
imd = metadata.get_initial_metadata("client2")
oldgroups = imd.groups
metadata.merge_additional_groups(imd, ["group6", "group8"])
self.assertItemsEqual(imd.groups,
oldgroups.union(["group6", "group8", "group9"]))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_merge_additional_data(self):
metadata = self.load_clients_data(metadata=self.load_groups_data())
imd = metadata.get_initial_metadata("client1")
# we need to use a unique attribute name for this test. this
# is probably overkill, but it works
pattern = "connector%d"
for i in range(0, 100):
connector = pattern % i
if not hasattr(imd, connector):
break
self.assertFalse(hasattr(imd, connector),
"Could not find unique connector name to test "
"merge_additional_data()")
metadata.merge_additional_data(imd, connector, "test data")
self.assertEqual(getattr(imd, connector), "test data")
self.assertIn(connector, imd.connectors)
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
def test_validate_client_address(self, mock_resolve_client):
metadata = self.load_clients_data(metadata=self.load_groups_data())
self.assertTrue(metadata.validate_client_address("client1",
(None, None)))
self.assertTrue(metadata.validate_client_address("client2",
("1.2.3.2", None)))
self.assertFalse(metadata.validate_client_address("client2",
("1.2.3.8", None)))
self.assertTrue(metadata.validate_client_address("client4",
("1.2.3.2", None)))
# this is upper case to ensure that case is folded properly in
# validate_client_address()
mock_resolve_client.return_value = "CLIENT4"
self.assertTrue(metadata.validate_client_address("client4",
("1.2.3.7", None)))
mock_resolve_client.assert_called_with(("1.2.3.7", None))
mock_resolve_client.reset_mock()
self.assertFalse(metadata.validate_client_address("client5",
("1.2.3.5", None)))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
def test_AuthenticateConnection(self, mock_resolve_client,
mock_validate_client_address):
metadata = self.load_clients_data(metadata=self.load_groups_data())
metadata.password = "password1"
cert = dict(subject=[[("commonName", "client1")]])
mock_validate_client_address.return_value = False
self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
"1.2.3.1"))
mock_validate_client_address.return_value = True
self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
"1.2.3.1"))
# floating cert-auth clients add themselves to the cache
self.assertIn("1.2.3.1", metadata.session_cache)
self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1")
cert = dict(subject=[[("commonName", "client7")]])
self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
"1.2.3.4"))
# non-floating cert-auth clients do not add themselves to the cache
self.assertNotIn("1.2.3.4", metadata.session_cache)
cert = dict(subject=[[("commonName", "client8")]])
mock_resolve_client.return_value = "client5"
self.assertTrue(metadata.AuthenticateConnection(None, "root",
"password1", "1.2.3.8"))
mock_resolve_client.side_effect = MetadataConsistencyError
self.assertFalse(metadata.AuthenticateConnection(None, "root",
"password1",
"1.2.3.8"))
# secure mode, no password
self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None,
"1.2.3.2"))
self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1',
"password1", "1.2.3.3"))
# non-root, non-cert clients populate session cache
self.assertIn("1.2.3.3", metadata.session_cache)
self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3")
# use alternate password
self.assertTrue(metadata.AuthenticateConnection(None, 'client3',
"password2", "1.2.3.3"))
# test secure mode
self.assertFalse(metadata.AuthenticateConnection(None, 'client9',
"password1",
"1.2.3.9"))
self.assertTrue(metadata.AuthenticateConnection(None, 'client9',
"password3", "1.2.3.9"))
self.assertFalse(metadata.AuthenticateConnection(None, "client5",
"password2",
"1.2.3.7"))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
def test_process_statistics(self, mock_update_client):
metadata = self.load_clients_data(metadata=self.load_groups_data())
md = Mock()
md.hostname = "client6"
metadata.process_statistics(md, None)
mock_update_client.assert_called_with(md.hostname,
dict(auth='cert'))
mock_update_client.reset_mock()
md.hostname = "client5"
metadata.process_statistics(md, None)
self.assertFalse(mock_update_client.called)
def test_viz(self):
pass