summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-16 10:56:15 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-08-16 10:56:15 -0400
commit5a0307fb58ae5f2a2735a45d0877c37baba06447 (patch)
treee1490b62c7de547e8ec478a2362782cc437eafdf
parent32f810ae165c05e99cf1a8530e40f4da7eb2697f (diff)
downloadbcfg2-5a0307fb58ae5f2a2735a45d0877c37baba06447.tar.gz
bcfg2-5a0307fb58ae5f2a2735a45d0877c37baba06447.tar.bz2
bcfg2-5a0307fb58ae5f2a2735a45d0877c37baba06447.zip
Probes: rewrite for new caching system
Substantially rewrote the way the Probes plugin caches data. Formerly, it was structured to assume that probe data was stored in probed.xml, with the database feature very much a second-class citizen; this adds a proper abstraction layer between the persistent storage and the plugin (and its caches). Also rewrote most Probes unit tests to actually be useful unit tests, not implementation tests.
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Probes.py409
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py569
2 files changed, 386 insertions, 592 deletions
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index f75d88d8f..ef339a34b 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -8,8 +8,10 @@ import copy
import operator
import lxml.etree
import Bcfg2.Server
+import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
import Bcfg2.Server.FileMonitor
+from Bcfg2.Logger import Debuggable
from Bcfg2.Server.Statistics import track_statistics
HAS_DJANGO = False
@@ -63,6 +65,210 @@ except ImportError:
HAS_YAML = False
+class ProbeStore(Debuggable):
+ """ Caching abstraction layer between persistent probe data
+ storage and the Probes plugin."""
+
+ def __init__(self, core, datastore): # pylint: disable=W0613
+ Debuggable.__init__(self)
+ self._groupcache = Bcfg2.Server.Cache.Cache("Probes", "probegroups")
+ self._datacache = Bcfg2.Server.Cache.Cache("Probes", "probedata")
+
+ def get_groups(self, hostname):
+ """ Get the list of groups for the given host """
+ if hostname not in self._groupcache:
+ self._load_groups(hostname)
+ return self._groupcache.get(hostname, [])
+
+ def set_groups(self, hostname, groups):
+ """ Set the list of groups for the given host """
+ raise NotImplementedError
+
+ def get_data(self, hostname):
+ """ Get a dict of probe data for the given host """
+ if hostname not in self._datacache:
+ self._load_data(hostname)
+ return self._datacache.get(hostname, dict())
+
+ def set_data(self, hostname, data):
+ """ Set probe data for the given host """
+ raise NotImplementedError
+
+ def _load_groups(self, hostname):
+ """ When probe groups are not found in the cache, this
+ function is called to load them from the backend (XML or
+ database). """
+ raise NotImplementedError
+
+ def _load_data(self, hostname):
+ """ When probe groups are not found in the cache, this
+ function is called to load them from the backend (XML or
+ database). """
+ raise NotImplementedError
+
+ def commit(self):
+ """ Commit the current data in the cache to the persistent
+ backend store. This is not used with the
+ :class:`Bcfg2.Server.Plugins.Probes.DBProbeStore`, because it
+ commits on every change. """
+ pass
+
+
+class DBProbeStore(ProbeStore, Bcfg2.Server.Plugin.DatabaseBacked):
+ """ Caching abstraction layer between the database and the Probes
+ plugin. """
+ create = False
+
+ def __init__(self, core, datastore):
+ Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
+ ProbeStore.__init__(self, core, datastore)
+
+ def _load_groups(self, hostname):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ groupdata = ProbesGroupsModel.objects.filter(hostname=hostname)
+ self._groupcache[hostname] = list(set(r.group for r in groupdata))
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
+ def set_groups(self, hostname, groups):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ olddata = self._groupcache.get(hostname, [])
+ self._groupcache[hostname] = groups
+ for group in groups:
+ try:
+ ProbesGroupsModel.objects.get_or_create(
+ hostname=hostname,
+ group=group)
+ except ProbesGroupsModel.MultipleObjectsReturned:
+ ProbesGroupsModel.objects.filter(hostname=hostname,
+ group=group).delete()
+ ProbesGroupsModel.objects.get_or_create(
+ hostname=hostname,
+ group=group)
+ ProbesGroupsModel.objects.filter(
+ hostname=hostname).exclude(group__in=groups).delete()
+ if olddata != groups:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ def _load_data(self, hostname):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ ts_set = False
+ for pdata in ProbesDataModel.objects.filter(hostname=hostname):
+ if not ts_set:
+ self._datacache[hostname].timestamp = \
+ time.mktime(pdata.timestamp.timetuple())
+ ts_set = True
+ self._datacache[hostname][pdata.probe] = ProbeData(pdata.data)
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
+ def set_data(self, hostname, data):
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ expire_metadata = False
+ for probe, pdata in data.items():
+ self._datacache[hostname][probe] = pdata
+ record, created = ProbesDataModel.objects.get_or_create(
+ hostname=hostname,
+ probe=probe)
+ expire_metadata |= created
+ if record.data != pdata:
+ record.data = pdata
+ record.save()
+ expire_metadata = True
+ qset = ProbesDataModel.objects.filter(
+ hostname=hostname).exclude(probe__in=data.keys())
+ if len(qset):
+ qset.delete()
+ expire_metadata = True
+ if expire_metadata:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+
+class XMLProbeStore(ProbeStore):
+ """ Caching abstraction layer between ``probed.xml`` and the
+ Probes plugin."""
+ def __init__(self, core, datastore):
+ ProbeStore.__init__(self, core, datastore)
+ self._fname = os.path.join(datastore, 'probed.xml')
+ self._load_data()
+
+ def _load_data(self, _=None):
+ """ Load probe data from probed.xml """
+ Bcfg2.Server.Cache.expire("Probes", "probegroups")
+ Bcfg2.Server.Cache.expire("Probes", "probedata")
+ if not os.path.exists(self._fname):
+ self.commit()
+ try:
+ data = lxml.etree.parse(self._fname,
+ parser=Bcfg2.Server.XMLParser).getroot()
+ except (IOError, lxml.etree.XMLSyntaxError):
+ err = sys.exc_info()[1]
+ self.logger.error("Failed to read file probed.xml: %s" % err)
+ return
+ for client in data.getchildren():
+ self._datacache[client.get('name')] = \
+ ClientProbeDataSet(timestamp=client.get("timestamp"))
+ self._groupcache[client.get('name')] = []
+ for pdata in client:
+ if pdata.tag == 'Probe':
+ self._datacache[client.get('name')][pdata.get('name')] = \
+ ProbeData(pdata.get("value"))
+ elif pdata.tag == 'Group':
+ self._groupcache[client.get('name')].append(
+ pdata.get('name'))
+
+ Bcfg2.Server.Cache.expire("Metadata")
+
+ def _load_groups(self, hostname):
+ self._load_data(hostname)
+
+ def commit(self):
+ """ Write received probe data to probed.xml """
+ top = lxml.etree.Element("Probed")
+ for client, probed in sorted(self._datacache.items()):
+ # make a copy of probe data for this client in case it
+ # submits probe data while we're trying to write
+ # probed.xml
+ probedata = copy.copy(probed)
+ ctag = \
+ lxml.etree.SubElement(top, 'Client', name=client,
+ timestamp=str(int(probedata.timestamp)))
+ for probe in sorted(probedata):
+ lxml.etree.SubElement(
+ ctag, 'Probe', name=probe,
+ value=self._datacache[client][probe])
+ for group in sorted(self._groupcache[client]):
+ lxml.etree.SubElement(ctag, "Group", name=group)
+ try:
+ top.getroottree().write(self._fname,
+ xml_declaration=False,
+ pretty_print='true')
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Failed to write probed.xml: %s" % err)
+
+ def set_groups(self, hostname, groups):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ olddata = self._groupcache.get(hostname, [])
+ self._groupcache[hostname] = groups
+ if olddata != groups:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ def set_data(self, hostname, data):
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ expire_metadata = False
+ for probe, pdata in data.items():
+ olddata = self._datacache[hostname].get(probe, ProbeData(''))
+ self._datacache[hostname][probe] = pdata
+ expire_metadata |= olddata != data
+ if expire_metadata:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+
class ClientProbeDataSet(dict):
""" dict of probe => [probe data] that records a timestamp for
each host """
@@ -195,12 +401,13 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
class Probes(Bcfg2.Server.Plugin.Probing,
- Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Connector,
Bcfg2.Server.Plugin.DatabaseBacked):
""" A plugin to gather information from a client machine """
__author__ = 'bcfg-dev@mcs.anl.gov'
+ groupline_re = re.compile(r'^group:\s*(?P<groupname>\S+)\s*')
+
options = [
Bcfg2.Options.BooleanOption(
cf=('probes', 'use_database'), dest="probes_db",
@@ -209,7 +416,6 @@ class Probes(Bcfg2.Server.Plugin.Probing,
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Probing.__init__(self)
- Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
@@ -219,191 +425,48 @@ class Probes(Bcfg2.Server.Plugin.Probing,
err = sys.exc_info()[1]
raise Bcfg2.Server.Plugin.PluginInitError(err)
- self.probedata = dict()
- self.cgroups = dict()
- self.load_data()
- __init__.__doc__ = Bcfg2.Server.Plugin.DatabaseBacked.__init__.__doc__
-
- @track_statistics()
- def write_data(self, client):
- """ Write probe data out for use with bcfg2-info """
if self._use_db:
- return self._write_data_db(client)
+ self.probestore = DBProbeStore(core, datastore)
else:
- return self._write_data_xml(client)
-
- def _write_data_xml(self, _):
- """ Write received probe data to probed.xml """
- top = lxml.etree.Element("Probed")
- for client, probed in sorted(self.probedata.items()):
- # make a copy of probe data for this client in case it
- # submits probe data while we're trying to write
- # probed.xml
- probedata = copy.copy(probed)
- ctag = \
- lxml.etree.SubElement(top, 'Client', name=client,
- timestamp=str(int(probedata.timestamp)))
- for probe in sorted(probedata):
- lxml.etree.SubElement(
- ctag, 'Probe', name=probe,
- value=self.probedata[client][probe])
- for group in sorted(self.cgroups[client]):
- lxml.etree.SubElement(ctag, "Group", name=group)
- try:
- top.getroottree().write(os.path.join(self.data, 'probed.xml'),
- xml_declaration=False,
- pretty_print='true')
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Failed to write probed.xml: %s" % err)
-
- @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
- def _write_data_db(self, client):
- """ Write received probe data to the database """
- for probe, data in self.probedata[client.hostname].items():
- pdata = \
- ProbesDataModel.objects.get_or_create(hostname=client.hostname,
- probe=probe)[0]
- if pdata.data != data:
- pdata.data = data
- pdata.save()
-
- ProbesDataModel.objects.filter(
- hostname=client.hostname).exclude(
- probe__in=self.probedata[client.hostname]).delete()
-
- for group in self.cgroups[client.hostname]:
- try:
- ProbesGroupsModel.objects.get_or_create(
- hostname=client.hostname,
- group=group)
- except ProbesGroupsModel.MultipleObjectsReturned:
- ProbesGroupsModel.objects.filter(hostname=client.hostname,
- group=group).delete()
- ProbesGroupsModel.objects.get_or_create(
- hostname=client.hostname,
- group=group)
- ProbesGroupsModel.objects.filter(
- hostname=client.hostname).exclude(
- group__in=self.cgroups[client.hostname]).delete()
-
- def expire_cache(self, key=None):
- self.load_data(client=key)
-
- def load_data(self, client=None):
- """ Load probe data from the appropriate backend (probed.xml
- or the database) """
- if self._use_db:
- return self._load_data_db(client=client)
- else:
- # the XML backend doesn't support loading data for single
- # clients, so it reloads all data
- return self._load_data_xml()
-
- def _load_data_xml(self):
- """ Load probe data from probed.xml """
- try:
- data = lxml.etree.parse(os.path.join(self.data, 'probed.xml'),
- parser=Bcfg2.Server.XMLParser).getroot()
- except (IOError, lxml.etree.XMLSyntaxError):
- err = sys.exc_info()[1]
- self.logger.error("Failed to read file probed.xml: %s" % err)
- return
- self.probedata = {}
- self.cgroups = {}
- for client in data.getchildren():
- self.probedata[client.get('name')] = \
- ClientProbeDataSet(timestamp=client.get("timestamp"))
- self.cgroups[client.get('name')] = []
- for pdata in client:
- if pdata.tag == 'Probe':
- self.probedata[client.get('name')][pdata.get('name')] = \
- ProbeData(pdata.get("value"))
- elif pdata.tag == 'Group':
- self.cgroups[client.get('name')].append(pdata.get('name'))
-
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
-
- def _load_data_db(self, client=None):
- """ Load probe data from the database """
- if client is None:
- self.probedata = {}
- self.cgroups = {}
- probedata = ProbesDataModel.objects.all()
- groupdata = ProbesGroupsModel.objects.all()
- else:
- self.probedata.pop(client, None)
- self.cgroups.pop(client, None)
- probedata = ProbesDataModel.objects.filter(hostname=client)
- groupdata = ProbesGroupsModel.objects.filter(hostname=client)
-
- for pdata in probedata:
- if pdata.hostname not in self.probedata:
- self.probedata[pdata.hostname] = ClientProbeDataSet(
- timestamp=time.mktime(pdata.timestamp.timetuple()))
- self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data)
- for pgroup in groupdata:
- if pgroup.hostname not in self.cgroups:
- self.cgroups[pgroup.hostname] = []
- self.cgroups[pgroup.hostname].append(pgroup.group)
-
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
- key=client)
+ self.probestore = XMLProbeStore(core, datastore)
@track_statistics()
- def GetProbes(self, meta):
- return self.probes.get_probe_data(meta)
- GetProbes.__doc__ = Bcfg2.Server.Plugin.Probing.GetProbes.__doc__
+ def GetProbes(self, metadata):
+ return self.probes.get_probe_data(metadata)
@track_statistics()
def ReceiveData(self, client, datalist):
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- if client.hostname in self.cgroups:
- olddata = copy.copy(self.cgroups[client.hostname])
- else:
- olddata = []
-
- cgroups = []
- cprobedata = ClientProbeDataSet()
+ cgroups = set()
+ cdata = dict()
for data in datalist:
- self.ReceiveDataItem(client, data, cgroups, cprobedata)
- self.cgroups[client.hostname] = cgroups
- self.probedata[client.hostname] = cprobedata
-
- if (self.core.metadata_cache_mode in ['cautious', 'aggressive'] and
- olddata != self.cgroups[client.hostname]):
- self.core.metadata_cache.expire(client.hostname)
- self.write_data(client)
- ReceiveData.__doc__ = Bcfg2.Server.Plugin.Probing.ReceiveData.__doc__
-
- def ReceiveDataItem(self, client, data, cgroups, cprobedata):
- """Receive probe results pertaining to client."""
+ groups, cdata[data.get("name")] = \
+ self.ReceiveDataItem(client, data)
+ cgroups.update(groups)
+ self.probestore.set_groups(client.hostname, list(cgroups))
+ self.probestore.set_data(client.hostname, cdata)
+ self.probestore.commit()
+
+ def ReceiveDataItem(self, client, data):
+ """ Receive probe results pertaining to client. Returns a
+ tuple of (<probe groups>, <probe data>). """
if data.text is None:
self.logger.info("Got null response to probe %s from %s" %
(data.get('name'), client.hostname))
- cprobedata[data.get('name')] = ProbeData('')
- return
+ return [], ''
dlines = data.text.split('\n')
self.logger.debug("Processing probe from %s: %s:%s" %
(client.hostname, data.get('name'),
[line.strip() for line in dlines]))
+ groups = []
for line in dlines[:]:
- if line.split(':')[0] == 'group':
- newgroup = line.split(':')[1].strip()
- if newgroup not in cgroups:
- cgroups.append(newgroup)
+ match = self.groupline_re.match(line)
+ if match:
+ groups.append(match.group("groupname"))
dlines.remove(line)
- dobj = ProbeData("\n".join(dlines))
- cprobedata[data.get('name')] = dobj
-
- def get_additional_groups(self, meta):
- return self.cgroups.get(meta.hostname, list())
- get_additional_groups.__doc__ = \
- Bcfg2.Server.Plugin.Connector.get_additional_groups.__doc__
-
- def get_additional_data(self, meta):
- return self.probedata.get(meta.hostname, ClientProbeDataSet())
- get_additional_data.__doc__ = \
- Bcfg2.Server.Plugin.Connector.get_additional_data.__doc__
+ return (groups, ProbeData("\n".join(dlines)))
+
+ def get_additional_groups(self, metadata):
+ return self.probestore.get_groups(metadata.hostname)
+
+ def get_additional_data(self, metadata):
+ return self.probestore.get_data(metadata.hostname)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
index b0e6e9142..bbaa0f403 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -1,7 +1,7 @@
import os
import sys
-import copy
-import time
+import shutil
+import tempfile
import lxml.etree
import Bcfg2.version
import Bcfg2.Server
@@ -36,47 +36,6 @@ test_data = dict(a=1, b=[1, 2, 3], c="test",
d=dict(a=1, b=dict(a=1), c=(1, "2", 3)))
-class FakeElement(lxml.etree._Element):
- getroottree = Mock()
-
- def __init__(self, el):
- self._element = el
-
- def __getattribute__(self, attr):
- el = lxml.etree._Element.__getattribute__(self,
- '__dict__')['_element']
- if attr == "getroottree":
- return FakeElement.getroottree
- elif attr == "_element":
- return el
- else:
- return getattr(el, attr)
-
-
-class StoringElement(object):
- OriginalElement = copy.copy(lxml.etree.Element)
-
- def __init__(self):
- self.element = None
- self.return_value = None
-
- def __call__(self, *args, **kwargs):
- self.element = self.OriginalElement(*args, **kwargs)
- self.return_value = FakeElement(self.element)
- return self.return_value
-
-
-class StoringSubElement(object):
- OriginalSubElement = copy.copy(lxml.etree.SubElement)
-
- def __call__(self, parent, tag, **kwargs):
- try:
- return self.OriginalSubElement(parent._element, tag,
- **kwargs)
- except AttributeError:
- return self.OriginalSubElement(parent, tag, **kwargs)
-
-
class FakeList(list):
pass
@@ -87,18 +46,6 @@ class TestProbesDB(DBModelTestCase):
ProbesDataModel]
-class TestClientProbeDataSet(Bcfg2TestCase):
- def test__init(self):
- ds = ClientProbeDataSet()
- self.assertLessEqual(ds.timestamp, time.time())
- self.assertIsInstance(ds, dict)
- self.assertNotIn("timestamp", ds)
-
- ds = ClientProbeDataSet(timestamp=123)
- self.assertEqual(ds.timestamp, 123)
- self.assertNotIn("timestamp", ds)
-
-
class TestProbeData(Bcfg2TestCase):
def test_str(self):
# a value that is not valid XML, JSON, or YAML
@@ -253,377 +200,161 @@ group-specific"""
assert False, "Strange probe found in get_probe_data() return"
-class TestProbes(TestProbing, TestConnector, TestDatabaseBacked):
+class TestProbes(Bcfg2TestCase):
test_obj = Probes
- def get_obj(self, core=None, load_data=None):
- core = MagicMock()
- if load_data is None:
- load_data = MagicMock()
-
- @patch("%s.%s.load_data" % (self.test_obj.__module__,
- self.test_obj.__name__), new=load_data)
- def inner():
- return TestDatabaseBacked.get_obj(self, core=core)
-
- return inner()
-
- def get_test_probedata(self):
- test_xdata = lxml.etree.Element("test")
- lxml.etree.SubElement(test_xdata, "test", foo="foo")
- rv = dict()
- rv["foo.example.com"] = ClientProbeDataSet(
- timestamp=time.time())
- rv["foo.example.com"]["xml"] = \
- ProbeData(lxml.etree.tostring(
- test_xdata,
- xml_declaration=False).decode('UTF-8'))
- rv["foo.example.com"]["text"] = \
- ProbeData("freeform text")
- rv["foo.example.com"]["multiline"] = \
- ProbeData("""multiple
+ test_xdata = lxml.etree.Element("test")
+ lxml.etree.SubElement(test_xdata, "test", foo="foo")
+ test_xdoc = lxml.etree.tostring(test_xdata,
+ xml_declaration=False).decode('UTF-8')
+
+ data = dict()
+ data['xml'] = "group:group\n" + test_xdoc
+ data['text'] = "freeform text"
+ data['multiline'] = """multiple
lines
of
freeform
text
-""")
- rv["bar.example.com"] = ClientProbeDataSet(
- timestamp=time.time())
- rv["bar.example.com"]["empty"] = \
- ProbeData("")
- if HAS_JSON:
- rv["bar.example.com"]["json"] = \
- ProbeData(json.dumps(test_data))
- if HAS_YAML:
- rv["bar.example.com"]["yaml"] = \
- ProbeData(yaml.dump(test_data))
- return rv
-
- def get_test_cgroups(self):
- return {"foo.example.com": ["group", "group with spaces",
- "group-with-dashes"],
- "bar.example.com": []}
+group:group-with-dashes
+group: group:with:colons
+"""
+ data['empty'] = ''
+ data['almost_empty'] = 'group: other_group'
+ if HAS_JSON:
+ data['json'] = json.dumps(test_data)
+ if HAS_YAML:
+ data['yaml'] = yaml.dump(test_data)
+
+ def setUp(self):
+ Bcfg2TestCase.setUp(self)
+ set_setup_default("probes_db")
+ self.datastore = None
+ Bcfg2.Server.Cache.expire("Probes")
+
+ def tearDown(self):
+ if self.datastore is not None:
+ shutil.rmtree(self.datastore)
+ self.datastore = None
+
+ def get_obj(self, core=None):
+ if core is None:
+ core = Mock()
+ if Bcfg2.Options.setup.probes_db:
+ @patch("os.makedirs", Mock())
+ def inner():
+ return self.test_obj(core, datastore)
+ return inner()
+ else:
+ # actually use a real datastore so we can read and write
+ # probed.xml
+ if self.datastore is None:
+ self.datastore = tempfile.mkdtemp()
+ return self.test_obj(core, self.datastore)
+
+ def test_GetProbes(self):
+ p = self.get_obj()
+ p.probes = Mock()
+ metadata = Mock()
+ p.GetProbes(metadata)
+ p.probes.get_probe_data.assert_called_with(metadata)
- def test__init(self):
- mock_load_data = Mock()
- probes = self.get_obj(load_data=mock_load_data)
- mock_load_data.assert_any_call()
- self.assertEqual(probes.probedata,
- ClientProbeDataSet())
- self.assertEqual(probes.cgroups, dict())
+ def additionalDataEqual(self, actual, expected):
+ self.assertItemsEqual(
+ dict([(k, str(d)) for k, d in actual.items()]),
+ expected)
- def test_write_data_xml(self):
- Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes._write_data_db = Mock()
- probes._write_data_xml = Mock()
- probes.write_data("test")
- probes._write_data_xml.assert_called_with("test")
- self.assertFalse(probes._write_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test_write_data_db(self):
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes._write_data_db = Mock()
- probes._write_data_xml = Mock()
- probes.write_data("test")
- probes._write_data_db.assert_called_with("test")
- self.assertFalse(probes._write_data_xml.called)
-
- def test__write_data_xml(self):
- Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
-
- top = mock_Element.side_effect.return_value
- write = top.getroottree.return_value.write
- self.assertEqual(write.call_args[0][0],
- os.path.join(datastore, probes.name,
- "probed.xml"))
-
- data = top._element
- foodata = data.find("Client[@name='foo.example.com']")
- self.assertIsNotNone(foodata)
- self.assertIsNotNone(foodata.get("timestamp"))
- self.assertEqual(len(foodata.findall("Probe")),
- len(probes.probedata['foo.example.com']))
- self.assertEqual(len(foodata.findall("Group")),
- len(probes.cgroups['foo.example.com']))
- xml = foodata.find("Probe[@name='xml']")
- self.assertIsNotNone(xml)
- self.assertIsNotNone(xml.get("value"))
- xdata = lxml.etree.XML(xml.get("value"))
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- text = foodata.find("Probe[@name='text']")
- self.assertIsNotNone(text)
- self.assertIsNotNone(text.get("value"))
- multiline = foodata.find("Probe[@name='multiline']")
- self.assertIsNotNone(multiline)
- self.assertIsNotNone(multiline.get("value"))
- self.assertGreater(len(multiline.get("value").splitlines()), 1)
-
- bardata = data.find("Client[@name='bar.example.com']")
- self.assertIsNotNone(bardata)
- self.assertIsNotNone(bardata.get("timestamp"))
- self.assertEqual(len(bardata.findall("Probe")),
- len(probes.probedata['bar.example.com']))
- self.assertEqual(len(bardata.findall("Group")),
- len(probes.cgroups['bar.example.com']))
- empty = bardata.find("Probe[@name='empty']")
- self.assertIsNotNone(empty)
- self.assertIsNotNone(empty.get("value"))
- self.assertEqual(empty.get("value"), "")
- if HAS_JSON:
- jdata = bardata.find("Probe[@name='json']")
- self.assertIsNotNone(jdata)
- self.assertIsNotNone(jdata.get("value"))
- self.assertItemsEqual(test_data,
- json.loads(jdata.get("value")))
- if HAS_YAML:
- ydata = bardata.find("Probe[@name='yaml']")
- self.assertIsNotNone(ydata)
- self.assertIsNotNone(ydata.get("value"))
- self.assertItemsEqual(test_data,
- yaml.load(ydata.get("value")))
-
- inner()
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__write_data_db(self):
- syncdb(TestProbesDB)
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- for cname in ["foo.example.com", "bar.example.com"]:
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
-
- for probe in pdata:
- self.assertEqual(probe.hostname, client.hostname)
- self.assertIsNotNone(probe.data)
- if probe.probe == "xml":
- xdata = lxml.etree.XML(probe.data)
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- elif probe.probe == "text":
- pass
- elif probe.probe == "multiline":
- self.assertGreater(len(probe.data.splitlines()), 1)
- elif probe.probe == "empty":
- self.assertEqual(probe.data, "")
- elif probe.probe == "yaml":
- self.assertItemsEqual(test_data, yaml.load(probe.data))
- elif probe.probe == "json":
- self.assertItemsEqual(test_data, json.loads(probe.data))
- else:
- assert False, "Strange probe found in _write_data_db data"
-
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- # test that old probe data is removed properly
- cname = 'foo.example.com'
- del probes.probedata[cname]['text']
- probes.cgroups[cname].pop()
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- def test_load_data_xml(self):
+ def test_probes_xml(self):
+ """ Set and retrieve probe data with database disabled """
Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes._load_data_db = Mock()
- probes._load_data_xml = Mock()
- probes.load_data()
- probes._load_data_xml.assert_any_call()
- self.assertFalse(probes._load_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test_load_data_db(self):
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes._load_data_db = Mock()
- probes._load_data_xml = Mock()
- probes.load_data()
- probes._load_data_db.assert_any_call(client=None)
- self.assertFalse(probes._load_data_xml.called)
-
- @patch("lxml.etree.parse")
- def test__load_data_xml(self, mock_parse):
- Bcfg2.Options.setup.probes_db = False
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- # to get the value for lxml.etree.parse to parse, we call
- # _write_data_xml, mock the lxml.etree._ElementTree.write()
- # call, and grab the data that gets "written" to probed.xml
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
- top = mock_Element.side_effect.return_value
- return top._element
-
- xdata = inner()
- mock_parse.return_value = xdata.getroottree()
- probes.probedata = dict()
- probes.cgroups = dict()
-
- probes._load_data_xml()
- mock_parse.assert_called_with(os.path.join(datastore, probes.name,
- 'probed.xml'),
- parser=Bcfg2.Server.XMLParser)
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- self.assertItemsEqual(probes.cgroups, self.get_test_cgroups())
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__load_data_db(self):
- syncdb(TestProbesDB)
- Bcfg2.Options.setup.probes_db = True
- probes = self.get_obj()
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
- for cname in probes.probedata.keys():
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- probes.probedata = dict()
- probes.cgroups = dict()
- probes._load_data_db()
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- # the db backend does not store groups at all if a client has
- # no groups set, so we can't just use assertItemsEqual here,
- # because loading saved data may _not_ result in the original
- # data if some clients had no groups set.
- test_cgroups = self.get_test_cgroups()
- for cname, groups in test_cgroups.items():
- if cname in probes.cgroups:
- self.assertEqual(groups, probes.cgroups[cname])
- else:
- self.assertEqual(groups, [])
+ self._perform_tests()
- @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data")
- def test_GetProbes(self, mock_get_probe_data):
- probes = self.get_obj()
- metadata = Mock()
- probes.GetProbes(metadata)
- mock_get_probe_data.assert_called_with(metadata)
-
- def test_ReceiveData(self):
- # we use a simple (read: bogus) datalist here to make this
- # easy to test
- datalist = ["a", "b", "c"]
-
- probes = self.get_obj()
- probes.write_data = Mock()
- probes.ReceiveDataItem = Mock()
- probes.core.metadata_cache_mode = 'off'
- client = Mock()
- client.hostname = "foo.example.com"
- probes.ReceiveData(client, datalist)
-
- cgroups = []
- cprobedata = ClientProbeDataSet()
- self.assertItemsEqual(probes.ReceiveDataItem.call_args_list,
- [call(client, "a", cgroups, cprobedata),
- call(client, "b", cgroups, cprobedata),
- call(client, "c", cgroups, cprobedata)])
- probes.write_data.assert_called_with(client)
- self.assertFalse(probes.core.metadata_cache.expire.called)
-
- # change the datalist, ensure that the cache is cleared
- probes.cgroups[client.hostname] = datalist
- probes.core.metadata_cache_mode = 'aggressive'
- probes.ReceiveData(client, ['a', 'b', 'd'])
-
- probes.write_data.assert_called_with(client)
- probes.core.metadata_cache.expire.assert_called_with(client.hostname)
-
- def test_ReceiveDataItem(self):
- probes = self.get_obj()
- for cname, cdata in self.get_test_probedata().items():
- client = Mock()
- client.hostname = cname
- cgroups = []
- cprobedata = ClientProbeDataSet()
- for pname, pdata in cdata.items():
- dataitem = lxml.etree.Element("Probe", name=pname)
- if pname == "text":
- # add some groups to the plaintext test to test
- # group parsing
- data = [pdata]
- for group in self.get_test_cgroups()[cname]:
- data.append("group:%s" % group)
- dataitem.text = "\n".join(data)
- else:
- dataitem.text = str(pdata)
-
- probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata)
-
- probes.cgroups[client.hostname] = cgroups
- probes.probedata[client.hostname] = cprobedata
- self.assertIn(client.hostname, probes.probedata)
- self.assertIn(pname, probes.probedata[cname])
- self.assertEqual(pdata, probes.probedata[cname][pname])
- self.assertIn(client.hostname, probes.cgroups)
- self.assertEqual(probes.cgroups[cname],
- self.get_test_cgroups()[cname])
-
- def test_get_additional_groups(self):
- TestConnector.test_get_additional_groups(self)
-
- probes = self.get_obj()
- test_cgroups = self.get_test_cgroups()
- probes.cgroups = self.get_test_cgroups()
- for cname in test_cgroups.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_cgroups[cname],
- probes.get_additional_groups(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_groups(metadata),
- list())
-
- def test_get_additional_data(self):
- TestConnector.test_get_additional_data(self)
-
- probes = self.get_obj()
- test_probedata = self.get_test_probedata()
- probes.probedata = self.get_test_probedata()
- for cname in test_probedata.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_probedata[cname],
- probes.get_additional_data(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_data(metadata),
- ClientProbeDataSet())
+ def test_probes_db(self):
+ """ Set and retrieve probe data with database enabled """
+ Bcfg2.Options.setup.probes_db = True
+ self._perform_tests()
+
+ def _perform_tests(self):
+ p = self.get_obj()
+
+ # first, sanity checks
+ foo_md = Mock(hostname="foo.example.com")
+ bar_md = Mock(hostname="bar.example.com")
+ self.assertItemsEqual(p.get_additional_groups(foo_md), [])
+ self.assertItemsEqual(p.get_additional_data(foo_md), dict())
+ self.assertItemsEqual(p.get_additional_groups(bar_md), [])
+ self.assertItemsEqual(p.get_additional_data(bar_md), dict())
+
+ # next, set some initial probe data
+ foo_datalist = []
+ for key in ['xml', 'text', 'multiline']:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc,
+ text="freeform text",
+ multiline="""multiple
+lines
+of
+freeform
+text""")
+ bar_datalist = []
+ for key in ['empty', 'almost_empty', 'json', 'yaml']:
+ if key in self.data:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ bar_datalist.append(pdata)
+ bar_addl_data = dict(empty="", almost_empty="")
+ if HAS_JSON:
+ bar_addl_data['json'] = self.data['json']
+ if HAS_YAML:
+ bar_addl_data['yaml'] = self.data['yaml']
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+
+ p.ReceiveData(bar_md, bar_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # set new data (and groups) for foo
+ foo_datalist = []
+ pdata = lxml.etree.Element("Probe", name='xml')
+ pdata.text = self.data['xml']
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc)
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)