summaryrefslogtreecommitdiffstats
path: root/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py')
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py1087
1 files changed, 826 insertions, 261 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
index 455731d00..2ff0af78e 100644
--- a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
+++ b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
@@ -1,17 +1,31 @@
import os
+import sys
import copy
import time
import socket
-import unittest
import lxml.etree
-from mock import Mock, patch
+import Bcfg2.Server
import Bcfg2.Server.Plugin
from Bcfg2.Server.Plugins.Metadata import *
+from mock import Mock, patch
-XI_NAMESPACE = "http://www.w3.org/2001/XInclude"
-XI = "{%s}" % XI_NAMESPACE
-
-clients_test_tree = lxml.etree.XML('''
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore
+from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \
+ TestStatistics, TestDatabaseBacked
+
+def get_clients_test_tree():
+ return lxml.etree.XML('''
<Clients>
<Client name="client1" address="1.2.3.1" auth="cert"
location="floating" password="password2" profile="group1"/>
@@ -30,10 +44,19 @@ clients_test_tree = lxml.etree.XML('''
<Client name="client8" profile="group1" auth="cert+password"
address="1.2.3.5"/>
<Client name="client9" profile="group2" secure="true" password="password3"/>
+ <Client name="client10" profile="group1" floating="true"/>
</Clients>''').getroottree()
-groups_test_tree = lxml.etree.XML('''
+def get_groups_test_tree():
+ return lxml.etree.XML('''
<Groups xmlns:xi="http://www.w3.org/2001/XInclude">
+ <Client name="client8">
+ <Group name="group8"/>
+ </Client>
+ <Client name="client9">
+ <Group name="group8"/>
+ </Client>
+
<Group name="group1" default="true" profile="true" public="true"
category="category1"/>
<Group name="group2" profile="true" public="true" category="category1">
@@ -53,103 +76,246 @@ groups_test_tree = lxml.etree.XML('''
</Group>
<Group name="group8">
<Group name="group9"/>
+ <Client name="client9">
+ <Group name="group11"/>
+ <Group name="group9" negate="true"/>
+ </Client>
+ <Group name="group1">
+ <Group name="group10"/>
+ </Group>
</Group>
</Groups>''').getroottree()
-datastore = "/"
-def get_metadata_object(core=None, watch_clients=False):
+def get_metadata_object(core=None, watch_clients=False, use_db=False):
if core is None:
core = Mock()
- metadata = Metadata(core, datastore, watch_clients=watch_clients)
- #metadata.logger = Mock()
- return metadata
+ core.setup.cfp.getboolean = Mock(return_value=use_db)
+ return Metadata(core, datastore, watch_clients=watch_clients)
+
+
+class TestMetadataDB(DBModelTestCase):
+ if has_django:
+ models = [MetadataClientModel]
+
+
+if has_django or can_skip:
+ class TestClientVersions(Bcfg2TestCase):
+ test_clients = dict(client1="1.2.0",
+ client2="1.2.2",
+ client3="1.3.0pre1",
+ client4="1.1.0",
+ client5=None,
+ client6=None)
+
+ @skipUnless(has_django, "Django not found")
+ def setUp(self):
+ syncdb(TestMetadataDB)
+ for client, version in self.test_clients.items():
+ MetadataClientModel(hostname=client, version=version).save()
+
+ def test__contains(self):
+ v = ClientVersions()
+ self.assertIn("client1", v)
+ self.assertIn("client5", v)
+ self.assertNotIn("client__contains", v)
+
+ def test_keys(self):
+ v = ClientVersions()
+ self.assertItemsEqual(self.test_clients.keys(), v.keys())
+
+ def test__setitem(self):
+ v = ClientVersions()
+
+ # test setting version of existing client
+ v["client1"] = "1.2.3"
+ self.assertIn("client1", v)
+ self.assertEqual(v['client1'], "1.2.3")
+ client = MetadataClientModel.objects.get(hostname="client1")
+ self.assertEqual(client.version, "1.2.3")
+
+ # test adding new client
+ new = "client__setitem"
+ v[new] = "1.3.0"
+ self.assertIn(new, v)
+ self.assertEqual(v[new], "1.3.0")
+ client = MetadataClientModel.objects.get(hostname=new)
+ self.assertEqual(client.version, "1.3.0")
+
+ # test adding new client with no version
+ new2 = "client__setitem_2"
+ v[new2] = None
+ self.assertIn(new2, v)
+ self.assertEqual(v[new2], None)
+ client = MetadataClientModel.objects.get(hostname=new2)
+ self.assertEqual(client.version, None)
+
+ def test__getitem(self):
+ v = ClientVersions()
+
+ # test getting existing client
+ self.assertEqual(v['client2'], "1.2.2")
+ self.assertIsNone(v['client5'])
+
+ # test exception on nonexistent client
+ expected = KeyError
+ try:
+ v['clients__getitem']
+ except expected:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
+ def test__len(self):
+ v = ClientVersions()
+ self.assertEqual(len(v), MetadataClientModel.objects.count())
-class TestXMLMetadataConfig(unittest.TestCase):
- def get_config_object(self, basefile="clients.xml", core=None,
- watch_clients=False):
+ def test__iter(self):
+ v = ClientVersions()
+ self.assertItemsEqual([h for h in iter(v)], v.keys())
+
+ def test__delitem(self):
+ v = ClientVersions()
+
+ # test adding new client
+ new = "client__delitem"
+ v[new] = "1.3.0"
+
+ del v[new]
+ self.assertIn(new, v)
+ self.assertIsNone(v[new])
+
+
+class TestXMLMetadataConfig(TestXMLFileBacked):
+ test_obj = XMLMetadataConfig
+
+ def get_obj(self, basefile="clients.xml", core=None, watch_clients=False):
self.metadata = get_metadata_object(core=core,
watch_clients=watch_clients)
return XMLMetadataConfig(self.metadata, watch_clients, basefile)
+ def test__init(self):
+ xmc = self.get_obj()
+ self.assertEqual(self.metadata.core.fam, xmc.fam)
+ self.assertFalse(xmc.fam.AddMonitor.called)
+
def test_xdata(self):
- config = self.get_config_object()
- # we can't use assertRaises here because xdata is a property
+ config = self.get_obj()
+ expected = Bcfg2.Server.Plugin.MetadataRuntimeError
try:
config.xdata
- except MetadataRuntimeError:
+ except expected:
pass
except:
- assert False
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
+ pass
+
config.data = "<test/>"
self.assertEqual(config.xdata, "<test/>")
def test_base_xdata(self):
- config = self.get_config_object()
+ config = self.get_obj()
# we can't use assertRaises here because base_xdata is a property
+ expected = Bcfg2.Server.Plugin.MetadataRuntimeError
try:
config.base_xdata
- except MetadataRuntimeError:
+ except expected:
pass
except:
- assert False
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
+ pass
+
config.basedata = "<test/>"
self.assertEqual(config.base_xdata, "<test/>")
def test_add_monitor(self):
core = Mock()
- config = self.get_config_object(core=core)
+ config = self.get_obj(core=core)
fname = "test.xml"
- fpath = os.path.join(self.metadata.data, "test.xml")
+ fpath = os.path.join(self.metadata.data, fname)
config.extras = []
- config.add_monitor(fpath, fname)
+ config.add_monitor(fpath)
self.assertFalse(core.fam.AddMonitor.called)
- self.assertEqual(config.extras, [])
+ self.assertEqual(config.extras, [fpath])
+
+ config = self.get_obj(core=core, watch_clients=True)
+ config.add_monitor(fpath)
+ core.fam.AddMonitor.assert_called_with(fpath, config.metadata)
+ self.assertItemsEqual(config.extras, [fpath])
- config = self.get_config_object(core=core, watch_clients=True)
- config.add_monitor(fpath, fname)
- core.fam.AddMonitor.assert_called_with(fpath, self.metadata)
- self.assertItemsEqual(config.extras, [fname])
+ def test_Index(self):
+ # Index() isn't used on XMLMetadataConfig objects
+ pass
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.add_monitor")
@patch("lxml.etree.parse")
- def test_load_xml(self, mock_parse, mock_add_monitor):
- config = self.get_config_object("clients.xml")
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig._follow_xincludes")
+ def test_load_xml(self, mock_follow, mock_parse):
+ config = self.get_obj("clients.xml")
+
+ def reset():
+ mock_parse.reset_mock()
+ mock_follow.reset_mock()
+ config.data = None
+ config.basedata = None
+
+ reset()
+ config.load_xml()
+ mock_follow.assert_called_with(xdata=mock_parse.return_value)
+ mock_parse.assert_called_with(os.path.join(config.basedir,
+ "clients.xml"),
+ parser=Bcfg2.Server.XMLParser)
+ self.assertFalse(mock_parse.return_value.xinclude.called)
+ self.assertEqual(config.data, mock_parse.return_value)
+ self.assertIsNotNone(config.basedata)
+
+ reset()
mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None,
None)
config.load_xml()
+ mock_parse.assert_called_with(os.path.join(config.basedir,
+ "clients.xml"),
+ parser=Bcfg2.Server.XMLParser)
self.assertIsNone(config.data)
self.assertIsNone(config.basedata)
- config.data = None
- config.basedata = None
+ reset()
mock_parse.side_effect = None
- mock_parse.return_value.findall = Mock(return_value=[])
+ def follow_xincludes(xdata=None):
+ config.extras = [Mock(), Mock()]
+ mock_follow.side_effect = follow_xincludes
config.load_xml()
- self.assertIsNotNone(config.data)
+ mock_follow.assert_called_with(xdata=mock_parse.return_value)
+ mock_parse.assert_called_with(os.path.join(config.basedir,
+ "clients.xml"),
+ parser=Bcfg2.Server.XMLParser)
+ mock_parse.return_value.xinclude.assert_any_call()
+ self.assertEqual(config.data, mock_parse.return_value)
self.assertIsNotNone(config.basedata)
- config.data = None
- config.basedata = None
-
- def side_effect(*args):
- def second_call(*args):
- return []
- mock_parse.return_value.findall.side_effect = second_call
- return [lxml.etree.Element(XI + "include", href="more.xml"),
- lxml.etree.Element(XI + "include", href="evenmore.xml")]
-
- mock_parse.return_value.findall = Mock(side_effect=side_effect)
- config.load_xml()
- mock_add_monitor.assert_any_call("more.xml")
- mock_add_monitor.assert_any_call("evenmore.xml")
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml")
def test_write(self, mock_write_xml):
- config = self.get_config_object("clients.xml")
+ config = self.get_obj("clients.xml")
config.basedata = "<test/>"
config.write()
mock_write_xml.assert_called_with(os.path.join(self.metadata.data,
@@ -158,7 +324,7 @@ class TestXMLMetadataConfig(unittest.TestCase):
@patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False))
@patch('fcntl.lockf', Mock())
- @patch('__builtin__.open')
+ @patch('%s.open' % builtins)
@patch('os.unlink')
@patch('os.rename')
@patch('os.path.islink')
@@ -166,14 +332,14 @@ class TestXMLMetadataConfig(unittest.TestCase):
def test_write_xml(self, mock_readlink, mock_islink, mock_rename,
mock_unlink, mock_open):
fname = "clients.xml"
- config = self.get_config_object(fname)
+ config = self.get_obj(fname)
fpath = os.path.join(self.metadata.data, fname)
tmpfile = "%s.new" % fpath
linkdest = os.path.join(self.metadata.data, "client-link.xml")
mock_islink.return_value = False
- config.write_xml(fpath, clients_test_tree)
+ config.write_xml(fpath, get_clients_test_tree())
mock_open.assert_called_with(tmpfile, "w")
self.assertTrue(mock_open.return_value.write.called)
mock_islink.assert_called_with(fpath)
@@ -181,41 +347,42 @@ class TestXMLMetadataConfig(unittest.TestCase):
mock_islink.return_value = True
mock_readlink.return_value = linkdest
- config.write_xml(fpath, clients_test_tree)
+ config.write_xml(fpath, get_clients_test_tree())
mock_rename.assert_called_with(tmpfile, linkdest)
mock_rename.side_effect = OSError
- self.assertRaises(MetadataRuntimeError,
- config.write_xml, fpath, clients_test_tree)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ config.write_xml, fpath, get_clients_test_tree())
mock_open.return_value.write.side_effect = IOError
- self.assertRaises(MetadataRuntimeError,
- config.write_xml, fpath, clients_test_tree)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ config.write_xml, fpath, get_clients_test_tree())
mock_unlink.assert_called_with(tmpfile)
mock_open.side_effect = IOError
- self.assertRaises(MetadataRuntimeError,
- config.write_xml, fpath, clients_test_tree)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ config.write_xml, fpath, get_clients_test_tree())
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch('lxml.etree.parse')
def test_find_xml_for_xpath(self, mock_parse):
- config = self.get_config_object("groups.xml")
- config.basedata = groups_test_tree
+ config = self.get_obj("groups.xml")
+ config.basedata = get_groups_test_tree()
xpath = "//Group[@name='group1']"
self.assertItemsEqual(config.find_xml_for_xpath(xpath),
dict(filename=os.path.join(self.metadata.data,
"groups.xml"),
- xmltree=groups_test_tree,
- xquery=groups_test_tree.xpath(xpath)))
+ xmltree=get_groups_test_tree(),
+ xquery=get_groups_test_tree().xpath(xpath)))
self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict())
- config.extras = ["foo.xml", "bar.xml", "clients.xml"]
+ config.extras = [os.path.join(self.metadata.data, p)
+ for p in ["foo.xml", "bar.xml", "clients.xml"]]
- def parse_side_effect(fname):
+ def parse_side_effect(fname, parser=Bcfg2.Server.XMLParser):
if fname == os.path.join(self.metadata.data, "clients.xml"):
- return clients_test_tree
+ return get_clients_test_tree()
else:
return lxml.etree.XML("<null/>").getroottree()
@@ -224,12 +391,12 @@ class TestXMLMetadataConfig(unittest.TestCase):
self.assertItemsEqual(config.find_xml_for_xpath(xpath),
dict(filename=os.path.join(self.metadata.data,
"clients.xml"),
- xmltree=clients_test_tree,
- xquery=clients_test_tree.xpath(xpath)))
+ xmltree=get_clients_test_tree(),
+ xquery=get_clients_test_tree().xpath(xpath)))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
def test_HandleEvent(self, mock_load_xml):
- config = self.get_config_object("groups.xml")
+ config = self.get_obj("groups.xml")
evt = Mock()
evt.filename = os.path.join(self.metadata.data, "groups.xml")
evt.code2str = Mock(return_value="changed")
@@ -237,27 +404,55 @@ class TestXMLMetadataConfig(unittest.TestCase):
mock_load_xml.assert_called_with()
-class TestClientMetadata(unittest.TestCase):
+class TestClientMetadata(Bcfg2TestCase):
def test_inGroup(self):
cm = ClientMetadata("client1", "group1", ["group1", "group2"],
- ["bundle1"], [], [], [], None, None, None)
+ ["bundle1"], [], [], [], None, None, None, None)
self.assertTrue(cm.inGroup("group1"))
self.assertFalse(cm.inGroup("group3"))
-class TestMetadata(unittest.TestCase):
- def test__init_no_fam(self):
+class TestMetadata(_TestMetadata, TestStatistics, TestDatabaseBacked):
+ test_obj = Metadata
+ use_db = False
+
+ def get_obj(self, core=None, watch_clients=False):
+ return get_metadata_object(core=core, watch_clients=watch_clients,
+ use_db=self.use_db)
+
+ @skipUnless(has_django, "Django not found")
+ def test__use_db(self):
+ # with the way we've set up our metadata tests, it's unweildy
+ # to test _use_db. however, given the way get_obj works, if
+ # there was a bug in _use_db it'd be almost certain to shake
+ # out in the rest of the testing.
+ pass
+
+ def get_nonexistent_client(self, metadata, prefix="newclient"):
+ if metadata is None:
+ metadata = self.load_clients_data()
+ i = 0
+ client_name = "%s%s" % (prefix, i)
+ while client_name in metadata.clients:
+ i += 1
+ client_name = "%s%s" % (prefix, i)
+ return client_name
+
+ def test__init(self):
# test with watch_clients=False
core = Mock()
- metadata = get_metadata_object(core=core)
- self.check_metadata_object(metadata)
+ metadata = self.get_obj(core=core)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics)
+ self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
+ self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
+ self.assertIsInstance(metadata.query, MetadataQuery)
self.assertEqual(metadata.states, dict())
- def test__init_with_fam(self):
# test with watch_clients=True
- core = Mock()
core.fam = Mock()
- metadata = get_metadata_object(core=core, watch_clients=True)
+ metadata = self.get_obj(core=core, watch_clients=True)
self.assertEqual(len(metadata.states), 2)
core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
"groups.xml"),
@@ -269,63 +464,43 @@ class TestMetadata(unittest.TestCase):
core.fam.reset_mock()
core.fam.AddMonitor = Mock(side_effect=IOError)
self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
- get_metadata_object,
- core=core, watch_clients=True)
-
- def check_metadata_object(self, metadata):
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics)
- self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.query, MetadataQuery)
+ self.get_obj, core=core, watch_clients=True)
@patch('os.makedirs', Mock())
- @patch('__builtin__.open')
+ @patch('%s.open' % builtins)
def test_init_repo(self, mock_open):
- groups = "groups %s"
- os_selection = "os"
- clients = "clients %s"
- Metadata.init_repo(datastore, groups, os_selection, clients)
+ Metadata.init_repo(datastore,
+ groups_xml="groups", clients_xml="clients")
mock_open.assert_any_call(os.path.join(datastore, "Metadata",
"groups.xml"), "w")
mock_open.assert_any_call(os.path.join(datastore, "Metadata",
"clients.xml"), "w")
- @patch('lxml.etree.parse')
- def test_get_groups(self, mock_parse):
- metadata = get_metadata_object()
- mock_parse.return_value = groups_test_tree
- groups = metadata.get_groups()
- mock_parse.assert_called_with(os.path.join(datastore, "Metadata",
- "groups.xml"))
- self.assertIsInstance(groups, lxml.etree._Element)
-
- def test_search_xdata_name(self):
+ def test_search_xdata(self):
# test finding a node with the proper name
- metadata = get_metadata_object()
- tree = groups_test_tree
+ metadata = self.get_obj()
+ tree = get_groups_test_tree()
res = metadata._search_xdata("Group", "group1", tree)
self.assertIsInstance(res, lxml.etree._Element)
self.assertEqual(res.get("name"), "group1")
- def test_search_xdata_alias(self):
# test finding a node with the wrong name but correct alias
- metadata = get_metadata_object()
- tree = clients_test_tree
+ metadata = self.get_obj()
+ tree = get_clients_test_tree()
res = metadata._search_xdata("Client", "alias3", tree, alias=True)
self.assertIsInstance(res, lxml.etree._Element)
self.assertNotEqual(res.get("name"), "alias3")
- def test_search_xdata_not_found(self):
# test failure finding a node
- metadata = get_metadata_object()
- tree = clients_test_tree
- res = metadata._search_xdata("Client", "bogus_client", tree, alias=True)
+ metadata = self.get_obj()
+ tree = get_clients_test_tree()
+ res = metadata._search_xdata("Client",
+ self.get_nonexistent_client(metadata),
+ tree, alias=True)
self.assertIsNone(res)
def search_xdata(self, tag, name, tree, alias=False):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
res = metadata._search_xdata(tag, name, tree, alias=alias)
self.assertIsInstance(res, lxml.etree._Element)
if not alias:
@@ -333,22 +508,22 @@ class TestMetadata(unittest.TestCase):
def test_search_group(self):
# test finding a group with the proper name
- tree = groups_test_tree
+ tree = get_groups_test_tree()
self.search_xdata("Group", "group1", tree)
def test_search_bundle(self):
# test finding a bundle with the proper name
- tree = groups_test_tree
+ tree = get_groups_test_tree()
self.search_xdata("Bundle", "bundle1", tree)
def test_search_client(self):
# test finding a client with the proper name
- tree = clients_test_tree
+ tree = get_clients_test_tree()
self.search_xdata("Client", "client1", tree, alias=True)
self.search_xdata("Client", "alias1", tree, alias=True)
def test_add_group(self):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
metadata.groups_xml.write = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -374,15 +549,15 @@ class TestMetadata(unittest.TestCase):
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.groups_xml.write.reset_mock()
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.add_group,
"test1", dict())
self.assertFalse(metadata.groups_xml.write.called)
def test_update_group(self):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
- metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
+ metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.update_group("group1", dict(foo="bar"))
@@ -392,14 +567,14 @@ class TestMetadata(unittest.TestCase):
self.assertEqual(grp.get("foo"), "bar")
self.assertTrue(metadata.groups_xml.write_xml.called)
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.update_group,
"bogus_group", dict())
def test_remove_group(self):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
- metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
+ metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.remove_group("group5")
@@ -407,12 +582,12 @@ class TestMetadata(unittest.TestCase):
self.assertIsNone(grp)
self.assertTrue(metadata.groups_xml.write_xml.called)
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.remove_group,
"bogus_group")
def test_add_bundle(self):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
metadata.groups_xml.write = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -429,15 +604,15 @@ class TestMetadata(unittest.TestCase):
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.groups_xml.write.reset_mock()
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.add_bundle,
"bundle1")
self.assertFalse(metadata.groups_xml.write.called)
def test_remove_bundle(self):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
- metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
+ metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
metadata.remove_bundle("bundle1")
@@ -445,46 +620,49 @@ class TestMetadata(unittest.TestCase):
self.assertIsNone(grp)
self.assertTrue(metadata.groups_xml.write_xml.called)
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.remove_bundle,
"bogus_bundle")
def test_add_client(self):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
metadata.clients_xml.write = Mock()
metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree()
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
- metadata.add_client("test1", dict())
+ new1 = self.get_nonexistent_client(metadata)
+ metadata.add_client(new1, dict())
metadata.clients_xml.write.assert_any_call()
- grp = metadata.search_client("test1", metadata.clients_xml.base_xdata)
+ grp = metadata.search_client(new1, metadata.clients_xml.base_xdata)
self.assertIsNotNone(grp)
- self.assertEqual(grp.attrib, dict(name='test1'))
+ self.assertEqual(grp.attrib, dict(name=new1))
# have to call this explicitly -- usually load_xml does this
# on FAM events
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+ metadata._handle_clients_xml_event(Mock())
- metadata.add_client("test2", dict(foo='bar'))
+ new2 = self.get_nonexistent_client(metadata)
+ metadata.add_client(new2, dict(foo='bar'))
metadata.clients_xml.write.assert_any_call()
- grp = metadata.search_client("test2", metadata.clients_xml.base_xdata)
+ grp = metadata.search_client(new2, metadata.clients_xml.base_xdata)
self.assertIsNotNone(grp)
- self.assertEqual(grp.attrib, dict(name='test2', foo='bar'))
+ self.assertEqual(grp.attrib, dict(name=new2, foo='bar'))
# have to call this explicitly -- usually load_xml does this
# on FAM events
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
metadata.clients_xml.write.reset_mock()
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.add_client,
- "test1", dict())
+ new1, dict())
self.assertFalse(metadata.clients_xml.write.called)
def test_update_client(self):
- metadata = get_metadata_object()
+ metadata = self.get_obj()
metadata.clients_xml.write_xml = Mock()
- metadata.clients_xml.data = copy.deepcopy(clients_test_tree)
+ metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree())
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
metadata.update_client("client1", dict(foo="bar"))
@@ -494,14 +672,16 @@ class TestMetadata(unittest.TestCase):
self.assertEqual(grp.get("foo"), "bar")
self.assertTrue(metadata.clients_xml.write_xml.called)
- self.assertRaises(MetadataConsistencyError,
+ new = self.get_nonexistent_client(metadata)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.update_client,
- "bogus_client", dict())
+ new, dict())
def load_clients_data(self, metadata=None, xdata=None):
if metadata is None:
- metadata = get_metadata_object()
- metadata.clients_xml.data = xdata or copy.deepcopy(clients_test_tree)
+ metadata = self.get_obj()
+ metadata.clients_xml.data = \
+ xdata or copy.deepcopy(get_clients_test_tree())
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
evt = Mock()
evt.filename = os.path.join(datastore, "Metadata", "clients.xml")
@@ -509,41 +689,43 @@ class TestMetadata(unittest.TestCase):
metadata.HandleEvent(evt)
return metadata
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
- def test_clients_xml_event(self, mock_load_xml):
- metadata = get_metadata_object()
+ def test_handle_clients_xml_event(self):
+ metadata = self.get_obj()
metadata.profiles = ["group1", "group2"]
- self.load_clients_data(metadata=metadata)
- mock_load_xml.assert_any_call()
- self.assertItemsEqual(metadata.clients,
- dict([(c.get("name"), c.get("profile"))
- for c in clients_test_tree.findall("//Client")]))
+
+ metadata.clients_xml = Mock()
+ metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree())
+ metadata._handle_clients_xml_event(Mock())
+
+ if not self.use_db:
+ self.assertItemsEqual(metadata.clients,
+ dict([(c.get("name"), c.get("profile"))
+ for c in get_clients_test_tree().findall("//Client")]))
aliases = dict([(a.get("name"), a.getparent().get("name"))
- for a in clients_test_tree.findall("//Alias")])
+ for a in get_clients_test_tree().findall("//Alias")])
self.assertItemsEqual(metadata.aliases, aliases)
raliases = dict([(c.get("name"), set())
- for c in clients_test_tree.findall("//Client")])
- for alias in clients_test_tree.findall("//Alias"):
+ for c in get_clients_test_tree().findall("//Client")])
+ for alias in get_clients_test_tree().findall("//Alias"):
raliases[alias.getparent().get("name")].add(alias.get("name"))
self.assertItemsEqual(metadata.raliases, raliases)
- self.assertEqual(metadata.bad_clients, dict())
self.assertEqual(metadata.secure,
[c.get("name")
- for c in clients_test_tree.findall("//Client[@secure='true']")])
- self.assertEqual(metadata.floating, ["client1"])
+ for c in get_clients_test_tree().findall("//Client[@secure='true']")])
+ self.assertEqual(metadata.floating, ["client1", "client10"])
addresses = dict([(c.get("address"), [])
- for c in clients_test_tree.findall("//*[@address]")])
+ for c in get_clients_test_tree().findall("//*[@address]")])
raddresses = dict()
- for client in clients_test_tree.findall("//Client[@address]"):
+ for client in get_clients_test_tree().findall("//Client[@address]"):
addresses[client.get("address")].append(client.get("name"))
try:
raddresses[client.get("name")].append(client.get("address"))
except KeyError:
raddresses[client.get("name")] = [client.get("address")]
- for alias in clients_test_tree.findall("//Alias[@address]"):
+ for alias in get_clients_test_tree().findall("//Alias[@address]"):
addresses[alias.get("address")].append(alias.getparent().get("name"))
try:
raddresses[alias.getparent().get("name")].append(alias.get("address"))
@@ -554,25 +736,11 @@ class TestMetadata(unittest.TestCase):
self.assertItemsEqual(metadata.raddresses, raddresses)
self.assertTrue(metadata.states['clients.xml'])
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- def test_clients_xml_event_bad_clients(self):
- metadata = get_metadata_object()
- metadata.profiles = ["group2"]
- self.load_clients_data(metadata=metadata)
- clients = dict()
- badclients = dict()
- for client in clients_test_tree.findall("//Client"):
- if client.get("profile") in metadata.profiles:
- clients[client.get("name")] = client.get("profile")
- else:
- badclients[client.get("name")] = client.get("profile")
- self.assertItemsEqual(metadata.clients, clients)
- self.assertItemsEqual(metadata.bad_clients, badclients)
-
def load_groups_data(self, metadata=None, xdata=None):
if metadata is None:
- metadata = get_metadata_object()
- metadata.groups_xml.data = xdata or copy.deepcopy(groups_test_tree)
+ metadata = self.get_obj()
+ metadata.groups_xml.data = \
+ xdata or copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
evt = Mock()
evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
@@ -580,65 +748,109 @@ class TestMetadata(unittest.TestCase):
metadata.HandleEvent(evt)
return metadata
- @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
- def test_groups_xml_event(self, mock_load_xml):
- dup_data = copy.deepcopy(groups_test_tree)
- lxml.etree.SubElement(dup_data.getroot(),
- "Group", name="group1")
- metadata = self.load_groups_data(xdata=dup_data)
- mock_load_xml.assert_any_call()
- self.assertEqual(metadata.public, ["group1", "group2"])
- self.assertEqual(metadata.private, ["group3"])
- self.assertEqual(metadata.profiles, ["group1", "group2"])
+ def test_handle_groups_xml_event(self):
+ metadata = self.get_obj()
+ metadata.groups_xml = Mock()
+ metadata.groups_xml.xdata = get_groups_test_tree()
+ metadata._handle_groups_xml_event(Mock())
+
+ self.assertTrue(metadata.states['groups.xml'])
+ self.assertTrue(metadata.groups['group1'].is_public)
+ self.assertTrue(metadata.groups['group2'].is_public)
+ self.assertFalse(metadata.groups['group3'].is_public)
+ self.assertFalse(metadata.groups['group1'].is_private)
+ self.assertFalse(metadata.groups['group2'].is_private)
+ self.assertTrue(metadata.groups['group3'].is_private)
+ self.assertTrue(metadata.groups['group1'].is_profile)
+ self.assertTrue(metadata.groups['group2'].is_profile)
+ self.assertFalse(metadata.groups['group3'].is_profile)
self.assertItemsEqual(metadata.groups.keys(),
- [g.get("name")
- for g in groups_test_tree.findall("/Group")])
- self.assertEqual(metadata.categories,
- dict(group1="category1",
- group2="category1",
- group3="category2",
- group4="category1"))
+ set(g.get("name")
+ for g in get_groups_test_tree().findall("//Group")))
+ self.assertEqual(metadata.groups['group1'].category, 'category1')
+ self.assertEqual(metadata.groups['group2'].category, 'category1')
+ self.assertEqual(metadata.groups['group3'].category, 'category2')
+ self.assertEqual(metadata.groups['group4'].category, 'category1')
self.assertEqual(metadata.default, "group1")
- self.assertTrue(metadata.states['groups.xml'])
+ all_groups = []
+ negated_groups = []
+ for group in get_groups_test_tree().xpath("//Groups/Client//*") + \
+ get_groups_test_tree().xpath("//Groups/Group//*"):
+ if group.tag == 'Group' and not group.getchildren():
+ if group.get("negate", "false").lower() == 'true':
+ negated_groups.append(group.get("name"))
+ else:
+ all_groups.append(group.get("name"))
+ self.assertItemsEqual([g.name
+ for g in metadata.group_membership.values()],
+ all_groups)
+ self.assertItemsEqual([g.name
+ for g in metadata.negated_groups.values()],
+ negated_groups)
+
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client")
- @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
- def test_set_profile(self, mock_update_client, mock_add_client):
- metadata = get_metadata_object()
- metadata.states['clients.xml'] = False
- self.assertRaises(MetadataRuntimeError,
- metadata.set_profile,
- None, None, None)
+ def test_set_profile(self):
+ metadata = self.get_obj()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ metadata.set_profile,
+ None, None, None)
self.load_groups_data(metadata=metadata)
self.load_clients_data(metadata=metadata)
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.set_profile,
"client1", "group5", None)
- metadata.clients_xml.write = Mock()
- metadata.set_profile("client1", "group2", None)
- mock_update_client.assert_called_with("client1", dict(profile="group2"))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clients["client1"], "group2")
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.set_profile,
+ "client1", "group3", None)
- metadata.clients_xml.write.reset_mock()
- metadata.set_profile("client_new", "group1", None)
- mock_add_client.assert_called_with("client_new", dict(profile="group1"))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clients["client_new"], "group1")
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_set_profile_db(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ if metadata._use_db:
+ profile = "group1"
+ client_name = self.get_nonexistent_client(metadata)
+ metadata.set_profile(client_name, profile, None)
+ self.assertIn(client_name, metadata.clients)
+ self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ metadata.set_profile,
+ client_name, profile, None)
- metadata.session_cache[('1.2.3.6', None)] = (None, 'client_new2')
- metadata.clients_xml.write.reset_mock()
- metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None))
- mock_add_client.assert_called_with("client_new2",
- dict(uuid='uuid_new',
- profile="group1",
- address='1.2.3.6'))
- metadata.clients_xml.write.assert_any_call()
- self.assertEqual(metadata.clients["uuid_new"], "group1")
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
+ def test_set_profile_xml(self, mock_update_client, mock_add_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ if not metadata._use_db:
+ metadata.clients_xml.write = Mock()
+ metadata.set_profile("client1", "group2", None)
+ mock_update_client.assert_called_with("client1",
+ dict(profile="group2"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups["client1"], ["group2"])
+
+ metadata.clients_xml.write.reset_mock()
+ new1 = self.get_nonexistent_client(metadata)
+ metadata.set_profile(new1, "group1", None)
+ mock_add_client.assert_called_with(new1, dict(profile="group1"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups[new1], ["group1"])
+
+ metadata.clients_xml.write.reset_mock()
+ new2 = self.get_nonexistent_client(metadata)
+ metadata.session_cache[('1.2.3.6', None)] = (None, new2)
+ metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None))
+ mock_add_client.assert_called_with(new2,
+ dict(uuid='uuid_new',
+ profile="group1",
+ address='1.2.3.6'))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clientgroups["uuid_new"], ["group1"])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
@patch("socket.gethostbyaddr")
@@ -647,7 +859,7 @@ class TestMetadata(unittest.TestCase):
metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.resolve_client,
('1.2.3.2', None))
self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1')
@@ -671,7 +883,7 @@ class TestMetadata(unittest.TestCase):
mock_gethostbyaddr.reset_mock()
mock_gethostbyaddr.return_value = None
mock_gethostbyaddr.side_effect = socket.herror
- self.assertRaises(MetadataConsistencyError,
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
metadata.resolve_client,
('1.2.3.8', None))
mock_gethostbyaddr.assert_called_with('1.2.3.8')
@@ -680,63 +892,115 @@ class TestMetadata(unittest.TestCase):
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
@patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
def test_get_initial_metadata(self, mock_clientmetadata):
- metadata = get_metadata_object()
- metadata.states['clients.xml'] = False
- self.assertRaises(MetadataRuntimeError,
- metadata.get_initial_metadata, None)
+ metadata = self.get_obj()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ metadata.get_initial_metadata, None)
self.load_groups_data(metadata=metadata)
self.load_clients_data(metadata=metadata)
+ # test address, password
metadata.get_initial_metadata("client1")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client1", "group1", set(["group1"]), set(), set(),
- set(["1.2.3.1"]), dict(), None, 'password2'))
-
+ mock_clientmetadata.assert_called_with("client1", "group1",
+ set(["group1"]), set(), set(),
+ set(["1.2.3.1"]),
+ dict(category1='group1'), None,
+ 'password2', None,
+ metadata.query)
+
+ # test address, bundles, category suppression
metadata.get_initial_metadata("client2")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client2", "group2", set(["group1", "group2"]),
- set(["bundle1", "bundle2"]), set(),
- set(["1.2.3.2"]), dict(category1="group1"),
- None, None))
-
+ mock_clientmetadata.assert_called_with("client2", "group2",
+ set(["group2"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(["1.2.3.2"]),
+ dict(category1="group2"),
+ None, None, None,
+ metadata.query)
+
+ # test aliases, address, uuid, password
imd = metadata.get_initial_metadata("alias1")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client3", "group1", set(["group1"]), set(),
- set(['alias1']), set(["1.2.3.3"]), dict(), 'uuid1',
- 'password2'))
+ mock_clientmetadata.assert_called_with("client3", "group1",
+ set(["group1"]), set(),
+ set(['alias1']),
+ set(["1.2.3.3"]),
+ dict(category1="group1"),
+ 'uuid1', 'password2', None,
+ metadata.query)
+
+ # test new client creation
+ new1 = self.get_nonexistent_client(metadata)
+ imd = metadata.get_initial_metadata(new1)
+ mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]),
+ set(), set(), set(),
+ dict(category1="group1"), None,
+ None, None, metadata.query)
+
+ # test nested groups, address, per-client groups
+ imd = metadata.get_initial_metadata("client8")
+ mock_clientmetadata.assert_called_with("client8", "group1",
+ set(["group1", "group8",
+ "group9", "group10"]),
+ set(),
+ set(), set(["1.2.3.5"]),
+ dict(category1="group1"),
+ None, None, None, metadata.query)
+
+ # test setting per-client groups, group negation, nested groups
+ imd = metadata.get_initial_metadata("client9")
+ mock_clientmetadata.assert_called_with("client9", "group2",
+ set(["group2", "group8",
+ "group11"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(),
+ dict(category1="group2"), None,
+ "password3", None,
+ metadata.query)
+
+ # test new client with no default profile
+ metadata.default = None
+ new2 = self.get_nonexistent_client(metadata)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.get_initial_metadata, new2)
- imd = metadata.get_initial_metadata("client_new")
- self.assertEqual(mock_clientmetadata.call_args[0][:9],
- ("client_new", "group1", set(["group1"]), set(),
- set(), set(), dict(), None, None))
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_merge_groups(self):
+ metadata = self.get_obj()
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
- metadata.default = None
- self.assertRaises(MetadataConsistencyError,
- metadata.get_initial_metadata,
- "client_new2")
+ self.assertEqual(metadata._merge_groups("client1", set(["group1"]),
+ categories=dict(group1="category1")),
+ (set(["group1"]), dict(group1="category1")))
+ self.assertEqual(metadata._merge_groups("client8",
+ set(["group1", "group8", "group9"]),
+ categories=dict(group1="category1")),
+ (set(["group1", "group8", "group9", "group10"]),
+ dict(group1="category1")))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_all_group_names(self):
metadata = self.load_groups_data()
self.assertItemsEqual(metadata.get_all_group_names(),
set([g.get("name")
- for g in groups_test_tree.findall("//Group")]))
+ for g in get_groups_test_tree().findall("//Group")]))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_all_groups_in_category(self):
metadata = self.load_groups_data()
self.assertItemsEqual(metadata.get_all_groups_in_category("category1"),
set([g.get("name")
- for g in groups_test_tree.findall("//Group[@category='category1']")]))
+ for g in get_groups_test_tree().findall("//Group[@category='category1']")]))
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_client_names_by_profiles(self):
metadata = self.load_clients_data(metadata=self.load_groups_data())
- self.assertItemsEqual(metadata.get_client_names_by_profiles("group2"),
+ self.assertItemsEqual(metadata.get_client_names_by_profiles(["group2"]),
[c.get("name")
- for c in clients_test_tree.findall("//Client[@profile='group2']")])
+ for c in get_clients_test_tree().findall("//Client[@profile='group2']")])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_get_client_names_by_groups(self):
@@ -751,7 +1015,7 @@ class TestMetadata(unittest.TestCase):
lambda c: metadata.get_initial_metadata(c)
self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]),
[c.get("name")
- for c in clients_test_tree.findall("//Client[@profile='group2']")])
+ for c in get_clients_test_tree().findall("//Client[@profile='group2']")])
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_merge_additional_groups(self):
@@ -782,6 +1046,14 @@ class TestMetadata(unittest.TestCase):
self.assertItemsEqual(imd.groups,
oldgroups.union(["group6", "group8", "group9"]))
+ # test adding a group that is not defined in groups.xml
+ imd = metadata.get_initial_metadata("client2")
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group6", "newgroup"])
+ self.assertItemsEqual(imd.groups,
+ oldgroups.union(["group6", "newgroup"]))
+
+
@patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
def test_merge_additional_data(self):
metadata = self.load_clients_data(metadata=self.load_groups_data())
@@ -856,7 +1128,8 @@ class TestMetadata(unittest.TestCase):
self.assertTrue(metadata.AuthenticateConnection(None, "root",
"password1", "1.2.3.8"))
- mock_resolve_client.side_effect = MetadataConsistencyError
+ mock_resolve_client.side_effect = \
+ Bcfg2.Server.Plugin.MetadataConsistencyError
self.assertFalse(metadata.AuthenticateConnection(None, "root",
"password1",
"1.2.3.8"))
@@ -903,3 +1176,295 @@ class TestMetadata(unittest.TestCase):
def test_viz(self):
pass
+
+
+class TestMetadataBase(TestMetadata):
+ """ base test object for testing Metadata with database enabled """
+ __test__ = False
+ use_db = True
+
+ @skipUnless(has_django, "Django not found")
+ def setUp(self):
+ syncdb(TestMetadataDB)
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = get_obj()
+ for client in get_clients_test_tree().findall("Client"):
+ metadata.add_client(client.get("name"))
+ return metadata
+
+ def get_nonexistent_client(self, _, prefix="newclient"):
+ clients = [o.hostname for o in MetadataClientModel.objects.all()]
+ i = 0
+ client_name = "%s%s" % (prefix, i)
+ while client_name in clients:
+ i += 1
+ client_name = "%s%s" % (prefix, i)
+ return client_name
+
+ @patch('os.path.exists')
+ def test__init(self, mock_exists):
+ core = Mock()
+ core.fam = Mock()
+ mock_exists.return_value = False
+ metadata = self.get_obj(core=core, watch_clients=True)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
+ core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+
+ mock_exists.return_value = True
+ core.fam.reset_mock()
+ metadata = self.get_obj(core=core, watch_clients=True)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "clients.xml"),
+ metadata)
+
+ def test_add_group(self):
+ pass
+
+ def test_add_bundle(self):
+ pass
+
+ def test_add_client(self):
+ metadata = self.get_obj()
+ hostname = self.get_nonexistent_client(metadata)
+ client = metadata.add_client(hostname)
+ self.assertIsInstance(client, MetadataClientModel)
+ self.assertEqual(client.hostname, hostname)
+ self.assertIn(hostname, metadata.clients)
+ self.assertIn(hostname, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_update_group(self):
+ pass
+
+ def test_update_bundle(self):
+ pass
+
+ def test_update_client(self):
+ pass
+
+ def test_list_clients(self):
+ metadata = self.get_obj()
+ self.assertItemsEqual(metadata.list_clients(),
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_remove_group(self):
+ pass
+
+ def test_remove_bundle(self):
+ pass
+
+ def test_remove_client(self):
+ metadata = self.get_obj()
+ client_name = self.get_nonexistent_client(metadata)
+
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.remove_client,
+ client_name)
+
+ metadata.add_client(client_name)
+ metadata.remove_client(client_name)
+ self.assertNotIn(client_name, metadata.clients)
+ self.assertNotIn(client_name, metadata.list_clients())
+ self.assertItemsEqual(metadata.clients,
+ [c.hostname
+ for c in MetadataClientModel.objects.all()])
+
+ def test_process_statistics(self):
+ pass
+
+
+class TestMetadata_NoClientsXML(TestMetadataBase):
+ """ test Metadata without a clients.xml. we have to disable or
+ override tests that rely on client options """
+ # only run these tests if it's possible to skip tests or if we
+ # have django. otherwise they'll all get run because our fake
+ # skipping decorators for python < 2.7 won't work when they
+ # decorate setUp()
+ if can_skip or has_django:
+ __test__ = True
+
+ def load_groups_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_obj()
+ if not xdata:
+ xdata = copy.deepcopy(get_groups_test_tree())
+ for client in get_clients_test_tree().findall("Client"):
+ newclient = \
+ lxml.etree.SubElement(xdata.getroot(),
+ "Client", name=client.get("name"))
+ lxml.etree.SubElement(newclient, "Group",
+ name=client.get("profile"))
+ metadata.groups_xml.data = xdata
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ return metadata
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
+ def test_get_initial_metadata(self, mock_clientmetadata):
+ metadata = self.get_obj()
+ if 'clients.xml' in metadata.states:
+ metadata.states['clients.xml'] = False
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataRuntimeError,
+ metadata.get_initial_metadata, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ # test basic client metadata
+ metadata.get_initial_metadata("client1")
+ mock_clientmetadata.assert_called_with("client1", "group1",
+ set(["group1"]), set(), set(),
+ set(), dict(category1='group1'),
+ None, None, None, metadata.query)
+
+ # test bundles, category suppression
+ metadata.get_initial_metadata("client2")
+ mock_clientmetadata.assert_called_with("client2", "group2",
+ set(["group2"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(),
+ dict(category1="group2"), None,
+ None, None, metadata.query)
+
+ # test new client creation
+ new1 = self.get_nonexistent_client(metadata)
+ imd = metadata.get_initial_metadata(new1)
+ mock_clientmetadata.assert_called_with(new1, "group1", set(["group1"]),
+ set(), set(), set(),
+ dict(category1="group1"), None,
+ None, None, metadata.query)
+
+ # test nested groups, per-client groups
+ imd = metadata.get_initial_metadata("client8")
+ mock_clientmetadata.assert_called_with("client8", "group1",
+ set(["group1", "group8",
+ "group9", "group10"]),
+ set(), set(), set(),
+ dict(category1="group1"), None,
+ None, None, metadata.query)
+
+ # test per-client groups, group negation, nested groups
+ imd = metadata.get_initial_metadata("client9")
+ mock_clientmetadata.assert_called_with("client9", "group2",
+ set(["group2", "group8",
+ "group11"]),
+ set(["bundle1", "bundle2"]),
+ set(), set(),
+ dict(category1="group2"), None,
+ None, None, metadata.query)
+
+ # test exception on new client with no default profile
+ metadata.default = None
+ new2 = self.get_nonexistent_client(metadata)
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.get_initial_metadata,
+ new2)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_validate_client_address(self, mock_resolve_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ # this is upper case to ensure that case is folded properly in
+ # validate_client_address()
+ mock_resolve_client.return_value = "CLIENT4"
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.7", None)))
+ mock_resolve_client.assert_called_with(("1.2.3.7", None))
+
+ mock_resolve_client.reset_mock()
+ self.assertFalse(metadata.validate_client_address("client5",
+ ("1.2.3.5", None)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_AuthenticateConnection(self, mock_resolve_client,
+ mock_validate_client_address):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.password = "password1"
+
+ cert = dict(subject=[[("commonName", "client1")]])
+ mock_validate_client_address.return_value = False
+ self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ mock_validate_client_address.return_value = True
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root",
+ metadata.password,
+ "1.2.3.1"))
+
+ cert = dict(subject=[[("commonName", "client8")]])
+
+ mock_resolve_client.return_value = "client5"
+ self.assertTrue(metadata.AuthenticateConnection(None, "root",
+ "password1", "1.2.3.8"))
+
+ mock_resolve_client.side_effect = \
+ Bcfg2.Server.Plugin.MetadataConsistencyError
+ self.assertFalse(metadata.AuthenticateConnection(None, "root",
+ "password1",
+ "1.2.3.8"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("socket.gethostbyaddr")
+ def test_resolve_client(self, mock_gethostbyaddr):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+
+ metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
+ 'client3')
+ mock_gethostbyaddr.return_value = ("client3", [], ['1.2.3.3'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
+ cleanup_cache=True), 'client3')
+ self.assertEqual(metadata.session_cache, dict())
+
+ mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
+ mock_gethostbyaddr.assert_called_with('1.2.3.6')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = None
+ mock_gethostbyaddr.side_effect = socket.herror
+ self.assertRaises(Bcfg2.Server.Plugin.MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.8', None))
+ mock_gethostbyaddr.assert_called_with('1.2.3.8')
+
+ def test_handle_clients_xml_event(self):
+ pass
+
+
+class TestMetadata_ClientsXML(TestMetadataBase):
+ """ test Metadata with a clients.xml. """
+ # only run these tests if it's possible to skip tests or if we
+ # have django. otherwise they'll all get run because our fake
+ # skipping decorators for python < 2.7 won't work when they
+ # decorate setUp()
+ if can_skip or has_django:
+ __test__ = True
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = self.get_obj()
+ metadata.core.fam = Mock()
+ metadata._handle_file("clients.xml")
+ metadata = TestMetadata.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+ return TestMetadataBase.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+