""" A plugin to gather information from a client machine """ import re import os import sys import time import copy import operator import lxml.etree import Bcfg2.Server import Bcfg2.Server.Cache import Bcfg2.Server.Plugin import Bcfg2.Server.FileMonitor from Bcfg2.Logger import Debuggable from Bcfg2.Server.Statistics import track_statistics HAS_DJANGO = False # pylint: disable=C0103 ProbesDataModel = None ProbesGroupsModel = None # pylint: enable=C0103 def load_django_models(): """ Load models for Django after option parsing has completed """ # pylint: disable=W0602 global ProbesDataModel, ProbesGroupsModel, HAS_DJANGO # pylint: enable=W0602 try: from django.db import models HAS_DJANGO = True except ImportError: HAS_DJANGO = False return class ProbesDataModel(models.Model, # pylint: disable=W0621,W0612 Bcfg2.Server.Plugin.PluginDatabaseModel): """ The database model for storing probe data """ hostname = models.CharField(max_length=255) probe = models.CharField(max_length=255) timestamp = models.DateTimeField(auto_now=True) data = models.TextField(null=True) class ProbesGroupsModel(models.Model, # pylint: disable=W0621,W0612 Bcfg2.Server.Plugin.PluginDatabaseModel): """ The database model for storing probe groups """ hostname = models.CharField(max_length=255) group = models.CharField(max_length=255) try: import json HAS_JSON = True except ImportError: try: import simplejson as json HAS_JSON = True except ImportError: HAS_JSON = False try: import yaml HAS_YAML = True except ImportError: HAS_YAML = False class ProbeStore(Debuggable): """ Caching abstraction layer between persistent probe data storage and the Probes plugin.""" def __init__(self, core, datastore): # pylint: disable=W0613 Debuggable.__init__(self) self._groupcache = Bcfg2.Server.Cache.Cache("Probes", "probegroups") self._datacache = Bcfg2.Server.Cache.Cache("Probes", "probedata") def get_groups(self, hostname): """ Get the list of groups for the given host """ if hostname not in self._groupcache: self._load_groups(hostname) return self._groupcache.get(hostname, []) def set_groups(self, hostname, groups): """ Set the list of groups for the given host """ raise NotImplementedError def get_data(self, hostname): """ Get a dict of probe data for the given host """ if hostname not in self._datacache: self._load_data(hostname) return self._datacache.get(hostname, dict()) def set_data(self, hostname, data): """ Set probe data for the given host """ raise NotImplementedError def _load_groups(self, hostname): """ When probe groups are not found in the cache, this function is called to load them from the backend (XML or database). """ raise NotImplementedError def _load_data(self, hostname): """ When probe groups are not found in the cache, this function is called to load them from the backend (XML or database). """ raise NotImplementedError def commit(self): """ Commit the current data in the cache to the persistent backend store. This is not used with the :class:`Bcfg2.Server.Plugins.Probes.DBProbeStore`, because it commits on every change. """ pass class DBProbeStore(ProbeStore, Bcfg2.Server.Plugin.DatabaseBacked): """ Caching abstraction layer between the database and the Probes plugin. """ create = False def __init__(self, core, datastore): Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) ProbeStore.__init__(self, core, datastore) def _load_groups(self, hostname): Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) groupdata = ProbesGroupsModel.objects.filter(hostname=hostname) self._groupcache[hostname] = list(set(r.group for r in groupdata)) Bcfg2.Server.Cache.expire("Metadata", hostname) @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock def set_groups(self, hostname, groups): Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) olddata = self._groupcache.get(hostname, []) self._groupcache[hostname] = groups for group in groups: try: ProbesGroupsModel.objects.get_or_create( hostname=hostname, group=group) except ProbesGroupsModel.MultipleObjectsReturned: ProbesGroupsModel.objects.filter(hostname=hostname, group=group).delete() ProbesGroupsModel.objects.get_or_create( hostname=hostname, group=group) ProbesGroupsModel.objects.filter( hostname=hostname).exclude(group__in=groups).delete() if olddata != groups: Bcfg2.Server.Cache.expire("Metadata", hostname) def _load_data(self, hostname): Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) Bcfg2.Server.Cache.expire("Probes", "probedata", hostname) self._datacache[hostname] = ClientProbeDataSet() ts_set = False for pdata in ProbesDataModel.objects.filter(hostname=hostname): if not ts_set: self._datacache[hostname].timestamp = \ time.mktime(pdata.timestamp.timetuple()) ts_set = True self._datacache[hostname][pdata.probe] = ProbeData(pdata.data) Bcfg2.Server.Cache.expire("Metadata", hostname) @Bcfg2.Server.Plugin.DatabaseBacked.get_db_lock def set_data(self, hostname, data): Bcfg2.Server.Cache.expire("Probes", "probedata", hostname) self._datacache[hostname] = ClientProbeDataSet() expire_metadata = False for probe, pdata in data.items(): self._datacache[hostname][probe] = pdata record, created = ProbesDataModel.objects.get_or_create( hostname=hostname, probe=probe) expire_metadata |= created if record.data != pdata: record.data = pdata record.save() expire_metadata = True qset = ProbesDataModel.objects.filter( hostname=hostname).exclude(probe__in=data.keys()) if len(qset): qset.delete() expire_metadata = True if expire_metadata: Bcfg2.Server.Cache.expire("Metadata", hostname) class XMLProbeStore(ProbeStore): """ Caching abstraction layer between ``probed.xml`` and the Probes plugin.""" def __init__(self, core, datastore): ProbeStore.__init__(self, core, datastore) self._fname = os.path.join(datastore, 'probed.xml') self._load_data() def _load_data(self, _=None): """ Load probe data from probed.xml """ Bcfg2.Server.Cache.expire("Probes", "probegroups") Bcfg2.Server.Cache.expire("Probes", "probedata") if not os.path.exists(self._fname): self.commit() try: data = lxml.etree.parse(self._fname, parser=Bcfg2.Server.XMLParser).getroot() except (IOError, lxml.etree.XMLSyntaxError): err = sys.exc_info()[1] self.logger.error("Failed to read file probed.xml: %s" % err) return for client in data.getchildren(): self._datacache[client.get('name')] = \ ClientProbeDataSet(timestamp=client.get("timestamp")) self._groupcache[client.get('name')] = [] for pdata in client: if pdata.tag == 'Probe': self._datacache[client.get('name')][pdata.get('name')] = \ ProbeData(pdata.get("value")) elif pdata.tag == 'Group': self._groupcache[client.get('name')].append( pdata.get('name')) Bcfg2.Server.Cache.expire("Metadata") def _load_groups(self, hostname): self._load_data(hostname) def commit(self): """ Write received probe data to probed.xml """ top = lxml.etree.Element("Probed") for client, probed in sorted(self._datacache.items()): # make a copy of probe data for this client in case it # submits probe data while we're trying to write # probed.xml probedata = copy.copy(probed) ctag = \ lxml.etree.SubElement(top, 'Client', name=client, timestamp=str(int(probedata.timestamp))) for probe in sorted(probedata): lxml.etree.SubElement( ctag, 'Probe', name=probe, value=self._datacache[client][probe]) for group in sorted(self._groupcache[client]): lxml.etree.SubElement(ctag, "Group", name=group) try: top.getroottree().write(self._fname, xml_declaration=False, pretty_print='true') except IOError: err = sys.exc_info()[1] self.logger.error("Failed to write probed.xml: %s" % err) def set_groups(self, hostname, groups): Bcfg2.Server.Cache.expire("Probes", "probegroups", hostname) olddata = self._groupcache.get(hostname, []) self._groupcache[hostname] = groups if olddata != groups: Bcfg2.Server.Cache.expire("Metadata", hostname) def set_data(self, hostname, data): Bcfg2.Server.Cache.expire("Probes", "probedata", hostname) self._datacache[hostname] = ClientProbeDataSet() expire_metadata = False for probe, pdata in data.items(): olddata = self._datacache[hostname].get(probe, ProbeData('')) self._datacache[hostname][probe] = pdata expire_metadata |= olddata != data if expire_metadata: Bcfg2.Server.Cache.expire("Metadata", hostname) class ClientProbeDataSet(dict): """ dict of probe => [probe data] that records a timestamp for each host """ def __init__(self, *args, **kwargs): if "timestamp" in kwargs and kwargs['timestamp'] is not None: self.timestamp = kwargs.pop("timestamp") else: self.timestamp = time.time() dict.__init__(self, *args, **kwargs) class ProbeData(str): # pylint: disable=E0012,R0924 """ a ProbeData object emulates a str object, but also has .xdata, .json, and .yaml properties to provide convenient ways to use ProbeData objects as XML, JSON, or YAML data """ def __new__(cls, data): return str.__new__(cls, data) def __init__(self, data): # pylint: disable=W0613 str.__init__(self) self._xdata = None self._json = None self._yaml = None @property def data(self): """ provide backwards compatibility with broken ProbeData object in bcfg2 1.2.0 thru 1.2.2 """ return str(self) @property def xdata(self): """ The probe data as a lxml.etree._Element document """ if self._xdata is None: try: self._xdata = lxml.etree.XML(self.data, parser=Bcfg2.Server.XMLParser) except lxml.etree.XMLSyntaxError: pass return self._xdata @property def json(self): """ The probe data as a decoded JSON data structure """ if self._json is None and HAS_JSON: try: self._json = json.loads(self.data) except ValueError: pass return self._json @property def yaml(self): """ The probe data as a decoded YAML data structure """ if self._yaml is None and HAS_YAML: try: self._yaml = yaml.load(self.data) except yaml.YAMLError: pass return self._yaml class ProbeSet(Bcfg2.Server.Plugin.EntrySet): """ Handle universal and group- and host-specific probe files """ ignore = re.compile(r'^(\.#.*|.*~|\..*\.(tmp|sw[px])|probed\.xml)$') probename = \ re.compile(r'(.*/)?(?P\S+?)(\.(?P(?:G\d\d)|H)_\S+)?$') bangline = re.compile(r'^#!\s*(?P.*)$') basename_is_regex = True def __init__(self, path, plugin_name): self.plugin_name = plugin_name Bcfg2.Server.Plugin.EntrySet.__init__(self, r'[0-9A-Za-z_\-]+', path, Bcfg2.Server.Plugin.SpecificData) Bcfg2.Server.FileMonitor.get_fam().AddMonitor(path, self) def HandleEvent(self, event): """ handle events on everything but probed.xml """ if (event.filename != self.path and not event.filename.endswith("probed.xml")): return self.handle_event(event) def get_probe_data(self, metadata): """ Get an XML description of all probes for a client suitable for sending to that client. :param metadata: The client metadata to get probes for. :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata :returns: list of lxml.etree._Element objects, each of which represents one probe. """ ret = [] build = dict() candidates = self.get_matching(metadata) candidates.sort(key=operator.attrgetter('specific')) for entry in candidates: rem = self.probename.match(entry.name) pname = rem.group('basename') if pname not in build: build[pname] = entry for (name, entry) in list(build.items()): probe = lxml.etree.Element('probe') probe.set('name', os.path.basename(name)) probe.set('source', self.plugin_name) if (metadata.version_info and metadata.version_info > (1, 3, 1, '', 0)): try: probe.text = entry.data.decode('utf-8') except AttributeError: probe.text = entry.data else: try: probe.text = entry.data except: # pylint: disable=W0702 self.logger.error("Client unable to handle unicode " "probes. Skipping %s" % probe.get('name')) continue match = self.bangline.match(entry.data.split('\n')[0]) if match: probe.set('interpreter', match.group('interpreter')) else: probe.set('interpreter', '/bin/sh') ret.append(probe) return ret def __str__(self): return "ProbeSet for %s" % self.plugin_name class Probes(Bcfg2.Server.Plugin.Probing, Bcfg2.Server.Plugin.Connector, Bcfg2.Server.Plugin.DatabaseBacked): """ A plugin to gather information from a client machine """ __author__ = 'bcfg-dev@mcs.anl.gov' groupline_re = re.compile(r'^group:\s*(?P\S+)\s*') options = [ Bcfg2.Options.BooleanOption( cf=('probes', 'use_database'), dest="probes_db", help="Use database capabilities of the Probes plugin")] options_parsed_hook = staticmethod(load_django_models) def __init__(self, core, datastore): Bcfg2.Server.Plugin.Probing.__init__(self) Bcfg2.Server.Plugin.Connector.__init__(self) Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) try: self.probes = ProbeSet(self.data, self.name) except: err = sys.exc_info()[1] raise Bcfg2.Server.Plugin.PluginInitError(err) if self._use_db: self.probestore = DBProbeStore(core, datastore) else: self.probestore = XMLProbeStore(core, datastore) @track_statistics() def GetProbes(self, metadata): return self.probes.get_probe_data(metadata) @track_statistics() def ReceiveData(self, client, datalist): cgroups = set() cdata = dict() for data in datalist: groups, cdata[data.get("name")] = \ self.ReceiveDataItem(client, data) cgroups.update(groups) self.probestore.set_groups(client.hostname, list(cgroups)) self.probestore.set_data(client.hostname, cdata) self.probestore.commit() def ReceiveDataItem(self, client, data): """ Receive probe results pertaining to client. Returns a tuple of (, ). """ if data.text is None: self.logger.info("Got null response to probe %s from %s" % (data.get('name'), client.hostname)) return [], '' dlines = data.text.split('\n') self.logger.debug("Processing probe from %s: %s:%s" % (client.hostname, data.get('name'), [line.strip() for line in dlines])) groups = [] for line in dlines[:]: match = self.groupline_re.match(line) if match: groups.append(match.group("groupname")) dlines.remove(line) return (groups, ProbeData("\n".join(dlines))) def get_additional_groups(self, metadata): return self.probestore.get_groups(metadata.hostname) def get_additional_data(self, metadata): return self.probestore.get_data(metadata.hostname)