summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/Plugins/Probes.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Server/Plugins/Probes.py')
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Probes.py521
1 files changed, 306 insertions, 215 deletions
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index 48be1ac26..21d50ace6 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -8,15 +8,33 @@ import copy
import operator
import lxml.etree
import Bcfg2.Server
+import Bcfg2.Server.Cache
import Bcfg2.Server.Plugin
-from Bcfg2.Compat import unicode # pylint: disable=W0622
-
-try:
- from django.db import models
- from django.core.exceptions import MultipleObjectsReturned
- HAS_DJANGO = True
+from Bcfg2.Compat import unicode, any # pylint: disable=W0622
+import Bcfg2.Server.FileMonitor
+from Bcfg2.Logger import Debuggable
+from Bcfg2.Server.Statistics import track_statistics
+
+HAS_DJANGO = False
+# pylint: disable=C0103
+ProbesDataModel = None
+ProbesGroupsModel = None
+# pylint: enable=C0103
+
+
+def load_django_models():
+ """ Load models for Django after option parsing has completed """
+ # pylint: disable=W0602
+ global ProbesDataModel, ProbesGroupsModel, HAS_DJANGO
+ # pylint: enable=W0602
+ try:
+ from django.db import models
+ HAS_DJANGO = True
+ except ImportError:
+ HAS_DJANGO = False
+ return
- class ProbesDataModel(models.Model,
+ class ProbesDataModel(models.Model, # pylint: disable=W0621,W0612
Bcfg2.Server.Plugin.PluginDatabaseModel):
""" The database model for storing probe data """
hostname = models.CharField(max_length=255)
@@ -24,13 +42,12 @@ try:
timestamp = models.DateTimeField(auto_now=True)
data = models.TextField(null=True)
- class ProbesGroupsModel(models.Model,
+ class ProbesGroupsModel(models.Model, # pylint: disable=W0621,W0612
Bcfg2.Server.Plugin.PluginDatabaseModel):
""" The database model for storing probe groups """
hostname = models.CharField(max_length=255)
group = models.CharField(max_length=255)
-except ImportError:
- HAS_DJANGO = False
+
try:
import json
@@ -51,6 +68,226 @@ except ImportError:
HAS_YAML = False
+class ProbeStore(Debuggable):
+ """ Caching abstraction layer between persistent probe data
+ storage and the Probes plugin."""
+
+ def __init__(self, core, datadir): # pylint: disable=W0613
+ Debuggable.__init__(self)
+ self._groupcache = Bcfg2.Server.Cache.Cache("Probes", "probegroups")
+ self._datacache = Bcfg2.Server.Cache.Cache("Probes", "probedata")
+
+ def get_groups(self, hostname):
+ """ Get the list of groups for the given host """
+ if hostname not in self._groupcache:
+ self._load_groups(hostname)
+ return self._groupcache.get(hostname, [])
+
+ def set_groups(self, hostname, groups):
+ """ Set the list of groups for the given host """
+ raise NotImplementedError
+
+ def get_data(self, hostname):
+ """ Get a dict of probe data for the given host """
+ if hostname not in self._datacache:
+ self._load_data(hostname)
+ return self._datacache.get(hostname, dict())
+
+ def set_data(self, hostname, data):
+ """ Set probe data for the given host """
+ raise NotImplementedError
+
+ def _load_groups(self, hostname):
+ """ When probe groups are not found in the cache, this
+ function is called to load them from the backend (XML or
+ database). """
+ raise NotImplementedError
+
+ def _load_data(self, hostname):
+ """ When probe groups are not found in the cache, this
+ function is called to load them from the backend (XML or
+ database). """
+ raise NotImplementedError
+
+ def commit(self):
+ """ Commit the current data in the cache to the persistent
+ backend store. This is not used with the
+ :class:`Bcfg2.Server.Plugins.Probes.DBProbeStore`, because it
+ commits on every change. """
+ pass
+
+
+class DBProbeStore(ProbeStore, Bcfg2.Server.Plugin.DatabaseBacked):
+ """ Caching abstraction layer between the database and the Probes
+ plugin. """
+ create = False
+
+ def __init__(self, core, datadir):
+ Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core)
+ ProbeStore.__init__(self, core, datadir)
+
+ @property
+ def _use_db(self):
+ return True
+
+ def _load_groups(self, hostname):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ groupdata = ProbesGroupsModel.objects.filter(hostname=hostname)
+ self._groupcache[hostname] = list(set(r.group for r in groupdata))
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
+ def set_groups(self, hostname, groups):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ olddata = self._groupcache.get(hostname, [])
+ self._groupcache[hostname] = groups
+ for group in groups:
+ try:
+ ProbesGroupsModel.objects.get_or_create(
+ hostname=hostname,
+ group=group)
+ except ProbesGroupsModel.MultipleObjectsReturned:
+ ProbesGroupsModel.objects.filter(hostname=hostname,
+ group=group).delete()
+ ProbesGroupsModel.objects.get_or_create(
+ hostname=hostname,
+ group=group)
+ ProbesGroupsModel.objects.filter(
+ hostname=hostname).exclude(group__in=groups).delete()
+ if olddata != groups:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ def _load_data(self, hostname):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ ts_set = False
+ for pdata in ProbesDataModel.objects.filter(hostname=hostname):
+ if not ts_set:
+ self._datacache[hostname].timestamp = \
+ time.mktime(pdata.timestamp.timetuple())
+ ts_set = True
+ self._datacache[hostname][pdata.probe] = ProbeData(pdata.data)
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
+ def set_data(self, hostname, data):
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ expire_metadata = False
+ for probe, pdata in data.items():
+ self._datacache[hostname][probe] = pdata
+ try:
+ record, created = ProbesDataModel.objects.get_or_create(
+ hostname=hostname,
+ probe=probe)
+ except ProbesDataModel.MultipleObjectsReturned:
+ ProbesDataModel.objects.filter(hostname=hostname,
+ probe=probe).delete()
+ record, created = ProbesDataModel.objects.get_or_create(
+ hostname=hostname,
+ probe=probe)
+ expire_metadata |= created
+ if record.data != pdata:
+ record.data = pdata
+ record.save()
+ expire_metadata = True
+ qset = ProbesDataModel.objects.filter(
+ hostname=hostname).exclude(probe__in=data.keys())
+ if len(qset):
+ qset.delete()
+ expire_metadata = True
+ if expire_metadata:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+
+class XMLProbeStore(ProbeStore):
+ """ Caching abstraction layer between ``probed.xml`` and the
+ Probes plugin."""
+ def __init__(self, core, datadir):
+ ProbeStore.__init__(self, core, datadir)
+ self._fname = os.path.join(datadir, 'probed.xml')
+ self._load_data()
+
+ def _load_data(self, _=None):
+ """ Load probe data from probed.xml """
+ Bcfg2.Server.Cache.expire("Probes", "probegroups")
+ Bcfg2.Server.Cache.expire("Probes", "probedata")
+ if not os.path.exists(self._fname):
+ self.commit()
+ try:
+ data = lxml.etree.parse(self._fname,
+ parser=Bcfg2.Server.XMLParser).getroot()
+ except (IOError, lxml.etree.XMLSyntaxError):
+ err = sys.exc_info()[1]
+ self.logger.error("Failed to read file probed.xml: %s" % err)
+ return
+ for client in data.getchildren():
+ self._datacache[client.get('name')] = \
+ ClientProbeDataSet(timestamp=client.get("timestamp"))
+ self._groupcache[client.get('name')] = []
+ for pdata in client:
+ if pdata.tag == 'Probe':
+ self._datacache[client.get('name')][pdata.get('name')] = \
+ ProbeData(pdata.get("value"))
+ elif pdata.tag == 'Group':
+ self._groupcache[client.get('name')].append(
+ pdata.get('name'))
+
+ Bcfg2.Server.Cache.expire("Metadata")
+
+ def _load_groups(self, hostname):
+ self._load_data(hostname)
+
+ def commit(self):
+ """ Write received probe data to probed.xml """
+ top = lxml.etree.Element("Probed")
+ for client, probed in sorted(self._datacache.items()):
+ # make a copy of probe data for this client in case it
+ # submits probe data while we're trying to write
+ # probed.xml
+ probedata = copy.copy(probed)
+ ctag = \
+ lxml.etree.SubElement(top, 'Client', name=client,
+ timestamp=str(int(probedata.timestamp)))
+ for probe in sorted(probedata):
+ try:
+ lxml.etree.SubElement(
+ ctag, 'Probe', name=probe,
+ value=self._datacache[client][probe].decode('utf-8'))
+ except AttributeError:
+ lxml.etree.SubElement(
+ ctag, 'Probe', name=probe,
+ value=self._datacache[client][probe])
+ for group in sorted(self._groupcache[client]):
+ lxml.etree.SubElement(ctag, "Group", name=group)
+ try:
+ top.getroottree().write(self._fname,
+ xml_declaration=False,
+ pretty_print='true')
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Failed to write %s: %s" % (self._fname, err))
+
+ def set_groups(self, hostname, groups):
+ Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname)
+ olddata = self._groupcache.get(hostname, [])
+ self._groupcache[hostname] = groups
+ if olddata != groups:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+ def set_data(self, hostname, data):
+ Bcfg2.Server.Cache.expire("Probes", "probedata", hostname)
+ self._datacache[hostname] = ClientProbeDataSet()
+ expire_metadata = False
+ for probe, pdata in data.items():
+ olddata = self._datacache[hostname].get(probe, ProbeData(''))
+ self._datacache[hostname][probe] = pdata
+ expire_metadata |= olddata != data
+ if expire_metadata:
+ Bcfg2.Server.Cache.expire("Metadata", hostname)
+
+
class ClientProbeDataSet(dict):
""" dict of probe => [probe data] that records a timestamp for
each host """
@@ -124,17 +361,16 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
bangline = re.compile(r'^#!\s*(?P<interpreter>.*)$')
basename_is_regex = True
- def __init__(self, path, fam, encoding, plugin_name):
+ def __init__(self, path, plugin_name):
self.plugin_name = plugin_name
Bcfg2.Server.Plugin.EntrySet.__init__(self, r'[0-9A-Za-z_\-]+', path,
- Bcfg2.Server.Plugin.SpecificData,
- encoding)
- fam.AddMonitor(path, self)
+ Bcfg2.Server.Plugin.SpecificData)
+ Bcfg2.Server.FileMonitor.get_fam().AddMonitor(path, self)
def HandleEvent(self, event):
""" handle events on everything but probed.xml """
if (event.filename != self.path and
- not event.filename.endswith("probed.xml")):
+ not event.filename.endswith("probed.xml")):
return self.handle_event(event)
def get_probe_data(self, metadata):
@@ -161,7 +397,7 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
probe.set('name', os.path.basename(name))
probe.set('source', self.plugin_name)
if (metadata.version_info and
- metadata.version_info > (1, 3, 1, '', 0)):
+ metadata.version_info > (1, 3, 1, '', 0)):
try:
probe.text = entry.data.decode('utf-8')
except AttributeError:
@@ -187,235 +423,90 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
class Probes(Bcfg2.Server.Plugin.Probing,
- Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Connector,
Bcfg2.Server.Plugin.DatabaseBacked):
""" A plugin to gather information from a client machine """
__author__ = 'bcfg-dev@mcs.anl.gov'
- def __init__(self, core, datastore):
+ groupline_re = re.compile(r'^group:\s*(?P<groupname>\S+)\s*')
+
+ options = [
+ Bcfg2.Options.BooleanOption(
+ cf=('probes', 'use_database'), dest="probes_db",
+ help="Use database capabilities of the Probes plugin"),
+ Bcfg2.Options.Option(
+ cf=('probes', 'allowed_groups'), dest="probes_allowed_groups",
+ help="Whitespace-separated list of group name regexps to which "
+ "probes can assign a client",
+ default=[re.compile('.*')],
+ type=Bcfg2.Options.Types.anchored_regex_list)]
+ options_parsed_hook = staticmethod(load_django_models)
+
+ def __init__(self, core):
Bcfg2.Server.Plugin.Probing.__init__(self)
- Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.Connector.__init__(self)
- Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core)
try:
- self.probes = ProbeSet(self.data, core.fam, core.setup['encoding'],
- self.name)
+ self.probes = ProbeSet(self.data, self.name)
except:
err = sys.exc_info()[1]
raise Bcfg2.Server.Plugin.PluginInitError(err)
- self.allowed_cgroups = core.setup['probe_allowed_groups']
- self.probedata = dict()
- self.cgroups = dict()
- self.load_data()
- __init__.__doc__ = Bcfg2.Server.Plugin.DatabaseBacked.__init__.__doc__
-
- @Bcfg2.Server.Plugin.track_statistics()
- def write_data(self, client):
- """ Write probe data out for use with bcfg2-info """
if self._use_db:
- return self._write_data_db(client)
+ self.probestore = DBProbeStore(core, self.data)
else:
- return self._write_data_xml(client)
-
- def _write_data_xml(self, _):
- """ Write received probe data to probed.xml """
- top = lxml.etree.Element("Probed")
- for client, probed in sorted(self.probedata.items()):
- # make a copy of probe data for this client in case it
- # submits probe data while we're trying to write
- # probed.xml
- probedata = copy.copy(probed)
- ctag = \
- lxml.etree.SubElement(top, 'Client', name=client,
- timestamp=str(int(probedata.timestamp)))
- for probe in sorted(probedata):
- try:
- lxml.etree.SubElement(
- ctag, 'Probe', name=probe,
- value=str(
- self.probedata[client][probe]).decode('utf-8'))
- except AttributeError:
- lxml.etree.SubElement(
- ctag, 'Probe', name=probe,
- value=str(self.probedata[client][probe]))
- for group in sorted(self.cgroups[client]):
- lxml.etree.SubElement(ctag, "Group", name=group)
- try:
- top.getroottree().write(os.path.join(self.data, 'probed.xml'),
- xml_declaration=False,
- pretty_print='true')
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Failed to write probed.xml: %s" % err)
+ self.probestore = XMLProbeStore(core, self.data)
- @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock
- def _write_data_db(self, client):
- """ Write received probe data to the database """
- for probe, data in self.probedata[client.hostname].items():
- try:
- pdata = ProbesDataModel.objects.get_or_create(
- hostname=client.hostname,
- probe=probe)[0]
- except MultipleObjectsReturned:
- ProbesDataModel.objects.filter(hostname=client.hostname,
- probe=probe).delete()
- ProbesDataModel.objects.get_or_create(
- hostname=client.hostname,
- probe=probe)
- if pdata.data != data:
- pdata.data = data
- pdata.save()
+ @track_statistics()
+ def GetProbes(self, metadata):
+ return self.probes.get_probe_data(metadata)
- ProbesDataModel.objects.filter(
- hostname=client.hostname).exclude(
- probe__in=self.probedata[client.hostname]).delete()
-
- for group in self.cgroups[client.hostname]:
- try:
- ProbesGroupsModel.objects.get_or_create(
- hostname=client.hostname,
- group=group)
- except MultipleObjectsReturned:
- ProbesGroupsModel.objects.filter(hostname=client.hostname,
- group=group).delete()
- ProbesGroupsModel.objects.get_or_create(
- hostname=client.hostname,
- group=group)
- ProbesGroupsModel.objects.filter(
- hostname=client.hostname).exclude(
- group__in=self.cgroups[client.hostname]).delete()
-
- def expire_cache(self, key=None):
- self.load_data(client=key)
-
- def load_data(self, client=None):
- """ Load probe data from the appropriate backend (probed.xml
- or the database) """
- if self._use_db:
- return self._load_data_db(client=client)
- else:
- # the XML backend doesn't support loading data for single
- # clients, so it reloads all data
- return self._load_data_xml()
-
- def _load_data_xml(self):
- """ Load probe data from probed.xml """
- try:
- data = lxml.etree.parse(os.path.join(self.data, 'probed.xml'),
- parser=Bcfg2.Server.XMLParser).getroot()
- except (IOError, lxml.etree.XMLSyntaxError):
- err = sys.exc_info()[1]
- self.logger.error("Failed to read file probed.xml: %s" % err)
- return
- self.probedata = {}
- self.cgroups = {}
- for client in data.getchildren():
- self.probedata[client.get('name')] = \
- ClientProbeDataSet(timestamp=client.get("timestamp"))
- self.cgroups[client.get('name')] = []
- for pdata in client:
- if pdata.tag == 'Probe':
- self.probedata[client.get('name')][pdata.get('name')] = \
- ProbeData(pdata.get("value"))
- elif pdata.tag == 'Group':
- self.cgroups[client.get('name')].append(pdata.get('name'))
-
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
-
- def _load_data_db(self, client=None):
- """ Load probe data from the database """
- if client is None:
- self.probedata = {}
- self.cgroups = {}
- probedata = ProbesDataModel.objects.all()
- groupdata = ProbesGroupsModel.objects.all()
- else:
- self.probedata.pop(client, None)
- self.cgroups.pop(client, None)
- probedata = ProbesDataModel.objects.filter(hostname=client)
- groupdata = ProbesGroupsModel.objects.filter(hostname=client)
-
- for pdata in probedata:
- if pdata.hostname not in self.probedata:
- self.probedata[pdata.hostname] = ClientProbeDataSet(
- timestamp=time.mktime(pdata.timestamp.timetuple()))
- self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data)
- for pgroup in groupdata:
- if pgroup.hostname not in self.cgroups:
- self.cgroups[pgroup.hostname] = []
- self.cgroups[pgroup.hostname].append(pgroup.group)
-
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
- key=client)
-
- @Bcfg2.Server.Plugin.track_statistics()
- def GetProbes(self, meta):
- return self.probes.get_probe_data(meta)
- GetProbes.__doc__ = Bcfg2.Server.Plugin.Probing.GetProbes.__doc__
-
- @Bcfg2.Server.Plugin.track_statistics()
def ReceiveData(self, client, datalist):
- if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
- if client.hostname in self.cgroups:
- olddata = copy.copy(self.cgroups[client.hostname])
- else:
- olddata = []
-
- cgroups = []
- cprobedata = ClientProbeDataSet()
+ cgroups = set()
+ cdata = dict()
for data in datalist:
- self.ReceiveDataItem(client, data, cgroups, cprobedata)
- self.cgroups[client.hostname] = cgroups
- self.probedata[client.hostname] = cprobedata
-
- if (self.core.metadata_cache_mode in ['cautious', 'aggressive'] and
- olddata != self.cgroups[client.hostname]):
- self.core.metadata_cache.expire(client.hostname)
- self.write_data(client)
- ReceiveData.__doc__ = Bcfg2.Server.Plugin.Probing.ReceiveData.__doc__
-
- def ReceiveDataItem(self, client, data, cgroups, cprobedata):
- """Receive probe results pertaining to client."""
+ groups, cdata[data.get("name")] = \
+ self.ReceiveDataItem(client, data)
+ cgroups.update(groups)
+ self.probestore.set_groups(client.hostname, list(cgroups))
+ self.probestore.set_data(client.hostname, cdata)
+ self.probestore.commit()
+
+ def ReceiveDataItem(self, client, data):
+ """ Receive probe results pertaining to client. Returns a
+ tuple of (<probe groups>, <probe data>). """
if data.text is None:
self.logger.info("Got null response to probe %s from %s" %
(data.get('name'), client.hostname))
- cprobedata[data.get('name')] = ProbeData('')
- return
+ return [], ''
dlines = data.text.split('\n')
self.logger.debug("Processing probe from %s: %s:%s" %
(client.hostname, data.get('name'),
[line.strip() for line in dlines]))
+ groups = []
for line in dlines[:]:
- if line.split(':')[0] == 'group':
- newgroup = line.split(':')[1].strip()
- if newgroup not in cgroups:
- if self._group_allowed(newgroup):
- cgroups.append(newgroup)
- else:
- self.logger.info(
- "Disallowed group assignment %s from %s" %
- (newgroup, client.hostname))
+ match = self.groupline_re.match(line)
+ if match:
+ newgroup = match.group("groupname")
+ if self._group_allowed(newgroup):
+ groups.append(newgroup)
+ else:
+ self.logger.warning(
+ "Disallowed group assignment %s from %s" %
+ (newgroup, client.hostname))
dlines.remove(line)
- dobj = ProbeData("\n".join(dlines))
- cprobedata[data.get('name')] = dobj
+ return (groups, ProbeData("\n".join(dlines)))
+
+ def get_additional_groups(self, metadata):
+ return self.probestore.get_groups(metadata.hostname)
+
+ def get_additional_data(self, metadata):
+ return self.probestore.get_data(metadata.hostname)
def _group_allowed(self, group):
""" Determine if the named group can be set as a probe group
by checking the regexes listed in the [probes] groups_allowed
setting """
- return any(r.match(group) for r in self.allowed_cgroups)
-
- def get_additional_groups(self, meta):
- return self.cgroups.get(meta.hostname, list())
- get_additional_groups.__doc__ = \
- Bcfg2.Server.Plugin.Connector.get_additional_groups.__doc__
-
- def get_additional_data(self, meta):
- return self.probedata.get(meta.hostname, ClientProbeDataSet())
- get_additional_data.__doc__ = \
- Bcfg2.Server.Plugin.Connector.get_additional_data.__doc__
+ return any(r.match(group)
+ for r in Bcfg2.Options.setup.probes_allowed_groups)