diff options
-rw-r--r-- | src/lib/Bcfg2/Server/Plugins/Probes.py | 409 | ||||
-rw-r--r-- | testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py | 569 |
2 files changed, 386 insertions, 592 deletions
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index f75d88d8f..ef339a34b 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -8,8 +8,10 @@ import copy import operator import lxml.etree import Bcfg2.Server +import Bcfg2.Server.Cache import Bcfg2.Server.Plugin import Bcfg2.Server.FileMonitor +from Bcfg2.Logger import Debuggable from Bcfg2.Server.Statistics import track_statistics HAS_DJANGO = False @@ -63,6 +65,210 @@ except ImportError: HAS_YAML = False +class ProbeStore(Debuggable): + """ Caching abstraction layer between persistent probe data + storage and the Probes plugin.""" + + def __init__(self, core, datastore): # pylint: disable=W0613 + Debuggable.__init__(self) + self._groupcache = Bcfg2.Server.Cache.Cache("Probes", "probegroups") + self._datacache = Bcfg2.Server.Cache.Cache("Probes", "probedata") + + def get_groups(self, hostname): + """ Get the list of groups for the given host """ + if hostname not in self._groupcache: + self._load_groups(hostname) + return self._groupcache.get(hostname, []) + + def set_groups(self, hostname, groups): + """ Set the list of groups for the given host """ + raise NotImplementedError + + def get_data(self, hostname): + """ Get a dict of probe data for the given host """ + if hostname not in self._datacache: + self._load_data(hostname) + return self._datacache.get(hostname, dict()) + + def set_data(self, hostname, data): + """ Set probe data for the given host """ + raise NotImplementedError + + def _load_groups(self, hostname): + """ When probe groups are not found in the cache, this + function is called to load them from the backend (XML or + database). """ + raise NotImplementedError + + def _load_data(self, hostname): + """ When probe groups are not found in the cache, this + function is called to load them from the backend (XML or + database). """ + raise NotImplementedError + + def commit(self): + """ Commit the current data in the cache to the persistent + backend store. This is not used with the + :class:`Bcfg2.Server.Plugins.Probes.DBProbeStore`, because it + commits on every change. """ + pass + + +class DBProbeStore(ProbeStore, Bcfg2.Server.Plugin.DatabaseBacked): + """ Caching abstraction layer between the database and the Probes + plugin. """ + create = False + + def __init__(self, core, datastore): + Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) + ProbeStore.__init__(self, core, datastore) + + def _load_groups(self, hostname): + Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) + groupdata = ProbesGroupsModel.objects.filter(hostname=hostname) + self._groupcache[hostname] = list(set(r.group for r in groupdata)) + Bcfg2.Server.Cache.expire("Metadata", hostname) + + @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock + def set_groups(self, hostname, groups): + Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) + olddata = self._groupcache.get(hostname, []) + self._groupcache[hostname] = groups + for group in groups: + try: + ProbesGroupsModel.objects.get_or_create( + hostname=hostname, + group=group) + except ProbesGroupsModel.MultipleObjectsReturned: + ProbesGroupsModel.objects.filter(hostname=hostname, + group=group).delete() + ProbesGroupsModel.objects.get_or_create( + hostname=hostname, + group=group) + ProbesGroupsModel.objects.filter( + hostname=hostname).exclude(group__in=groups).delete() + if olddata != groups: + Bcfg2.Server.Cache.expire("Metadata", hostname) + + def _load_data(self, hostname): + Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) + Bcfg2.Server.Cache.expire("Probes", "probedata", hostname) + self._datacache[hostname] = ClientProbeDataSet() + ts_set = False + for pdata in ProbesDataModel.objects.filter(hostname=hostname): + if not ts_set: + self._datacache[hostname].timestamp = \ + time.mktime(pdata.timestamp.timetuple()) + ts_set = True + self._datacache[hostname][pdata.probe] = ProbeData(pdata.data) + Bcfg2.Server.Cache.expire("Metadata", hostname) + + @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock + def set_data(self, hostname, data): + Bcfg2.Server.Cache.expire("Probes", "probedata", hostname) + self._datacache[hostname] = ClientProbeDataSet() + expire_metadata = False + for probe, pdata in data.items(): + self._datacache[hostname][probe] = pdata + record, created = ProbesDataModel.objects.get_or_create( + hostname=hostname, + probe=probe) + expire_metadata |= created + if record.data != pdata: + record.data = pdata + record.save() + expire_metadata = True + qset = ProbesDataModel.objects.filter( + hostname=hostname).exclude(probe__in=data.keys()) + if len(qset): + qset.delete() + expire_metadata = True + if expire_metadata: + Bcfg2.Server.Cache.expire("Metadata", hostname) + + +class XMLProbeStore(ProbeStore): + """ Caching abstraction layer between ``probed.xml`` and the + Probes plugin.""" + def __init__(self, core, datastore): + ProbeStore.__init__(self, core, datastore) + self._fname = os.path.join(datastore, 'probed.xml') + self._load_data() + + def _load_data(self, _=None): + """ Load probe data from probed.xml """ + Bcfg2.Server.Cache.expire("Probes", "probegroups") + Bcfg2.Server.Cache.expire("Probes", "probedata") + if not os.path.exists(self._fname): + self.commit() + try: + data = lxml.etree.parse(self._fname, + parser=Bcfg2.Server.XMLParser).getroot() + except (IOError, lxml.etree.XMLSyntaxError): + err = sys.exc_info()[1] + self.logger.error("Failed to read file probed.xml: %s" % err) + return + for client in data.getchildren(): + self._datacache[client.get('name')] = \ + ClientProbeDataSet(timestamp=client.get("timestamp")) + self._groupcache[client.get('name')] = [] + for pdata in client: + if pdata.tag == 'Probe': + self._datacache[client.get('name')][pdata.get('name')] = \ + ProbeData(pdata.get("value")) + elif pdata.tag == 'Group': + self._groupcache[client.get('name')].append( + pdata.get('name')) + + Bcfg2.Server.Cache.expire("Metadata") + + def _load_groups(self, hostname): + self._load_data(hostname) + + def commit(self): + """ Write received probe data to probed.xml """ + top = lxml.etree.Element("Probed") + for client, probed in sorted(self._datacache.items()): + # make a copy of probe data for this client in case it + # submits probe data while we're trying to write + # probed.xml + probedata = copy.copy(probed) + ctag = \ + lxml.etree.SubElement(top, 'Client', name=client, + timestamp=str(int(probedata.timestamp))) + for probe in sorted(probedata): + lxml.etree.SubElement( + ctag, 'Probe', name=probe, + value=self._datacache[client][probe]) + for group in sorted(self._groupcache[client]): + lxml.etree.SubElement(ctag, "Group", name=group) + try: + top.getroottree().write(self._fname, + xml_declaration=False, + pretty_print='true') + except IOError: + err = sys.exc_info()[1] + self.logger.error("Failed to write probed.xml: %s" % err) + + def set_groups(self, hostname, groups): + Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) + olddata = self._groupcache.get(hostname, []) + self._groupcache[hostname] = groups + if olddata != groups: + Bcfg2.Server.Cache.expire("Metadata", hostname) + + def set_data(self, hostname, data): + Bcfg2.Server.Cache.expire("Probes", "probedata", hostname) + self._datacache[hostname] = ClientProbeDataSet() + expire_metadata = False + for probe, pdata in data.items(): + olddata = self._datacache[hostname].get(probe, ProbeData('')) + self._datacache[hostname][probe] = pdata + expire_metadata |= olddata != data + if expire_metadata: + Bcfg2.Server.Cache.expire("Metadata", hostname) + + class ClientProbeDataSet(dict): """ dict of probe => [probe data] that records a timestamp for each host """ @@ -195,12 +401,13 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet): class Probes(Bcfg2.Server.Plugin.Probing, - Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.Connector, Bcfg2.Server.Plugin.DatabaseBacked): """ A plugin to gather information from a client machine """ __author__ = 'bcfg-dev@mcs.anl.gov' + groupline_re = re.compile(r'^group:\s*(?P<groupname>\S+)\s*') + options = [ Bcfg2.Options.BooleanOption( cf=('probes', 'use_database'), dest="probes_db", @@ -209,7 +416,6 @@ class Probes(Bcfg2.Server.Plugin.Probing, def __init__(self, core, datastore): Bcfg2.Server.Plugin.Probing.__init__(self) - Bcfg2.Server.Plugin.Caching.__init__(self) Bcfg2.Server.Plugin.Connector.__init__(self) Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) @@ -219,191 +425,48 @@ class Probes(Bcfg2.Server.Plugin.Probing, err = sys.exc_info()[1] raise Bcfg2.Server.Plugin.PluginInitError(err) - self.probedata = dict() - self.cgroups = dict() - self.load_data() - __init__.__doc__ = Bcfg2.Server.Plugin.DatabaseBacked.__init__.__doc__ - - @track_statistics() - def write_data(self, client): - """ Write probe data out for use with bcfg2-info """ if self._use_db: - return self._write_data_db(client) + self.probestore = DBProbeStore(core, datastore) else: - return self._write_data_xml(client) - - def _write_data_xml(self, _): - """ Write received probe data to probed.xml """ - top = lxml.etree.Element("Probed") - for client, probed in sorted(self.probedata.items()): - # make a copy of probe data for this client in case it - # submits probe data while we're trying to write - # probed.xml - probedata = copy.copy(probed) - ctag = \ - lxml.etree.SubElement(top, 'Client', name=client, - timestamp=str(int(probedata.timestamp))) - for probe in sorted(probedata): - lxml.etree.SubElement( - ctag, 'Probe', name=probe, - value=self.probedata[client][probe]) - for group in sorted(self.cgroups[client]): - lxml.etree.SubElement(ctag, "Group", name=group) - try: - top.getroottree().write(os.path.join(self.data, 'probed.xml'), - xml_declaration=False, - pretty_print='true') - except IOError: - err = sys.exc_info()[1] - self.logger.error("Failed to write probed.xml: %s" % err) - - @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock - def _write_data_db(self, client): - """ Write received probe data to the database """ - for probe, data in self.probedata[client.hostname].items(): - pdata = \ - ProbesDataModel.objects.get_or_create(hostname=client.hostname, - probe=probe)[0] - if pdata.data != data: - pdata.data = data - pdata.save() - - ProbesDataModel.objects.filter( - hostname=client.hostname).exclude( - probe__in=self.probedata[client.hostname]).delete() - - for group in self.cgroups[client.hostname]: - try: - ProbesGroupsModel.objects.get_or_create( - hostname=client.hostname, - group=group) - except ProbesGroupsModel.MultipleObjectsReturned: - ProbesGroupsModel.objects.filter(hostname=client.hostname, - group=group).delete() - ProbesGroupsModel.objects.get_or_create( - hostname=client.hostname, - group=group) - ProbesGroupsModel.objects.filter( - hostname=client.hostname).exclude( - group__in=self.cgroups[client.hostname]).delete() - - def expire_cache(self, key=None): - self.load_data(client=key) - - def load_data(self, client=None): - """ Load probe data from the appropriate backend (probed.xml - or the database) """ - if self._use_db: - return self._load_data_db(client=client) - else: - # the XML backend doesn't support loading data for single - # clients, so it reloads all data - return self._load_data_xml() - - def _load_data_xml(self): - """ Load probe data from probed.xml """ - try: - data = lxml.etree.parse(os.path.join(self.data, 'probed.xml'), - parser=Bcfg2.Server.XMLParser).getroot() - except (IOError, lxml.etree.XMLSyntaxError): - err = sys.exc_info()[1] - self.logger.error("Failed to read file probed.xml: %s" % err) - return - self.probedata = {} - self.cgroups = {} - for client in data.getchildren(): - self.probedata[client.get('name')] = \ - ClientProbeDataSet(timestamp=client.get("timestamp")) - self.cgroups[client.get('name')] = [] - for pdata in client: - if pdata.tag == 'Probe': - self.probedata[client.get('name')][pdata.get('name')] = \ - ProbeData(pdata.get("value")) - elif pdata.tag == 'Group': - self.cgroups[client.get('name')].append(pdata.get('name')) - - if self.core.metadata_cache_mode in ['cautious', 'aggressive']: - self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) - - def _load_data_db(self, client=None): - """ Load probe data from the database """ - if client is None: - self.probedata = {} - self.cgroups = {} - probedata = ProbesDataModel.objects.all() - groupdata = ProbesGroupsModel.objects.all() - else: - self.probedata.pop(client, None) - self.cgroups.pop(client, None) - probedata = ProbesDataModel.objects.filter(hostname=client) - groupdata = ProbesGroupsModel.objects.filter(hostname=client) - - for pdata in probedata: - if pdata.hostname not in self.probedata: - self.probedata[pdata.hostname] = ClientProbeDataSet( - timestamp=time.mktime(pdata.timestamp.timetuple())) - self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data) - for pgroup in groupdata: - if pgroup.hostname not in self.cgroups: - self.cgroups[pgroup.hostname] = [] - self.cgroups[pgroup.hostname].append(pgroup.group) - - if self.core.metadata_cache_mode in ['cautious', 'aggressive']: - self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata, - key=client) + self.probestore = XMLProbeStore(core, datastore) @track_statistics() - def GetProbes(self, meta): - return self.probes.get_probe_data(meta) - GetProbes.__doc__ = Bcfg2.Server.Plugin.Probing.GetProbes.__doc__ + def GetProbes(self, metadata): + return self.probes.get_probe_data(metadata) @track_statistics() def ReceiveData(self, client, datalist): - if self.core.metadata_cache_mode in ['cautious', 'aggressive']: - if client.hostname in self.cgroups: - olddata = copy.copy(self.cgroups[client.hostname]) - else: - olddata = [] - - cgroups = [] - cprobedata = ClientProbeDataSet() + cgroups = set() + cdata = dict() for data in datalist: - self.ReceiveDataItem(client, data, cgroups, cprobedata) - self.cgroups[client.hostname] = cgroups - self.probedata[client.hostname] = cprobedata - - if (self.core.metadata_cache_mode in ['cautious', 'aggressive'] and - olddata != self.cgroups[client.hostname]): - self.core.metadata_cache.expire(client.hostname) - self.write_data(client) - ReceiveData.__doc__ = Bcfg2.Server.Plugin.Probing.ReceiveData.__doc__ - - def ReceiveDataItem(self, client, data, cgroups, cprobedata): - """Receive probe results pertaining to client.""" + groups, cdata[data.get("name")] = \ + self.ReceiveDataItem(client, data) + cgroups.update(groups) + self.probestore.set_groups(client.hostname, list(cgroups)) + self.probestore.set_data(client.hostname, cdata) + self.probestore.commit() + + def ReceiveDataItem(self, client, data): + """ Receive probe results pertaining to client. Returns a + tuple of (<probe groups>, <probe data>). """ if data.text is None: self.logger.info("Got null response to probe %s from %s" % (data.get('name'), client.hostname)) - cprobedata[data.get('name')] = ProbeData('') - return + return [], '' dlines = data.text.split('\n') self.logger.debug("Processing probe from %s: %s:%s" % (client.hostname, data.get('name'), [line.strip() for line in dlines])) + groups = [] for line in dlines[:]: - if line.split(':')[0] == 'group': - newgroup = line.split(':')[1].strip() - if newgroup not in cgroups: - cgroups.append(newgroup) + match = self.groupline_re.match(line) + if match: + groups.append(match.group("groupname")) dlines.remove(line) - dobj = ProbeData("\n".join(dlines)) - cprobedata[data.get('name')] = dobj - - def get_additional_groups(self, meta): - return self.cgroups.get(meta.hostname, list()) - get_additional_groups.__doc__ = \ - Bcfg2.Server.Plugin.Connector.get_additional_groups.__doc__ - - def get_additional_data(self, meta): - return self.probedata.get(meta.hostname, ClientProbeDataSet()) - get_additional_data.__doc__ = \ - Bcfg2.Server.Plugin.Connector.get_additional_data.__doc__ + return (groups, ProbeData("\n".join(dlines))) + + def get_additional_groups(self, metadata): + return self.probestore.get_groups(metadata.hostname) + + def get_additional_data(self, metadata): + return self.probestore.get_data(metadata.hostname) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index b0e6e9142..bbaa0f403 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -1,7 +1,7 @@ import os import sys -import copy -import time +import shutil +import tempfile import lxml.etree import Bcfg2.version import Bcfg2.Server @@ -36,47 +36,6 @@ test_data = dict(a=1, b=[1, 2, 3], c="test", d=dict(a=1, b=dict(a=1), c=(1, "2", 3))) -class FakeElement(lxml.etree._Element): - getroottree = Mock() - - def __init__(self, el): - self._element = el - - def __getattribute__(self, attr): - el = lxml.etree._Element.__getattribute__(self, - '__dict__')['_element'] - if attr == "getroottree": - return FakeElement.getroottree - elif attr == "_element": - return el - else: - return getattr(el, attr) - - -class StoringElement(object): - OriginalElement = copy.copy(lxml.etree.Element) - - def __init__(self): - self.element = None - self.return_value = None - - def __call__(self, *args, **kwargs): - self.element = self.OriginalElement(*args, **kwargs) - self.return_value = FakeElement(self.element) - return self.return_value - - -class StoringSubElement(object): - OriginalSubElement = copy.copy(lxml.etree.SubElement) - - def __call__(self, parent, tag, **kwargs): - try: - return self.OriginalSubElement(parent._element, tag, - **kwargs) - except AttributeError: - return self.OriginalSubElement(parent, tag, **kwargs) - - class FakeList(list): pass @@ -87,18 +46,6 @@ class TestProbesDB(DBModelTestCase): ProbesDataModel] -class TestClientProbeDataSet(Bcfg2TestCase): - def test__init(self): - ds = ClientProbeDataSet() - self.assertLessEqual(ds.timestamp, time.time()) - self.assertIsInstance(ds, dict) - self.assertNotIn("timestamp", ds) - - ds = ClientProbeDataSet(timestamp=123) - self.assertEqual(ds.timestamp, 123) - self.assertNotIn("timestamp", ds) - - class TestProbeData(Bcfg2TestCase): def test_str(self): # a value that is not valid XML, JSON, or YAML @@ -253,377 +200,161 @@ group-specific""" assert False, "Strange probe found in get_probe_data() return" -class TestProbes(TestProbing, TestConnector, TestDatabaseBacked): +class TestProbes(Bcfg2TestCase): test_obj = Probes - def get_obj(self, core=None, load_data=None): - core = MagicMock() - if load_data is None: - load_data = MagicMock() - - @patch("%s.%s.load_data" % (self.test_obj.__module__, - self.test_obj.__name__), new=load_data) - def inner(): - return TestDatabaseBacked.get_obj(self, core=core) - - return inner() - - def get_test_probedata(self): - test_xdata = lxml.etree.Element("test") - lxml.etree.SubElement(test_xdata, "test", foo="foo") - rv = dict() - rv["foo.example.com"] = ClientProbeDataSet( - timestamp=time.time()) - rv["foo.example.com"]["xml"] = \ - ProbeData(lxml.etree.tostring( - test_xdata, - xml_declaration=False).decode('UTF-8')) - rv["foo.example.com"]["text"] = \ - ProbeData("freeform text") - rv["foo.example.com"]["multiline"] = \ - ProbeData("""multiple + test_xdata = lxml.etree.Element("test") + lxml.etree.SubElement(test_xdata, "test", foo="foo") + test_xdoc = lxml.etree.tostring(test_xdata, + xml_declaration=False).decode('UTF-8') + + data = dict() + data['xml'] = "group:group\n" + test_xdoc + data['text'] = "freeform text" + data['multiline'] = """multiple lines of freeform text -""") - rv["bar.example.com"] = ClientProbeDataSet( - timestamp=time.time()) - rv["bar.example.com"]["empty"] = \ - ProbeData("") - if HAS_JSON: - rv["bar.example.com"]["json"] = \ - ProbeData(json.dumps(test_data)) - if HAS_YAML: - rv["bar.example.com"]["yaml"] = \ - ProbeData(yaml.dump(test_data)) - return rv - - def get_test_cgroups(self): - return {"foo.example.com": ["group", "group with spaces", - "group-with-dashes"], - "bar.example.com": []} +group:group-with-dashes +group: group:with:colons +""" + data['empty'] = '' + data['almost_empty'] = 'group: other_group' + if HAS_JSON: + data['json'] = json.dumps(test_data) + if HAS_YAML: + data['yaml'] = yaml.dump(test_data) + + def setUp(self): + Bcfg2TestCase.setUp(self) + set_setup_default("probes_db") + self.datastore = None + Bcfg2.Server.Cache.expire("Probes") + + def tearDown(self): + if self.datastore is not None: + shutil.rmtree(self.datastore) + self.datastore = None + + def get_obj(self, core=None): + if core is None: + core = Mock() + if Bcfg2.Options.setup.probes_db: + @patch("os.makedirs", Mock()) + def inner(): + return self.test_obj(core, datastore) + return inner() + else: + # actually use a real datastore so we can read and write + # probed.xml + if self.datastore is None: + self.datastore = tempfile.mkdtemp() + return self.test_obj(core, self.datastore) + + def test_GetProbes(self): + p = self.get_obj() + p.probes = Mock() + metadata = Mock() + p.GetProbes(metadata) + p.probes.get_probe_data.assert_called_with(metadata) - def test__init(self): - mock_load_data = Mock() - probes = self.get_obj(load_data=mock_load_data) - mock_load_data.assert_any_call() - self.assertEqual(probes.probedata, - ClientProbeDataSet()) - self.assertEqual(probes.cgroups, dict()) + def additionalDataEqual(self, actual, expected): + self.assertItemsEqual( + dict([(k, str(d)) for k, d in actual.items()]), + expected) - def test_write_data_xml(self): - Bcfg2.Options.setup.probes_db = False - probes = self.get_obj() - probes._write_data_db = Mock() - probes._write_data_xml = Mock() - probes.write_data("test") - probes._write_data_xml.assert_called_with("test") - self.assertFalse(probes._write_data_db.called) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - def test_write_data_db(self): - Bcfg2.Options.setup.probes_db = True - probes = self.get_obj() - probes._write_data_db = Mock() - probes._write_data_xml = Mock() - probes.write_data("test") - probes._write_data_db.assert_called_with("test") - self.assertFalse(probes._write_data_xml.called) - - def test__write_data_xml(self): - Bcfg2.Options.setup.probes_db = False - probes = self.get_obj() - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - - @patch("lxml.etree.Element") - @patch("lxml.etree.SubElement", StoringSubElement()) - def inner(mock_Element): - mock_Element.side_effect = StoringElement() - probes._write_data_xml(None) - - top = mock_Element.side_effect.return_value - write = top.getroottree.return_value.write - self.assertEqual(write.call_args[0][0], - os.path.join(datastore, probes.name, - "probed.xml")) - - data = top._element - foodata = data.find("Client[@name='foo.example.com']") - self.assertIsNotNone(foodata) - self.assertIsNotNone(foodata.get("timestamp")) - self.assertEqual(len(foodata.findall("Probe")), - len(probes.probedata['foo.example.com'])) - self.assertEqual(len(foodata.findall("Group")), - len(probes.cgroups['foo.example.com'])) - xml = foodata.find("Probe[@name='xml']") - self.assertIsNotNone(xml) - self.assertIsNotNone(xml.get("value")) - xdata = lxml.etree.XML(xml.get("value")) - self.assertIsNotNone(xdata) - self.assertIsNotNone(xdata.find("test")) - self.assertEqual(xdata.find("test").get("foo"), "foo") - text = foodata.find("Probe[@name='text']") - self.assertIsNotNone(text) - self.assertIsNotNone(text.get("value")) - multiline = foodata.find("Probe[@name='multiline']") - self.assertIsNotNone(multiline) - self.assertIsNotNone(multiline.get("value")) - self.assertGreater(len(multiline.get("value").splitlines()), 1) - - bardata = data.find("Client[@name='bar.example.com']") - self.assertIsNotNone(bardata) - self.assertIsNotNone(bardata.get("timestamp")) - self.assertEqual(len(bardata.findall("Probe")), - len(probes.probedata['bar.example.com'])) - self.assertEqual(len(bardata.findall("Group")), - len(probes.cgroups['bar.example.com'])) - empty = bardata.find("Probe[@name='empty']") - self.assertIsNotNone(empty) - self.assertIsNotNone(empty.get("value")) - self.assertEqual(empty.get("value"), "") - if HAS_JSON: - jdata = bardata.find("Probe[@name='json']") - self.assertIsNotNone(jdata) - self.assertIsNotNone(jdata.get("value")) - self.assertItemsEqual(test_data, - json.loads(jdata.get("value"))) - if HAS_YAML: - ydata = bardata.find("Probe[@name='yaml']") - self.assertIsNotNone(ydata) - self.assertIsNotNone(ydata.get("value")) - self.assertItemsEqual(test_data, - yaml.load(ydata.get("value"))) - - inner() - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - def test__write_data_db(self): - syncdb(TestProbesDB) - Bcfg2.Options.setup.probes_db = True - probes = self.get_obj() - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - - for cname in ["foo.example.com", "bar.example.com"]: - client = Mock() - client.hostname = cname - probes._write_data_db(client) - - pdata = ProbesDataModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pdata), len(probes.probedata[cname])) - - for probe in pdata: - self.assertEqual(probe.hostname, client.hostname) - self.assertIsNotNone(probe.data) - if probe.probe == "xml": - xdata = lxml.etree.XML(probe.data) - self.assertIsNotNone(xdata) - self.assertIsNotNone(xdata.find("test")) - self.assertEqual(xdata.find("test").get("foo"), "foo") - elif probe.probe == "text": - pass - elif probe.probe == "multiline": - self.assertGreater(len(probe.data.splitlines()), 1) - elif probe.probe == "empty": - self.assertEqual(probe.data, "") - elif probe.probe == "yaml": - self.assertItemsEqual(test_data, yaml.load(probe.data)) - elif probe.probe == "json": - self.assertItemsEqual(test_data, json.loads(probe.data)) - else: - assert False, "Strange probe found in _write_data_db data" - - pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pgroups), len(probes.cgroups[cname])) - - # test that old probe data is removed properly - cname = 'foo.example.com' - del probes.probedata[cname]['text'] - probes.cgroups[cname].pop() - client = Mock() - client.hostname = cname - probes._write_data_db(client) - - pdata = ProbesDataModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pdata), len(probes.probedata[cname])) - pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all() - self.assertEqual(len(pgroups), len(probes.cgroups[cname])) - - def test_load_data_xml(self): + def test_probes_xml(self): + """ Set and retrieve probe data with database disabled """ Bcfg2.Options.setup.probes_db = False - probes = self.get_obj() - probes._load_data_db = Mock() - probes._load_data_xml = Mock() - probes.load_data() - probes._load_data_xml.assert_any_call() - self.assertFalse(probes._load_data_db.called) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - def test_load_data_db(self): - Bcfg2.Options.setup.probes_db = True - probes = self.get_obj() - probes._load_data_db = Mock() - probes._load_data_xml = Mock() - probes.load_data() - probes._load_data_db.assert_any_call(client=None) - self.assertFalse(probes._load_data_xml.called) - - @patch("lxml.etree.parse") - def test__load_data_xml(self, mock_parse): - Bcfg2.Options.setup.probes_db = False - probes = self.get_obj() - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - - # to get the value for lxml.etree.parse to parse, we call - # _write_data_xml, mock the lxml.etree._ElementTree.write() - # call, and grab the data that gets "written" to probed.xml - @patch("lxml.etree.Element") - @patch("lxml.etree.SubElement", StoringSubElement()) - def inner(mock_Element): - mock_Element.side_effect = StoringElement() - probes._write_data_xml(None) - top = mock_Element.side_effect.return_value - return top._element - - xdata = inner() - mock_parse.return_value = xdata.getroottree() - probes.probedata = dict() - probes.cgroups = dict() - - probes._load_data_xml() - mock_parse.assert_called_with(os.path.join(datastore, probes.name, - 'probed.xml'), - parser=Bcfg2.Server.XMLParser) - self.assertItemsEqual(probes.probedata, self.get_test_probedata()) - self.assertItemsEqual(probes.cgroups, self.get_test_cgroups()) - - @skipUnless(HAS_DJANGO, "Django not found, skipping") - def test__load_data_db(self): - syncdb(TestProbesDB) - Bcfg2.Options.setup.probes_db = True - probes = self.get_obj() - probes.probedata = self.get_test_probedata() - probes.cgroups = self.get_test_cgroups() - for cname in probes.probedata.keys(): - client = Mock() - client.hostname = cname - probes._write_data_db(client) - - probes.probedata = dict() - probes.cgroups = dict() - probes._load_data_db() - self.assertItemsEqual(probes.probedata, self.get_test_probedata()) - # the db backend does not store groups at all if a client has - # no groups set, so we can't just use assertItemsEqual here, - # because loading saved data may _not_ result in the original - # data if some clients had no groups set. - test_cgroups = self.get_test_cgroups() - for cname, groups in test_cgroups.items(): - if cname in probes.cgroups: - self.assertEqual(groups, probes.cgroups[cname]) - else: - self.assertEqual(groups, []) + self._perform_tests() - @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data") - def test_GetProbes(self, mock_get_probe_data): - probes = self.get_obj() - metadata = Mock() - probes.GetProbes(metadata) - mock_get_probe_data.assert_called_with(metadata) - - def test_ReceiveData(self): - # we use a simple (read: bogus) datalist here to make this - # easy to test - datalist = ["a", "b", "c"] - - probes = self.get_obj() - probes.write_data = Mock() - probes.ReceiveDataItem = Mock() - probes.core.metadata_cache_mode = 'off' - client = Mock() - client.hostname = "foo.example.com" - probes.ReceiveData(client, datalist) - - cgroups = [] - cprobedata = ClientProbeDataSet() - self.assertItemsEqual(probes.ReceiveDataItem.call_args_list, - [call(client, "a", cgroups, cprobedata), - call(client, "b", cgroups, cprobedata), - call(client, "c", cgroups, cprobedata)]) - probes.write_data.assert_called_with(client) - self.assertFalse(probes.core.metadata_cache.expire.called) - - # change the datalist, ensure that the cache is cleared - probes.cgroups[client.hostname] = datalist - probes.core.metadata_cache_mode = 'aggressive' - probes.ReceiveData(client, ['a', 'b', 'd']) - - probes.write_data.assert_called_with(client) - probes.core.metadata_cache.expire.assert_called_with(client.hostname) - - def test_ReceiveDataItem(self): - probes = self.get_obj() - for cname, cdata in self.get_test_probedata().items(): - client = Mock() - client.hostname = cname - cgroups = [] - cprobedata = ClientProbeDataSet() - for pname, pdata in cdata.items(): - dataitem = lxml.etree.Element("Probe", name=pname) - if pname == "text": - # add some groups to the plaintext test to test - # group parsing - data = [pdata] - for group in self.get_test_cgroups()[cname]: - data.append("group:%s" % group) - dataitem.text = "\n".join(data) - else: - dataitem.text = str(pdata) - - probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata) - - probes.cgroups[client.hostname] = cgroups - probes.probedata[client.hostname] = cprobedata - self.assertIn(client.hostname, probes.probedata) - self.assertIn(pname, probes.probedata[cname]) - self.assertEqual(pdata, probes.probedata[cname][pname]) - self.assertIn(client.hostname, probes.cgroups) - self.assertEqual(probes.cgroups[cname], - self.get_test_cgroups()[cname]) - - def test_get_additional_groups(self): - TestConnector.test_get_additional_groups(self) - - probes = self.get_obj() - test_cgroups = self.get_test_cgroups() - probes.cgroups = self.get_test_cgroups() - for cname in test_cgroups.keys(): - metadata = Mock() - metadata.hostname = cname - self.assertEqual(test_cgroups[cname], - probes.get_additional_groups(metadata)) - # test a non-existent client - metadata = Mock() - metadata.hostname = "nonexistent" - self.assertEqual(probes.get_additional_groups(metadata), - list()) - - def test_get_additional_data(self): - TestConnector.test_get_additional_data(self) - - probes = self.get_obj() - test_probedata = self.get_test_probedata() - probes.probedata = self.get_test_probedata() - for cname in test_probedata.keys(): - metadata = Mock() - metadata.hostname = cname - self.assertEqual(test_probedata[cname], - probes.get_additional_data(metadata)) - # test a non-existent client - metadata = Mock() - metadata.hostname = "nonexistent" - self.assertEqual(probes.get_additional_data(metadata), - ClientProbeDataSet()) + def test_probes_db(self): + """ Set and retrieve probe data with database enabled """ + Bcfg2.Options.setup.probes_db = True + self._perform_tests() + + def _perform_tests(self): + p = self.get_obj() + + # first, sanity checks + foo_md = Mock(hostname="foo.example.com") + bar_md = Mock(hostname="bar.example.com") + self.assertItemsEqual(p.get_additional_groups(foo_md), []) + self.assertItemsEqual(p.get_additional_data(foo_md), dict()) + self.assertItemsEqual(p.get_additional_groups(bar_md), []) + self.assertItemsEqual(p.get_additional_data(bar_md), dict()) + + # next, set some initial probe data + foo_datalist = [] + for key in ['xml', 'text', 'multiline']: + pdata = lxml.etree.Element("Probe", name=key) + pdata.text = self.data[key] + foo_datalist.append(pdata) + foo_addl_data = dict(xml=self.test_xdoc, + text="freeform text", + multiline="""multiple +lines +of +freeform +text""") + bar_datalist = [] + for key in ['empty', 'almost_empty', 'json', 'yaml']: + if key in self.data: + pdata = lxml.etree.Element("Probe", name=key) + pdata.text = self.data[key] + bar_datalist.append(pdata) + bar_addl_data = dict(empty="", almost_empty="") + if HAS_JSON: + bar_addl_data['json'] = self.data['json'] + if HAS_YAML: + bar_addl_data['yaml'] = self.data['yaml'] + + p.ReceiveData(foo_md, foo_datalist) + self.assertItemsEqual(p.get_additional_groups(foo_md), + ["group", "group-with-dashes", + "group:with:colons"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + + p.ReceiveData(bar_md, bar_datalist) + self.assertItemsEqual(p.get_additional_groups(foo_md), + ["group", "group-with-dashes", + "group:with:colons"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) + + # instantiate a new Probes object and clear Probes caches to + # imitate a server restart + p = self.get_obj() + Bcfg2.Server.Cache.expire("Probes") + + self.assertItemsEqual(p.get_additional_groups(foo_md), + ["group", "group-with-dashes", + "group:with:colons"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) + + # set new data (and groups) for foo + foo_datalist = [] + pdata = lxml.etree.Element("Probe", name='xml') + pdata.text = self.data['xml'] + foo_datalist.append(pdata) + foo_addl_data = dict(xml=self.test_xdoc) + + p.ReceiveData(foo_md, foo_datalist) + self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) + + # instantiate a new Probes object and clear Probes caches to + # imitate a server restart + p = self.get_obj() + Bcfg2.Server.Cache.expire("Probes") + + self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"]) + self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data) + self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group']) + self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data) |