summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Probes.py409
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py569
2 files changed, 386 insertions, 592 deletions
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index f75d88d8f..ef339a34b 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -8,8 +8,10 @@ import copy
import operator
import lxml.etree
import Bcfg2.Server
+import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
import Bcfg2.Server.FileMonitor
+from Bcfg2.Logger import Debuggable
from Bcfg2.Server.Statistics import track_statistics
HAS_DJANGO = False
@@ -63,6 +65,210 @@ except ImportError:
HAS_YAML = False
+class ProbeStore(Debuggable):
+ """ Caching abstraction layer between persistent probe data
+ storage and the Probes plugin."""
+
+ def __init__(self, core, datastore): # pylint: disable=W0613
+ Debuggable.__init__(self)
+ self._groupcache = Bcfg2.Server.Cache.Cache("Probes", "probegroups")
+ self._datacache = Bcfg2.Server.Cache.Cache("Probes", "probedata")
+
+ def get_groups(self, hostname):
+ """ Get the list of groups for the given host """
+ if hostname not in self._groupcache:
+ self._load_groups(hostname)
+ return self._groupcache.get(hostname, [])
+
+ def set_groups(self, hostname, groups):
+ """ Set the list of groups for the given host """
+ raise NotImplementedError
+
+ def get_data(self, hostname):
+ """ Get a dict of probe data for the given host """
+ if hostname not in self._datacache:
+ self._load_data(hostname)
+ return self._datacache.get(hostname, dict())
+
+ def set_data(self, hostname, data):
+ """ Set probe data for the given host """
+ raise NotImplementedError
+
+ def _load_groups(self, hostname):
+ """ When probe groups are not found in the cache, this
+ function is called to load them from the backend (XML or
+ database). """
+ raise NotImplementedError
+
+ def _load_data(self, hostname):
+ """ When probe groups are not found in the cache, this
+ function is called to load them from the backend (XML or
+ database). """
+ raise NotImplementedError
+
+ def commit(self):
+ """ Commit the current data in the cache to the persistent
+ backend store. This is not used with the
+ :class:`Bcfg2.Server.Plugins.Probes.DBProbeStore`, because it
+ commits on every change. """
+ pass
+
+
+class DBProbeStore(ProbeStore, Bcfg2.Server.Plugin.DatabaseBacked):
+ """ Caching abstraction layer between the database and the Probes
+ plugin. """
+ create = False
+
+ def __init__(self, core, datastore):
+ Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
+ ProbeStore.__init__(self, core, datastore)
+
+ def _load_groups(self, hostname):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ groupdata = ProbesGroupsModel.objects.filter(hostname=hostname)
+ self._groupcache[hostname] = list(set(r.group for r in groupdata))
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
+ def set_groups(self, hostname, groups):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ olddata = self._groupcache.get(hostname, [])
+ self._groupcache[hostname] = groups
+ for group in groups:
+ try:
+ ProbesGroupsModel.objects.get_or_create(
+ hostname=hostname,
+ group=group)
+ except ProbesGroupsModel.MultipleObjectsReturned:
+ ProbesGroupsModel.objects.filter(hostname=hostname,
+ group=group).delete()
+ ProbesGroupsModel.objects.get_or_create(
+ hostname=hostname,
+ group=group)
+ ProbesGroupsModel.objects.filter(
+ hostname=hostname).exclude(group__in=groups).delete()
+ if olddata != groups:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ def _load_data(self, hostname):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ ts_set = False
+ for pdata in ProbesDataModel.objects.filter(hostname=hostname):
+ if not ts_set:
+ self._datacache[hostname].timestamp = \
+ time.mktime(pdata.timestamp.timetuple())
+ ts_set = True
+ self._datacache[hostname][pdata.probe] = ProbeData(pdata.data)
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
+ def set_data(self, hostname, data):
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ expire_metadata = False
+ for probe, pdata in data.items():
+ self._datacache[hostname][probe] = pdata
+ record, created = ProbesDataModel.objects.get_or_create(
+ hostname=hostname,
+ probe=probe)
+ expire_metadata |= created
+ if record.data != pdata:
+ record.data = pdata
+ record.save()
+ expire_metadata = True
+ qset = ProbesDataModel.objects.filter(
+ hostname=hostname).exclude(probe__in=data.keys())
+ if len(qset):
+ qset.delete()
+ expire_metadata = True
+ if expire_metadata:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+
+class XMLProbeStore(ProbeStore):
+ """ Caching abstraction layer between ``probed.xml`` and the
+ Probes plugin."""
+ def __init__(self, core, datastore):
+ ProbeStore.__init__(self, core, datastore)
+ self._fname = os.path.join(datastore, 'probed.xml')
+ self._load_data()
+
+ def _load_data(self, _=None):
+ """ Load probe data from probed.xml """
+ Bcfg2.Server.Cache.expire("Probes", "probegroups")
+ Bcfg2.Server.Cache.expire("Probes", "probedata")
+ if not os.path.exists(self._fname):
+ self.commit()
+ try:
+ data = lxml.etree.parse(self._fname,
+ parser=Bcfg2.Server.XMLParser).getroot()
+ except (IOError, lxml.etree.XMLSyntaxError):
+ err = sys.exc_info()[1]
+ self.logger.error("Failed to read file probed.xml: %s" % err)
+ return
+ for client in data.getchildren():
+ self._datacache[client.get('name')] = \
+ ClientProbeDataSet(timestamp=client.get("timestamp"))
+ self._groupcache[client.get('name')] = []
+ for pdata in client:
+ if pdata.tag == 'Probe':
+ self._datacache[client.get('name')][pdata.get('name')] = \
+ ProbeData(pdata.get("value"))
+ elif pdata.tag == 'Group':
+ self._groupcache[client.get('name')].append(
+ pdata.get('name'))
+
+ Bcfg2.Server.Cache.expire("Metadata")
+
+ def _load_groups(self, hostname):
+ self._load_data(hostname)
+
+ def commit(self):
+ """ Write received probe data to probed.xml """
+ top = lxml.etree.Element("Probed")
+ for client, probed in sorted(self._datacache.items()):
+ # make a copy of probe data for this client in case it
+ # submits probe data while we're trying to write
+ # probed.xml
+ probedata = copy.copy(probed)
+ ctag = \
+ lxml.etree.SubElement(top, 'Client', name=client,
+ timestamp=str(int(probedata.timestamp)))
+ for probe in sorted(probedata):
+ lxml.etree.SubElement(
+ ctag, 'Probe', name=probe,
+ value=self._datacache[client][probe])
+ for group in sorted(self._groupcache[client]):
+ lxml.etree.SubElement(ctag, "Group", name=group)
+ try:
+ top.getroottree().write(self._fname,
+ xml_declaration=False,
+ pretty_print='true')
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Failed to write probed.xml: %s" % err)
+
+ def set_groups(self, hostname, groups):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ olddata = self._groupcache.get(hostname, [])
+ self._groupcache[hostname] = groups
+ if olddata != groups:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ def set_data(self, hostname, data):
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ expire_metadata = False
+ for probe, pdata in data.items():
+ olddata = self._datacache[hostname].get(probe, ProbeData(''))
+ self._datacache[hostname][probe] = pdata
+ expire_metadata |= olddata != data
+ if expire_metadata:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+
class ClientProbeDataSet(dict):
""" dict of probe => [probe data] that records a timestamp for
each host """
@@ -195,12 +401,13 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
class Probes(Bcfg2.Server.Plugin.Probing,
- Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Connector,
Bcfg2.Server.Plugin.DatabaseBacked):
""" A plugin to gather information from a client machine """
__author__ = 'bcfg-dev@mcs.anl.gov'
+ groupline_re = re.compile(r'^group:\s*(?P<groupname>\S+)\s*')
+
options = [
Bcfg2.Options.BooleanOption(
cf=('probes', 'use_database'), dest="probes_db",
@@ -209,7 +416,6 @@ class Probes(Bcfg2.Server.Plugin.Probing,
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Probing.__init__(self)
- Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
@@ -219,191 +425,48 @@ class Probes(Bcfg2.Server.Plugin.Probing,
err = sys.exc_info()[1]
raise Bcfg2.Server.Plugin.PluginInitError(err)
- self.probedata = dict()
- self.cgroups = dict()
- self.load_data()
- __init__.__doc__ = Bcfg2.Server.Plugin.DatabaseBacked.__init__.__doc__
-
- @track_statistics()
- def write_data(self, client):
- """ Write probe data out for use with bcfg2-info """
if self._use_db:
- return self._write_data_db(client)
+ self.probestore = DBProbeStore(core, datastore)
else:
- return self._write_data_xml(client)
-
- def _write_data_xml(self, _):
- """ Write received probe data to probed.xml """
- top = lxml.etree.Element("Probed")
- for client, probed in sorted(self.probedata.items()):
- # make a copy of probe data for this client in case it
- # submits probe data while we're trying to write
- # probed.xml
- probedata = copy.copy(probed)
- ctag = \
- lxml.etree.SubElement(top, 'Client', name=client,
- timestamp=str(int(probedata.timestamp)))
- for probe in sorted(probedata):
- lxml.etree.SubElement(
- ctag, 'Probe', name=probe,
- value=self.probedata[client][probe])
- for group in sorted(self.cgroups[client]):
- lxml.etree.SubElement(ctag, "Group", name=group)
- try:
- top.getroottree().write(os.path.join(self.data, 'probed.xml'),
- xml_declaration=False,
- pretty_print='true')
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Failed to write probed.xml: %s" % err)
-
- @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
- def _write_data_db(self, client):
- """ Write received probe data to the database """
- for probe, data in self.probedata[client.hostname].items():
- pdata = \
- ProbesDataModel.objects.get_or_create(hostname=client.hostname,
- probe=probe)[0]
- if pdata.data != data:
- pdata.data = data
- pdata.save()
-
- ProbesDataModel.objects.filter(
- hostname=client.hostname).exclude(
- probe__in=self.probedata[client.hostname]).delete()
-
- for group in self.cgroups[client.hostname]:
- try:
- ProbesGroupsModel.objects.get_or_create(
- hostname=client.hostname,
- group=group)
- except ProbesGroupsModel.MultipleObjectsReturned:
- ProbesGroupsModel.objects.filter(hostname=client.hostname,
- group=group).delete()
- ProbesGroupsModel.objects.get_or_create(
- hostname=client.hostname,
- group=group)
- ProbesGroupsModel.objects.filter(
- hostname=client.hostname).exclude(
- group__in=self.cgroups[client.hostname]).delete()
-
- def expire_cache(self, key=None):
- self.load_data(client=key)
-
- def load_data(self, client=None):
- """ Load probe data from the appropriate backend (probed.xml
- or the database) """
- if self._use_db:
- return self._load_data_db(client=client)
- else:
- # the XML backend doesn't support loading data for single
- # clients, so it reloads all data
- return self._load_data_xml()
-
- def _load_data_xml(self):
- """ Load probe data from probed.xml """
- try:
- data = lxml.etree.parse(os.path.join(self.data, 'probed.xml'),
- parser=Bcfg2.Server.XMLParser).getroot()
- except (IOError, lxml.etree.XMLSyntaxError):
- err = sys.exc_info()[1]
- self.logger.error("Failed to read file probed.xml: %s" % err)
- return
- self.probedata = {}
- self.cgroups = {}
- for client in data.getchildren():
- self.probedata[client.get('name')] = \
- ClientProbeDataSet(timestamp=client.get("timestamp"))
- self.cgroups[client.get('name')] = []
- for pdata in client:
- if pdata.tag == 'Probe':
- self.probedata[client.get('name')][pdata.get('name')] = \
- ProbeData(pdata.get("value"))
- elif pdata.tag == 'Group':
- self.cgroups[client.get('name')].append(pdata.get('name'))
-
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
-
- def _load_data_db(self, client=None):
- """ Load probe data from the database """
- if client is None:
- self.probedata = {}
- self.cgroups = {}
- probedata = ProbesDataModel.objects.all()
- groupdata = ProbesGroupsModel.objects.all()
- else:
- self.probedata.pop(client, None)
- self.cgroups.pop(client, None)
- probedata = ProbesDataModel.objects.filter(hostname=client)
- groupdata = ProbesGroupsModel.objects.filter(hostname=client)
-
- for pdata in probedata:
- if pdata.hostname not in self.probedata:
- self.probedata[pdata.hostname] = ClientProbeDataSet(
- timestamp=time.mktime(pdata.timestamp.timetuple()))
- self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data)
- for pgroup in groupdata:
- if pgroup.hostname not in self.cgroups:
- self.cgroups[pgroup.hostname] = []
- self.cgroups[pgroup.hostname].append(pgroup.group)
-
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
- key=client)
+ self.probestore = XMLProbeStore(core, datastore)
@track_statistics()
- def GetProbes(self, meta):
- return self.probes.get_probe_data(meta)
- GetProbes.__doc__ = Bcfg2.Server.Plugin.Probing.GetProbes.__doc__
+ def GetProbes(self, metadata):
+ return self.probes.get_probe_data(metadata)
@track_statistics()
def ReceiveData(self, client, datalist):
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- if client.hostname in self.cgroups:
- olddata = copy.copy(self.cgroups[client.hostname])
- else:
- olddata = []
-
- cgroups = []
- cprobedata = ClientProbeDataSet()
+ cgroups = set()
+ cdata = dict()
for data in datalist:
- self.ReceiveDataItem(client, data, cgroups, cprobedata)
- self.cgroups[client.hostname] = cgroups
- self.probedata[client.hostname] = cprobedata
-
- if (self.core.metadata_cache_mode in ['cautious', 'aggressive'] and
- olddata != self.cgroups[client.hostname]):
- self.core.metadata_cache.expire(client.hostname)
- self.write_data(client)
- ReceiveData.__doc__ = Bcfg2.Server.Plugin.Probing.ReceiveData.__doc__
-
- def ReceiveDataItem(self, client, data, cgroups, cprobedata):
- """Receive probe results pertaining to client."""
+ groups, cdata[data.get("name")] = \
+ self.ReceiveDataItem(client, data)
+ cgroups.update(groups)
+ self.probestore.set_groups(client.hostname, list(cgroups))
+ self.probestore.set_data(client.hostname, cdata)
+ self.probestore.commit()
+
+ def ReceiveDataItem(self, client, data):
+ """ Receive probe results pertaining to client. Returns a
+ tuple of (<probe groups>, <probe data>). """
if data.text is None:
self.logger.info("Got null response to probe %s from %s" %
(data.get('name'), client.hostname))
- cprobedata[data.get('name')] = ProbeData('')
- return
+ return [], ''
dlines = data.text.split('\n')
self.logger.debug("Processing probe from %s: %s:%s" %
(client.hostname, data.get('name'),
[line.strip() for line in dlines]))
+ groups = []
for line in dlines[:]:
- if line.split(':')[0] == 'group':
- newgroup = line.split(':')[1].strip()
- if newgroup not in cgroups:
- cgroups.append(newgroup)
+ match = self.groupline_re.match(line)
+ if match:
+ groups.append(match.group("groupname"))
dlines.remove(line)
- dobj = ProbeData("\n".join(dlines))
- cprobedata[data.get('name')] = dobj
-
- def get_additional_groups(self, meta):
- return self.cgroups.get(meta.hostname, list())
- get_additional_groups.__doc__ = \
- Bcfg2.Server.Plugin.Connector.get_additional_groups.__doc__
-
- def get_additional_data(self, meta):
- return self.probedata.get(meta.hostname, ClientProbeDataSet())
- get_additional_data.__doc__ = \
- Bcfg2.Server.Plugin.Connector.get_additional_data.__doc__
+ return (groups, ProbeData("\n".join(dlines)))
+
+ def get_additional_groups(self, metadata):
+ return self.probestore.get_groups(metadata.hostname)
+
+ def get_additional_data(self, metadata):
+ return self.probestore.get_data(metadata.hostname)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
index b0e6e9142..bbaa0f403 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -1,7 +1,7 @@
import os
import sys
-import copy
-import time
+import shutil
+import tempfile
import lxml.etree
import Bcfg2.version
import Bcfg2.Server
@@ -36,47 +36,6 @@ test_data = dict(a=1, b=[1, 2, 3], c="test",
d=dict(a=1, b=dict(a=1), c=(1, "2", 3)))
-class FakeElement(lxml.etree._Element):
- getroottree = Mock()
-
- def __init__(self, el):
- self._element = el
-
- def __getattribute__(self, attr):
- el = lxml.etree._Element.__getattribute__(self,
- '__dict__')['_element']
- if attr == "getroottree":
- return FakeElement.getroottree
- elif attr == "_element":
- return el
- else:
- return getattr(el, attr)
-
-
-class StoringElement(object):
- OriginalElement = copy.copy(lxml.etree.Element)
-
- def __init__(self):
- self.element = None
- self.return_value = None
-
- def __call__(self, *args, **kwargs):
- self.element = self.OriginalElement(*args, **kwargs)
- self.return_value = FakeElement(self.element)
- return self.return_value
-
-
-class StoringSubElement(object):
- OriginalSubElement = copy.copy(lxml.etree.SubElement)
-
- def __call__(self, parent, tag, **kwargs):
- try:
- return self.OriginalSubElement(parent._element, tag,
- **kwargs)
- except AttributeError:
- return self.OriginalSubElement(parent, tag, **kwargs)
-
-
class FakeList(list):
pass
@@ -87,18 +46,6 @@ class TestProbesDB(DBModelTestCase):
ProbesDataModel]
-class TestClientProbeDataSet(Bcfg2TestCase):
- def test__init(self):
- ds = ClientProbeDataSet()
- self.assertLessEqual(ds.timestamp, time.time())
- self.assertIsInstance(ds, dict)
- self.assertNotIn("timestamp", ds)
-
- ds = ClientProbeDataSet(timestamp=123)
- self.assertEqual(ds.timestamp, 123)
- self.assertNotIn("timestamp", ds)
-
-
class TestProbeData(Bcfg2TestCase):
def test_str(self):
# a value that is not valid XML, JSON, or YAML
@@ -253,377 +200,161 @@ group-specific"""
assert False, "Strange probe found in get_probe_data() return"
-class TestProbes(TestProbing, TestConnector, TestDatabaseBacked):
+class TestProbes(Bcfg2TestCase):
test_obj = Probes
- def get_obj(self, core=None, load_data=None):
- core = MagicMock()
- if load_data is None:
- load_data = MagicMock()
-
- @patch("%s.%s.load_data" % (self.test_obj.__module__,
- self.test_obj.__name__), new=load_data)
- def inner():
- return TestDatabaseBacked.get_obj(self, core=core)
-
- return inner()
-
- def get_test_probedata(self):
- test_xdata = lxml.etree.Element("test")
- lxml.etree.SubElement(test_xdata, "test", foo="foo")
- rv = dict()
- rv["foo.example.com"] = ClientProbeDataSet(
- timestamp=time.time())
- rv["foo.example.com"]["xml"] = \
- ProbeData(lxml.etree.tostring(
- test_xdata,
- xml_declaration=False).decode('UTF-8'))
- rv["foo.example.com"]["text"] = \
- ProbeData("freeform text")
- rv["foo.example.com"]["multiline"] = \
- ProbeData("""multiple
+ test_xdata = lxml.etree.Element("test")
+ lxml.etree.SubElement(test_xdata, "test", foo="foo")
+ test_xdoc = lxml.etree.tostring(test_xdata,
+ xml_declaration=False).decode('UTF-8')
+
+ data = dict()
+ data['xml'] = "group:group\n" + test_xdoc
+ data['text'] = "freeform text"
+ data['multiline'] = """multiple
lines
of
freeform
text
-""")
- rv["bar.example.com"] = ClientProbeDataSet(
- timestamp=time.time())
- rv["bar.example.com"]["empty"] = \
- ProbeData("")
- if HAS_JSON:
- rv["bar.example.com"]["json"] = \
- ProbeData(json.dumps(test_data))
- if HAS_YAML:
- rv["bar.example.com"]["yaml"] = \
- ProbeData(yaml.dump(test_data))
- return rv
-
- def get_test_cgroups(self):
- return {"foo.example.com": ["group", "group with spaces",
- "group-with-dashes"],
- "bar.example.com": []}
+group:group-with-dashes
+group: group:with:colons
+"""
+ data['empty'] = ''
+ data['almost_empty'] = 'group: other_group'
+ if HAS_JSON:
+ data['json'] = json.dumps(test_data)
+ if HAS_YAML:
+ data['yaml'] = yaml.dump(test_data)
+
+ def setUp(self):
+ Bcfg2TestCase.setUp(self)
+ set_setup_default("probes_db")
+ self.datastore = None
+ Bcfg2.Server.Cache.expire("Probes")
+
+ def tearDown(self):
+ if self.datastore is not None:
+ shutil.rmtree(self.datastore)
+ self.datastore = None
+
+ def get_obj(self, core=None):
+ if core is None:
+ core = Mock()
+ if Bcfg2.Options.setup.probes_db:
+ @patch("os.makedirs", Mock())
+ def inner():
+ return self.test_obj(core, datastore)
+ return inner()
+ else:
+ # actually use a real datastore so we can read and write
+ # probed.xml
+ if self.datastore is None:
+ self.datastore = tempfile.mkdtemp()
+ return self.test_obj(core, self.datastore)
+
+ def test_GetProbes(self):
+ p = self.get_obj()
+ p.probes = Mock()
+ metadata = Mock()
+ p.GetProbes(metadata)
+ p.probes.get_probe_data.assert_called_with(metadata)
- def test__init(self):
- mock_load_data = Mock()
- probes = self.get_obj(load_data=mock_load_data)
- mock_load_data.assert_any_call()
- self.assertEqual(probes.probedata,
- ClientProbeDataSet())
- self.assertEqual(probes.cgroups, dict())
+ def additionalDataEqual(self, actual, expected):
+ self.assertItemsEqual(
+ dict([(k, str(d)) for k, d in actual.items()]),
+ expected)
- def test_write_data_xml(self):
- Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes._write_data_db = Mock()
- probes._write_data_xml = Mock()
- probes.write_data("test")
- probes._write_data_xml.assert_called_with("test")
- self.assertFalse(probes._write_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test_write_data_db(self):
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes._write_data_db = Mock()
- probes._write_data_xml = Mock()
- probes.write_data("test")
- probes._write_data_db.assert_called_with("test")
- self.assertFalse(probes._write_data_xml.called)
-
- def test__write_data_xml(self):
- Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
-
- top = mock_Element.side_effect.return_value
- write = top.getroottree.return_value.write
- self.assertEqual(write.call_args[0][0],
- os.path.join(datastore, probes.name,
- "probed.xml"))
-
- data = top._element
- foodata = data.find("Client[@name='foo.example.com']")
- self.assertIsNotNone(foodata)
- self.assertIsNotNone(foodata.get("timestamp"))
- self.assertEqual(len(foodata.findall("Probe")),
- len(probes.probedata['foo.example.com']))
- self.assertEqual(len(foodata.findall("Group")),
- len(probes.cgroups['foo.example.com']))
- xml = foodata.find("Probe[@name='xml']")
- self.assertIsNotNone(xml)
- self.assertIsNotNone(xml.get("value"))
- xdata = lxml.etree.XML(xml.get("value"))
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- text = foodata.find("Probe[@name='text']")
- self.assertIsNotNone(text)
- self.assertIsNotNone(text.get("value"))
- multiline = foodata.find("Probe[@name='multiline']")
- self.assertIsNotNone(multiline)
- self.assertIsNotNone(multiline.get("value"))
- self.assertGreater(len(multiline.get("value").splitlines()), 1)
-
- bardata = data.find("Client[@name='bar.example.com']")
- self.assertIsNotNone(bardata)
- self.assertIsNotNone(bardata.get("timestamp"))
- self.assertEqual(len(bardata.findall("Probe")),
- len(probes.probedata['bar.example.com']))
- self.assertEqual(len(bardata.findall("Group")),
- len(probes.cgroups['bar.example.com']))
- empty = bardata.find("Probe[@name='empty']")
- self.assertIsNotNone(empty)
- self.assertIsNotNone(empty.get("value"))
- self.assertEqual(empty.get("value"), "")
- if HAS_JSON:
- jdata = bardata.find("Probe[@name='json']")
- self.assertIsNotNone(jdata)
- self.assertIsNotNone(jdata.get("value"))
- self.assertItemsEqual(test_data,
- json.loads(jdata.get("value")))
- if HAS_YAML:
- ydata = bardata.find("Probe[@name='yaml']")
- self.assertIsNotNone(ydata)
- self.assertIsNotNone(ydata.get("value"))
- self.assertItemsEqual(test_data,
- yaml.load(ydata.get("value")))
-
- inner()
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__write_data_db(self):
- syncdb(TestProbesDB)
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- for cname in ["foo.example.com", "bar.example.com"]:
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
-
- for probe in pdata:
- self.assertEqual(probe.hostname, client.hostname)
- self.assertIsNotNone(probe.data)
- if probe.probe == "xml":
- xdata = lxml.etree.XML(probe.data)
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- elif probe.probe == "text":
- pass
- elif probe.probe == "multiline":
- self.assertGreater(len(probe.data.splitlines()), 1)
- elif probe.probe == "empty":
- self.assertEqual(probe.data, "")
- elif probe.probe == "yaml":
- self.assertItemsEqual(test_data, yaml.load(probe.data))
- elif probe.probe == "json":
- self.assertItemsEqual(test_data, json.loads(probe.data))
- else:
- assert False, "Strange probe found in _write_data_db data"
-
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- # test that old probe data is removed properly
- cname = 'foo.example.com'
- del probes.probedata[cname]['text']
- probes.cgroups[cname].pop()
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- def test_load_data_xml(self):
+ def test_probes_xml(self):
+ """ Set and retrieve probe data with database disabled """
Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes._load_data_db = Mock()
- probes._load_data_xml = Mock()
- probes.load_data()
- probes._load_data_xml.assert_any_call()
- self.assertFalse(probes._load_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test_load_data_db(self):
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes._load_data_db = Mock()
- probes._load_data_xml = Mock()
- probes.load_data()
- probes._load_data_db.assert_any_call(client=None)
- self.assertFalse(probes._load_data_xml.called)
-
- @patch("lxml.etree.parse")
- def test__load_data_xml(self, mock_parse):
- Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- # to get the value for lxml.etree.parse to parse, we call
- # _write_data_xml, mock the lxml.etree._ElementTree.write()
- # call, and grab the data that gets "written" to probed.xml
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
- top = mock_Element.side_effect.return_value
- return top._element
-
- xdata = inner()
- mock_parse.return_value = xdata.getroottree()
- probes.probedata = dict()
- probes.cgroups = dict()
-
- probes._load_data_xml()
- mock_parse.assert_called_with(os.path.join(datastore, probes.name,
- 'probed.xml'),
- parser=Bcfg2.Server.XMLParser)
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- self.assertItemsEqual(probes.cgroups, self.get_test_cgroups())
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__load_data_db(self):
- syncdb(TestProbesDB)
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
- for cname in probes.probedata.keys():
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- probes.probedata = dict()
- probes.cgroups = dict()
- probes._load_data_db()
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- # the db backend does not store groups at all if a client has
- # no groups set, so we can't just use assertItemsEqual here,
- # because loading saved data may _not_ result in the original
- # data if some clients had no groups set.
- test_cgroups = self.get_test_cgroups()
- for cname, groups in test_cgroups.items():
- if cname in probes.cgroups:
- self.assertEqual(groups, probes.cgroups[cname])
- else:
- self.assertEqual(groups, [])
+ self._perform_tests()
- @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data")
- def test_GetProbes(self, mock_get_probe_data):
- probes = self.get_obj()
- metadata = Mock()
- probes.GetProbes(metadata)
- mock_get_probe_data.assert_called_with(metadata)
-
- def test_ReceiveData(self):
- # we use a simple (read: bogus) datalist here to make this
- # easy to test
- datalist = ["a", "b", "c"]
-
- probes = self.get_obj()
- probes.write_data = Mock()
- probes.ReceiveDataItem = Mock()
- probes.core.metadata_cache_mode = 'off'
- client = Mock()
- client.hostname = "foo.example.com"
- probes.ReceiveData(client, datalist)
-
- cgroups = []
- cprobedata = ClientProbeDataSet()
- self.assertItemsEqual(probes.ReceiveDataItem.call_args_list,
- [call(client, "a", cgroups, cprobedata),
- call(client, "b", cgroups, cprobedata),
- call(client, "c", cgroups, cprobedata)])
- probes.write_data.assert_called_with(client)
- self.assertFalse(probes.core.metadata_cache.expire.called)
-
- # change the datalist, ensure that the cache is cleared
- probes.cgroups[client.hostname] = datalist
- probes.core.metadata_cache_mode = 'aggressive'
- probes.ReceiveData(client, ['a', 'b', 'd'])
-
- probes.write_data.assert_called_with(client)
- probes.core.metadata_cache.expire.assert_called_with(client.hostname)
-
- def test_ReceiveDataItem(self):
- probes = self.get_obj()
- for cname, cdata in self.get_test_probedata().items():
- client = Mock()
- client.hostname = cname
- cgroups = []
- cprobedata = ClientProbeDataSet()
- for pname, pdata in cdata.items():
- dataitem = lxml.etree.Element("Probe", name=pname)
- if pname == "text":
- # add some groups to the plaintext test to test
- # group parsing
- data = [pdata]
- for group in self.get_test_cgroups()[cname]:
- data.append("group:%s" % group)
- dataitem.text = "\n".join(data)
- else:
- dataitem.text = str(pdata)
-
- probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata)
-
- probes.cgroups[client.hostname] = cgroups
- probes.probedata[client.hostname] = cprobedata
- self.assertIn(client.hostname, probes.probedata)
- self.assertIn(pname, probes.probedata[cname])
- self.assertEqual(pdata, probes.probedata[cname][pname])
- self.assertIn(client.hostname, probes.cgroups)
- self.assertEqual(probes.cgroups[cname],
- self.get_test_cgroups()[cname])
-
- def test_get_additional_groups(self):
- TestConnector.test_get_additional_groups(self)
-
- probes = self.get_obj()
- test_cgroups = self.get_test_cgroups()
- probes.cgroups = self.get_test_cgroups()
- for cname in test_cgroups.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_cgroups[cname],
- probes.get_additional_groups(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_groups(metadata),
- list())
-
- def test_get_additional_data(self):
- TestConnector.test_get_additional_data(self)
-
- probes = self.get_obj()
- test_probedata = self.get_test_probedata()
- probes.probedata = self.get_test_probedata()
- for cname in test_probedata.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_probedata[cname],
- probes.get_additional_data(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_data(metadata),
- ClientProbeDataSet())
+ def test_probes_db(self):
+ """ Set and retrieve probe data with database enabled """
+ Bcfg2.Options.setup.probes_db = True
+ self._perform_tests()
+
+ def _perform_tests(self):
+ p = self.get_obj()
+
+ # first, sanity checks
+ foo_md = Mock(hostname="foo.example.com")
+ bar_md = Mock(hostname="bar.example.com")
+ self.assertItemsEqual(p.get_additional_groups(foo_md), [])
+ self.assertItemsEqual(p.get_additional_data(foo_md), dict())
+ self.assertItemsEqual(p.get_additional_groups(bar_md), [])
+ self.assertItemsEqual(p.get_additional_data(bar_md), dict())
+
+ # next, set some initial probe data
+ foo_datalist = []
+ for key in ['xml', 'text', 'multiline']:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc,
+ text="freeform text",
+ multiline="""multiple
+lines
+of
+freeform
+text""")
+ bar_datalist = []
+ for key in ['empty', 'almost_empty', 'json', 'yaml']:
+ if key in self.data:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ bar_datalist.append(pdata)
+ bar_addl_data = dict(empty="", almost_empty="")
+ if HAS_JSON:
+ bar_addl_data['json'] = self.data['json']
+ if HAS_YAML:
+ bar_addl_data['yaml'] = self.data['yaml']
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+
+ p.ReceiveData(bar_md, bar_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # set new data (and groups) for foo
+ foo_datalist = []
+ pdata = lxml.etree.Element("Probe", name='xml')
+ pdata.text = self.data['xml']
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc)
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)