summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/Plugin.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-06 09:17:08 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-09-06 09:17:08 -0400
commitd3aa773f9f42045a0922d6c194e01d029ee53a40 (patch)
treeb499c7973ff4e6c14a50d8708e07a23aeb863cc4 /src/lib/Bcfg2/Server/Plugin.py
parent460a1c2a0f1caa7eb2043ad10ba64b8b55e43844 (diff)
downloadbcfg2-d3aa773f9f42045a0922d6c194e01d029ee53a40.tar.gz
bcfg2-d3aa773f9f42045a0922d6c194e01d029ee53a40.tar.bz2
bcfg2-d3aa773f9f42045a0922d6c194e01d029ee53a40.zip
split up mammoth Plugin.py
Diffstat (limited to 'src/lib/Bcfg2/Server/Plugin.py')
-rw-r--r--src/lib/Bcfg2/Server/Plugin.py1642
1 files changed, 0 insertions, 1642 deletions
diff --git a/src/lib/Bcfg2/Server/Plugin.py b/src/lib/Bcfg2/Server/Plugin.py
deleted file mode 100644
index 0b2f7cee0..000000000
--- a/src/lib/Bcfg2/Server/Plugin.py
+++ /dev/null
@@ -1,1642 +0,0 @@
-"""This module provides the baseclass for Bcfg2 Server Plugins."""
-
-import os
-import re
-import sys
-import copy
-import logging
-import operator
-import threading
-import lxml.etree
-import Bcfg2.Server
-import Bcfg2.Options
-from Bcfg2.Compat import ConfigParser, CmpMixin, reduce, Queue, Empty, \
- Full, cPickle
-
-try:
- import django
- has_django = True
-except ImportError:
- has_django = False
-
-# grab default metadata info from bcfg2.conf
-opts = {'owner': Bcfg2.Options.MDATA_OWNER,
- 'group': Bcfg2.Options.MDATA_GROUP,
- 'perms': Bcfg2.Options.MDATA_PERMS,
- 'secontext': Bcfg2.Options.MDATA_SECONTEXT,
- 'important': Bcfg2.Options.MDATA_IMPORTANT,
- 'paranoid': Bcfg2.Options.MDATA_PARANOID,
- 'sensitive': Bcfg2.Options.MDATA_SENSITIVE}
-default_file_metadata = Bcfg2.Options.OptionParser(opts)
-default_file_metadata.parse([])
-del default_file_metadata['args']
-
-logger = logging.getLogger('Bcfg2.Server.Plugin')
-
-info_regex = re.compile('owner:(\s)*(?P<owner>\S+)|' +
- 'group:(\s)*(?P<group>\S+)|' +
- 'perms:(\s)*(?P<perms>\w+)|' +
- 'secontext:(\s)*(?P<secontext>\S+)|' +
- 'paranoid:(\s)*(?P<paranoid>\S+)|' +
- 'sensitive:(\s)*(?P<sensitive>\S+)|' +
- 'encoding:(\s)*(?P<encoding>\S+)|' +
- 'important:(\s)*(?P<important>\S+)|' +
- 'mtime:(\s)*(?P<mtime>\w+)|')
-
-def bind_info(entry, metadata, infoxml=None, default=default_file_metadata):
- for attr, val in list(default.items()):
- entry.set(attr, val)
- if infoxml:
- mdata = dict()
- infoxml.pnode.Match(metadata, mdata, entry=entry)
- if 'Info' not in mdata:
- msg = "Failed to set metadata for file %s" % entry.get('name')
- logger.error(msg)
- raise PluginExecutionError(msg)
- for attr, val in list(mdata['Info'][None].items()):
- entry.set(attr, val)
-
-
-class PluginInitError(Exception):
- """Error raised in cases of :class:`Bcfg2.Server.Plugin.Plugin`
- initialization errors."""
- pass
-
-
-class PluginExecutionError(Exception):
- """Error raised in case of :class:`Bcfg2.Server.Plugin.Plugin`
- execution errors."""
- pass
-
-
-class MetadataConsistencyError(Exception):
- """This error gets raised when metadata is internally inconsistent."""
- pass
-
-
-class MetadataRuntimeError(Exception):
- """This error is raised when the metadata engine is called prior
- to reading enough data, or for other
- :class:`Bcfg2.Server.Plugin.Metadata` errors. """
- pass
-
-
-class Debuggable(object):
- """ Mixin to add a debugging interface to an object and expose it
- via XML-RPC on :class:`Bcfg2.Server.Plugin.Plugin` objects """
-
- #: List of names of methods to be exposed as XML-RPC functions
- __rmi__ = ['toggle_debug']
-
- def __init__(self, name=None):
- if name is None:
- name = "%s.%s" % (self.__class__.__module__,
- self.__class__.__name__)
- self.debug_flag = False
- self.logger = logging.getLogger(name)
-
- def toggle_debug(self):
- """ Turn debugging output on or off.
-
- :returns: bool - The new value of the debug flag
- """
- self.debug_flag = not self.debug_flag
- self.debug_log("%s: debug_flag = %s" % (self.__class__.__name__,
- self.debug_flag),
- flag=True)
- return self.debug_flag
-
- def debug_log(self, message, flag=None):
- """ Log a message at the debug level.
-
- :param message: The message to log
- :type message: string
- :param flag: Override the current debug flag with this value
- :type flag: bool
- :returns: None
- """
- if (flag is None and self.debug_flag) or flag:
- self.logger.error(message)
-
-
-class Plugin(Debuggable):
- """ The base class for all Bcfg2 Server plugins. """
-
- #: The name of the plugin.
- name = 'Plugin'
-
- #: The email address of the plugin author.
- __author__ = 'bcfg-dev@mcs.anl.gov'
-
- #: Plugin is experimental. Use of this plugin will produce a log
- #: message alerting the administrator that an experimental plugin
- #: is in use.
- experimental = False
-
- #: Plugin is deprecated and will be removed in a future release.
- #: Use of this plugin will produce a log message alerting the
- #: administrator that an experimental plugin is in use.
- deprecated = False
-
- #: Plugin conflicts with the list of other plugin names
- conflicts = []
-
- #: Plugins of the same type are processed in order of ascending
- #: sort_order value. Plugins with the same sort_order are sorted
- #: alphabetically by their name.
- sort_order = 500
-
- def __init__(self, core, datastore):
- """ Initialize the plugin.
-
- :param core: The Bcfg2.Server.Core initializing the plugin
- :type core: Bcfg2.Server.Core
- :param datastore: The path to the Bcfg2 repository on the
- filesystem
- :type datastore: string
- :raises: Bcfg2.Server.Plugin.PluginInitError
- """
- object.__init__(self)
- self.Entries = {}
- self.core = core
- self.data = os.path.join(datastore, self.name)
- self.running = True
- Debuggable.__init__(self, name=self.name)
-
- @classmethod
- def init_repo(cls, repo):
- """ Perform any tasks necessary to create an initial Bcfg2
- repository.
-
- :param repo: The path to the Bcfg2 repository on the filesystem
- :type repo: string
- :returns: None
- """
- os.makedirs(os.path.join(repo, cls.name))
-
- def shutdown(self):
- """ Perform shutdown tasks for the plugin
-
- :returns: None """
- self.running = False
-
- def __str__(self):
- return "%s Plugin" % self.__class__.__name__
-
-
-class DatabaseBacked(Plugin):
- @property
- def _use_db(self):
- use_db = self.core.setup.cfp.getboolean(self.name.lower(),
- "use_database",
- default=False)
- if use_db and has_django and self.core.database_available:
- return True
- elif not use_db:
- return False
- else:
- self.logger.error("use_database is true but django not found")
- return False
-
-
-class PluginDatabaseModel(object):
- class Meta:
- app_label = "Server"
-
-
-class Generator(object):
- """ Generator plugins contribute to literal client configurations.
- That is, they generate entry contents.
-
- An entry is generated in one of two ways:
-
- #. The Bcfg2 core looks in the ``Entries`` dict attribute of the
- plugin object. ``Entries`` is expected to be a dict whose keys
- are entry tags (e.g., ``"Path"``, ``"Service"``, etc.) and
- whose values are dicts; those dicts should map the ``name``
- attribute of an entry to a callable that will be called to
- generate the content. The callable will receive two arguments:
- the abstract entry (as an lxml.etree._Element object), and the
- client metadata object the entry is being generated for.
-
- #. If the entry is not listed in ``Entries``, the Bcfg2 core calls
- :func:`Bcfg2.Server.Plugin.Generator.HandlesEntry`; if that
- returns True, then it calls
- :func:`Bcfg2.Server.Plugin.Generator.HandleEntry`.
- """
-
- def HandlesEntry(self, entry, metadata):
- """ HandlesEntry is the slow path method for routing
- configuration binding requests. It is called if the
- ``Entries`` dict does not contain a method for binding the
- entry.
-
- :param entry: The entry to bind
- :type entry: lxml.etree._Element
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :return: bool - Whether or not this plugin can handle the entry
- :raises: Bcfg2.Server.Plugin.PluginExecutionError
- """
- return False
-
- def HandleEntry(self, entry, metadata):
- """ HandlesEntry is the slow path method for binding
- configuration binding requests. It is called if the
- ``Entries`` dict does not contain a method for binding the
- entry, and :func:`Bcfg2.Server.Plugin.Generator.HandlesEntry`
- returns True.
-
- :param entry: The entry to bind
- :type entry: lxml.etree._Element
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :return: lxml.etree._Element - The fully bound entry
- :raises: Bcfg2.Server.Plugin.PluginExecutionError
- """
- return entry
-
-
-class Structure(object):
- """ Structure Plugins contribute to abstract client
- configurations. That is, they produce lists of entries that will
- be generated for a client. """
-
- def BuildStructures(self, metadata):
- """ Build a list of lxml.etree._Element objects that will be
- added to the top-level ``<Configuration>`` tag of the client
- configuration. Consequently, each object in the list returned
- by ``BuildStructures()`` must consist of a container tag
- (e.g., ``<Bundle>`` or ``<Independent>``) which contains the
- entry tags. It must not return a list of entry tags.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :return: list of lxml.etree._Element objects
- """
- raise NotImplementedError
-
-
-class Metadata(object):
- """Signal metadata capabilities for this plugin"""
- def viz(self, hosts, bundles, key, only_client, colors):
- """ Return a string containing a graphviz document that maps
- out the Metadata for :ref:`bcfg2-admin viz <server-admin-viz>`
-
- :param hosts: Include hosts in the graph
- :type hosts: bool
- :param bundles: Include bundles in the graph
- :type bundles: bool
- :param key: Include a key in the graph
- :type key: bool
- :param only_client: Only include data for the specified client
- :type only_client: string
- :param colors: Use the specified graphviz colors
- :type colors: list of strings
- :return: string
- """
- return ''
-
- def set_version(self, client, version):
- """ Set the version for the named client to the specified
- version string.
-
- :param client: Hostname of the client
- :type client: string
- :param profile: Client Bcfg2 version
- :type profile: string
- :return: None
- :raises: Bcfg2.Server.Plugin.MetadataRuntimeError,
- Bcfg2.Server.Plugin.MetadataConsistencyError
- """
- pass
-
- def set_profile(self, client, profile, address):
- """ Set the profile for the named client to the named profile
- group.
-
- :param client: Hostname of the client
- :type client: string
- :param profile: Name of the profile group
- :type profile: string
- :param address: Address pair of ``(<ip address>, <hostname>)``
- :type address: tuple
- :return: None
- :raises: Bcfg2.Server.Plugin.MetadataRuntimeError,
- Bcfg2.Server.Plugin.MetadataConsistencyError
- """
- pass
-
- def resolve_client(self, address, cleanup_cache=False):
- """ Resolve the canonical name of this client. If this method
- is not implemented, the hostname claimed by the client is
- used. (This may be a security risk; it's highly recommended
- that you implement ``resolve_client`` if you are writing a
- Metadata plugin.)
-
- :param address: Address pair of ``(<ip address>, <hostname>)``
- :type address: tuple
- :param cleanup_cache: Whether or not to remove expire the
- entire client hostname resolution class
- :type cleanup_cache: bool
- :return: string - canonical client hostname
- :raises: Bcfg2.Server.Plugin.MetadataRuntimeError,
- Bcfg2.Server.Plugin.MetadataConsistencyError
- """
- return address[1]
-
- def AuthenticateConnection(self, cert, user, password, address):
- """ Authenticate the given client.
-
- :param cert: an x509 certificate
- :type cert: dict
- :param user: The username of the user trying to authenticate
- :type user: string
- :param password: The password supplied by the client
- :type password: string
- :param addresspair: An address pair of ``(<ip address>,
- <hostname>)``
- :type addresspair: tuple
- :return: bool - True if the authenticate succeeds, False otherwise
- """
- raise NotImplementedError
-
- def get_initial_metadata(self, client_name):
- """ Return a
- :class:`Bcfg2.Server.Plugins.Metadata.ClientMetadata` object
- that fully describes everything the Metadata plugin knows
- about the named client.
-
- :param client_name: The hostname of the client
- :type client_name: string
- :return: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- """
- raise NotImplementedError
-
- def merge_additional_data(self, imd, source, data):
- """ Add arbitrary data from a
- :class:`Bcfg2.Server.Plugin.Connector` plugin to the given
- metadata object.
-
- :param imd: An initial metadata object
- :type imd: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param source: The name of the plugin providing this data
- :type source: string
- :param data: The data to add
- :type data: any
- :return: None
- """
- raise NotImplementedError
-
- def merge_additional_groups(self, imd, groups):
- """ Add groups from a
- :class:`Bcfg2.Server.Plugin.Connector` plugin to the given
- metadata object.
-
- :param imd: An initial metadata object
- :type imd: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param groups: The groups to add
- :type groups: list of strings
- :return: None
- """
- raise NotImplementedError
-
-
-class Connector(object):
- """ Connector plugins augment client metadata instances with
- additional data, additional groups, or both. """
-
- def get_additional_groups(self, metadata):
- """ Return a list of additional groups for the given client.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :return: list of strings
- """
- return list()
-
- def get_additional_data(self, metadata):
- """ Return arbitrary additional data for the given
- ClientMetadata object. By convention this is usually a dict
- object, but doesn't need to be.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :return: list of strings
- """
- return dict()
-
-
-class Probing(object):
- """ Probing plugins can collect data from clients and process it.
- """
-
- def GetProbes(self, metadata):
- """ Return a list of probes for the given client. Each probe
- should be an lxml.etree._Element object that adheres to
- the following specification. Each probe must the following
- attributes:
-
- * ``name``: The unique name of the probe.
- * ``source``: The origin of the probe; probably the name of
- the plugin that supplies the probe.
- * ``interpreter``: The command that will be run on the client
- to interpret the probe script. Compiled (i.e.,
- non-interpreted) probes are not supported.
-
- The text of the XML tag should be the contents of the probe,
- i.e., the code that will be run on the client.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :return: list of lxml.etree._Element objects
- """
- raise NotImplementedError
-
- def ReceiveData(self, metadata, datalist):
- """ Process data returned from the probes for the given
- client. ``datalist`` is a list of lxml.etree._Element
- objects, each of which is a single tag; the ``name`` attribute
- holds the unique name of the probe that was run, and the text
- contents of the tag hold the results of the probe.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param datalist: The probe data
- :type datalist: list of lxml.etree._Element objects
- :return: None
- """
- raise NotImplementedError
-
-
-class Statistics(Plugin):
- """ Statistics plugins handle statistics for clients. In general,
- you should avoid using Statistics and use
- :class:`Bcfg2.Server.Plugin.ThreadedStatistics` instead."""
-
- def process_statistics(self, client, xdata):
- """ Process the given XML statistics data for the specified
- client.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param data: The statistics data
- :type data: lxml.etree._Element
- :return: None
- """
- raise NotImplementedError
-
-
-class ThreadedStatistics(Statistics, threading.Thread):
- """ ThreadedStatistics plugins process client statistics in a
- separate thread. """
-
- def __init__(self, core, datastore):
- Statistics.__init__(self, core, datastore)
- threading.Thread.__init__(self)
- # Event from the core signaling an exit
- self.terminate = core.terminate
- self.work_queue = Queue(100000)
- self.pending_file = os.path.join(datastore, "etc",
- "%s.pending" % self.name)
- self.daemon = False
- self.start()
-
- def _save(self):
- """Save any pending data to a file."""
- pending_data = []
- try:
- while not self.work_queue.empty():
- (metadata, data) = self.work_queue.get_nowait()
- try:
- pending_data.append((metadata.hostname,
- lxml.etree.tostring(data,
- xml_declaration=False).decode("UTF-8")))
- except:
- err = sys.exc_info()[1]
- self.logger.warning("Dropping interaction for %s: %s" %
- (metadata.hostname, err))
- except Empty:
- pass
-
- try:
- savefile = open(self.pending_file, 'w')
- cPickle.dump(pending_data, savefile)
- savefile.close()
- self.logger.info("Saved pending %s data" % self.name)
- except:
- err = sys.exc_info()[1]
- self.logger.warning("Failed to save pending data: %s" % err)
-
- def _load(self):
- """Load any pending data from a file."""
- if not os.path.exists(self.pending_file):
- return True
- pending_data = []
- try:
- savefile = open(self.pending_file, 'r')
- pending_data = cPickle.load(savefile)
- savefile.close()
- except Exception:
- e = sys.exc_info()[1]
- self.logger.warning("Failed to load pending data: %s" % e)
- return False
- for (pmetadata, pdata) in pending_data:
- # check that shutdown wasnt called early
- if self.terminate.isSet():
- return False
-
- try:
- while True:
- try:
- metadata = self.core.build_metadata(pmetadata)
- break
- except MetadataRuntimeError:
- pass
-
- self.terminate.wait(5)
- if self.terminate.isSet():
- return False
-
- self.work_queue.put_nowait((metadata,
- lxml.etree.XML(pdata,
- parser=Bcfg2.Server.XMLParser)))
- except Full:
- self.logger.warning("Queue.Full: Failed to load queue data")
- break
- except lxml.etree.LxmlError:
- lxml_error = sys.exc_info()[1]
- self.logger.error("Unable to load saved interaction: %s" %
- lxml_error)
- except MetadataConsistencyError:
- self.logger.error("Unable to load metadata for save "
- "interaction: %s" % pmetadata)
- try:
- os.unlink(self.pending_file)
- except:
- self.logger.error("Failed to unlink save file: %s" %
- self.pending_file)
- self.logger.info("Loaded pending %s data" % self.name)
- return True
-
- def run(self):
- if not self._load():
- return
- while not self.terminate.isSet() and self.work_queue != None:
- try:
- (client, xdata) = self.work_queue.get(block=True, timeout=2)
- except Empty:
- continue
- except Exception:
- e = sys.exc_info()[1]
- self.logger.error("ThreadedStatistics: %s" % e)
- continue
- self.handle_statistic(client, xdata)
- if self.work_queue != None and not self.work_queue.empty():
- self._save()
-
- def process_statistics(self, metadata, data):
- try:
- self.work_queue.put_nowait((metadata, copy.copy(data)))
- except Full:
- self.logger.warning("%s: Queue is full. Dropping interactions." %
- self.name)
-
- def handle_statistic(self, metadata, data):
- """ Process the given XML statistics data for the specified
- client object. This differs from the
- :func:`Bcfg2.Server.Plugin.Statistics.process_statistics`
- method only in that ThreadedStatistics first adds the data to
- a queue, and then processes them in a separate thread.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param data: The statistics data
- :type data: lxml.etree._Element
- :return: None
- """
- raise NotImplementedError
-
-
-class PullSource(object):
- def GetExtra(self, client):
- return []
-
- def GetCurrentEntry(self, client, e_type, e_name):
- raise NotImplementedError
-
-
-class PullTarget(object):
- def AcceptChoices(self, entry, metadata):
- raise NotImplementedError
-
- def AcceptPullData(self, specific, new_entry, verbose):
- raise NotImplementedError
-
-
-class Decision(object):
- """ Decision plugins produce decision lists for affecting which
- entries are actually installed on clients. """
-
- def GetDecisions(self, metadata, mode):
- """ Return a list of tuples of ``(<entry type>, <entry
- name>)`` to be used as the decision list for the given
- client in the specified mode.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param mode: The decision mode ("whitelist" or "blacklist")
- :type mode: string
- :return: list of tuples
- """
- raise NotImplementedError
-
-
-class ValidationError(Exception):
- """ Exception raised by
- :class:`Bcfg2.Server.Plugin.StructureValidator` and
- :class:`Bcfg2.Server.Plugin.GoalValidator` objects """
-
-
-class StructureValidator(object):
- """ StructureValidator plugins can modify the list of structures
- after it has been created but before the entries have been
- concretely bound. """
-
- def validate_structures(self, metadata, structures):
- """ Given a list of structures (i.e., of tags that contain
- entry tags), modify that list or the structures in it
- in-place.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param config: A list of lxml.etree._Element objects
- describing the structures for this client
- :type config: list
- :returns: None
- :raises: Bcfg2.Server.Plugin.ValidationError
- """
- raise NotImplementedError
-
-
-class GoalValidator(object):
- """ GoalValidator plugins can modify the concretely-bound configuration of
- a client as a last stage before the configuration is sent to the
- client. """
-
- def validate_goals(self, metadata, config):
- """ Given a monolithic XML document of the full configuration,
- modify the document in-place.
-
- :param metadata: The client metadata
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param config: The full configuration for the client
- :type config: lxml.etree._Element
- :returns: None
- :raises: Bcfg2.Server.Plugin.ValidationError
- """
- raise NotImplementedError
-
-
-class Version(object):
- """ Version plugins interact with various version control systems. """
-
- def get_revision(self):
- """ Return the current revision of the Bcfg2 specification.
- This will be included in the ``revision`` attribute of the
- top-level tag of the XML configuration sent to the client.
-
- :returns: string - the current version
- """
- raise NotImplementedError
-
-
-class ClientRunHooks(object):
- """ ClientRunHooks can hook into various parts of a client run to
- perform actions at various times without needing to pretend to be
- a different plugin type. """
-
- def start_client_run(self, metadata):
- """ Invoked at the start of a client run, after all probe data
- has been received and decision lists have been queried (if
- applicable), but before the configuration is generated.
-
- :param metadata: The client metadata object
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :returns: None
- """
- pass
-
- def end_client_run(self, metadata):
- """ Invoked at the end of a client run, immediately after
- :class:`Bcfg2.Server.Plugin.GoalValidator` plugins have been run
- and just before the configuration is returned to the client.
-
- :param metadata: The client metadata object
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :returns: None
- """
- pass
-
- def end_statistics(self, metadata):
- """ Invoked after statistics are processed for a client.
-
- :param metadata: The client metadata object
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :returns: None
- """
- pass
-
-# the rest of the file contains classes for coherent file caching
-
-class FileBacked(object):
- """This object caches file data in memory.
- HandleEvent is called whenever fam registers an event.
- Index can parse the data into member data as required.
- This object is meant to be used as a part of DirectoryBacked.
- """
-
- def __init__(self, name, fam=None):
- object.__init__(self)
- self.data = ''
- self.name = name
- self.fam = fam
-
- def HandleEvent(self, event=None):
- """Read file upon update."""
- if event and event.code2str() not in ['exists', 'changed', 'created']:
- return
- try:
- self.data = open(self.name).read()
- self.Index()
- except IOError:
- err = sys.exc_info()[1]
- logger.error("Failed to read file %s: %s" % (self.name, err))
-
- def Index(self):
- """Update local data structures based on current file state"""
- pass
-
- def __repr__(self):
- return "%s: %s" % (self.__class__.__name__, self.name)
-
-
-class DirectoryBacked(object):
- """This object is a coherent cache for a filesystem hierarchy of files."""
- __child__ = FileBacked
- patterns = re.compile('.*')
- ignore = None
-
- def __init__(self, data, fam):
- """Initialize the DirectoryBacked object.
-
- :param self: the object being initialized.
- :param data: the path to the data directory that will be
- monitored.
- :param fam: The FileMonitor object used to receive
- notifications of changes.
- """
- object.__init__(self)
-
- self.data = os.path.normpath(data)
- self.fam = fam
-
- # self.entries contains information about the files monitored
- # by this object.... The keys of the dict are the relative
- # paths to the files. The values are the objects (of type
- # __child__) that handle their contents.
- self.entries = {}
-
- # self.handles contains information about the directories
- # monitored by this object. The keys of the dict are the
- # values returned by the initial fam.AddMonitor() call (which
- # appear to be integers). The values are the relative paths of
- # the directories.
- self.handles = {}
-
- # Monitor everything in the plugin's directory
- self.add_directory_monitor('')
-
- def __getitem__(self, key):
- return self.entries[key]
-
- def __iter__(self):
- return iter(list(self.entries.items()))
-
- def add_directory_monitor(self, relative):
- """Add a new directory to FAM structures for monitoring.
-
- :param relative: Path name to monitor. This must be relative
- to the plugin's directory. An empty string value ("") will
- cause the plugin directory itself to be monitored.
- """
- dirpathname = os.path.join(self.data, relative)
- if relative not in self.handles.values():
- if not os.path.isdir(dirpathname):
- logger.error("%s is not a directory" % dirpathname)
- return
- reqid = self.fam.AddMonitor(dirpathname, self)
- self.handles[reqid] = relative
-
- def add_entry(self, relative, event):
- """Add a new file to our structures for monitoring.
-
- :param relative: Path name to monitor. This must be relative
- to the plugin's directory.
- :param event: File Monitor event that caused this entry to be
- added.
- """
- self.entries[relative] = self.__child__(os.path.join(self.data,
- relative),
- self.fam)
- self.entries[relative].HandleEvent(event)
-
- def HandleEvent(self, event):
- """Handle FAM/Gamin events.
-
- This method is invoked by FAM/Gamin when it detects a change
- to a filesystem object we have requsted to be monitored.
-
- This method manages the lifecycle of events related to the
- monitored objects, adding them to our indiciess and creating
- objects of type __child__ that actually do the domain-specific
- processing. When appropriate, it propogates events those
- objects by invoking their HandleEvent in turn.
- """
- action = event.code2str()
-
- # Exclude events for actions we don't care about
- if action == 'endExist':
- return
-
- if event.requestID not in self.handles:
- logger.warn("Got %s event with unknown handle (%s) for %s" %
- (action, event.requestID, event.filename))
- return
-
- # Clean up path names
- event.filename = os.path.normpath(event.filename)
- if event.filename.startswith(self.data):
- # the first event we get is on the data directory itself
- event.filename = event.filename[len(self.data) + 1:]
-
- if self.ignore and self.ignore.search(event.filename):
- logger.debug("Ignoring event %s" % event.filename)
- return
-
- # Calculate the absolute and relative paths this event refers to
- abspath = os.path.join(self.data, self.handles[event.requestID],
- event.filename)
- relpath = os.path.join(self.handles[event.requestID],
- event.filename).lstrip('/')
-
- if action == 'deleted':
- for key in list(self.entries.keys()):
- if key.startswith(relpath):
- del self.entries[key]
- # We remove values from self.entries, but not
- # self.handles, because the FileMonitor doesn't stop
- # watching a directory just because it gets deleted. If it
- # is recreated, we will start getting notifications for it
- # again without having to add a new monitor.
- elif os.path.isdir(abspath):
- # Deal with events for directories
- if action in ['exists', 'created']:
- self.add_directory_monitor(relpath)
- elif action == 'changed':
- if relpath in self.entries:
- # Ownerships, permissions or timestamps changed on
- # the directory. None of these should affect the
- # contents of the files, though it could change
- # our ability to access them.
- #
- # It seems like the right thing to do is to cancel
- # monitoring the directory and then begin
- # monitoring it again. But the current FileMonitor
- # class doesn't support canceling, so at least let
- # the user know that a restart might be a good
- # idea.
- logger.warn("Directory properties for %s changed, please " +
- " consider restarting the server" % (abspath))
- else:
- # Got a "changed" event for a directory that we
- # didn't know about. Go ahead and treat it like a
- # "created" event, but log a warning, because this
- # is unexpected.
- logger.warn("Got %s event for unexpected dir %s" %
- (action, abspath))
- self.add_directory_monitor(relpath)
- else:
- logger.warn("Got unknown dir event %s %s %s" %
- (event.requestID, event.code2str(), abspath))
- elif self.patterns.search(event.filename):
- if action in ['exists', 'created']:
- self.add_entry(relpath, event)
- elif action == 'changed':
- if relpath in self.entries:
- self.entries[relpath].HandleEvent(event)
- else:
- # Got a "changed" event for a file that we didn't
- # know about. Go ahead and treat it like a
- # "created" event, but log a warning, because this
- # is unexpected.
- logger.warn("Got %s event for unexpected file %s" %
- (action,
- abspath))
- self.add_entry(relpath, event)
- else:
- logger.warn("Got unknown file event %s %s %s" %
- (event.requestID, event.code2str(), abspath))
- else:
- logger.warn("Could not process filename %s; ignoring" %
- event.filename)
-
-
-class XMLFileBacked(FileBacked):
- """
- This object is a coherent cache for an XML file to be used as a
- part of DirectoryBacked.
- """
- __identifier__ = 'name'
-
- def __init__(self, filename, fam=None, should_monitor=False):
- FileBacked.__init__(self, filename)
- self.label = ""
- self.entries = []
- self.extras = []
- self.fam = fam
- self.should_monitor = should_monitor
- if fam and should_monitor:
- self.fam.AddMonitor(filename, self)
-
- def _follow_xincludes(self, fname=None, xdata=None):
- ''' follow xincludes, adding included files to self.extras '''
- if xdata is None:
- if fname is None:
- xdata = self.xdata.getroottree()
- else:
- xdata = lxml.etree.parse(fname)
- included = [el for el in xdata.findall('//%sinclude' %
- Bcfg2.Server.XI_NAMESPACE)]
- for el in included:
- name = el.get("href")
- if name.startswith("/"):
- fpath = name
- else:
- if fname:
- rel = fname
- else:
- rel = self.name
- fpath = os.path.join(os.path.dirname(rel), name)
- if fpath not in self.extras:
- if os.path.exists(fpath):
- self._follow_xincludes(fname=fpath)
- self.add_monitor(fpath)
- else:
- msg = "%s: %s does not exist, skipping" % (self.name, name)
- if el.findall('./%sfallback' % Bcfg2.Server.XI_NAMESPACE):
- self.logger.debug(msg)
- else:
- self.logger.warning(msg)
-
- def Index(self):
- """Build local data structures."""
- try:
- self.xdata = lxml.etree.XML(self.data, base_url=self.name,
- parser=Bcfg2.Server.XMLParser)
- except lxml.etree.XMLSyntaxError:
- msg = "Failed to parse %s: %s" % (self.name, sys.exc_info()[1])
- logger.error(msg)
- raise PluginInitError(msg)
-
- self._follow_xincludes()
- if self.extras:
- try:
- self.xdata.getroottree().xinclude()
- except lxml.etree.XIncludeError:
- err = sys.exc_info()[1]
- logger.error("XInclude failed on %s: %s" % (self.name, err))
-
- self.entries = self.xdata.getchildren()
- if self.__identifier__ is not None:
- self.label = self.xdata.attrib[self.__identifier__]
-
- def add_monitor(self, fpath):
- self.extras.append(fpath)
- if self.fam and self.should_monitor:
- self.fam.AddMonitor(fpath, self)
-
- def __iter__(self):
- return iter(self.entries)
-
- def __str__(self):
- return "%s at %s" % (self.__class__.__name__, self.name)
-
-
-class StructFile(XMLFileBacked):
- """This file contains a set of structure file formatting logic."""
- __identifier__ = None
-
- def _include_element(self, item, metadata):
- """ determine if an XML element matches the metadata """
- if isinstance(item, lxml.etree._Comment):
- return False
- negate = item.get('negate', 'false').lower() == 'true'
- if item.tag == 'Group':
- return negate == (item.get('name') not in metadata.groups)
- elif item.tag == 'Client':
- return negate == (item.get('name') != metadata.hostname)
- else:
- return True
-
- def _match(self, item, metadata):
- """ recursive helper for Match() """
- if self._include_element(item, metadata):
- if item.tag == 'Group' or item.tag == 'Client':
- rv = []
- if self._include_element(item, metadata):
- for child in item.iterchildren():
- rv.extend(self._match(child, metadata))
- return rv
- else:
- rv = copy.deepcopy(item)
- for child in rv.iterchildren():
- rv.remove(child)
- for child in item.iterchildren():
- rv.extend(self._match(child, metadata))
- return [rv]
- else:
- return []
-
- def Match(self, metadata):
- """Return matching fragments of independent."""
- rv = []
- for child in self.entries:
- rv.extend(self._match(child, metadata))
- return rv
-
- def _xml_match(self, item, metadata):
- """ recursive helper for XMLMatch """
- if self._include_element(item, metadata):
- if item.tag == 'Group' or item.tag == 'Client':
- for child in item.iterchildren():
- item.remove(child)
- item.getparent().append(child)
- self._xml_match(child, metadata)
- item.getparent().remove(item)
- else:
- for child in item.iterchildren():
- self._xml_match(child, metadata)
- else:
- item.getparent().remove(item)
-
- def XMLMatch(self, metadata):
- """ Return a rebuilt XML document that only contains the
- matching portions """
- rv = copy.deepcopy(self.xdata)
- for child in rv.iterchildren():
- self._xml_match(child, metadata)
- return rv
-
-
-class INode(object):
- """
- LNodes provide lists of things available at a particular
- group intersection.
- """
- raw = dict(
- Client="lambda m, e:'%(name)s' == m.hostname and predicate(m, e)",
- Group="lambda m, e:'%(name)s' in m.groups and predicate(m, e)")
- nraw = dict(
- Client="lambda m, e:'%(name)s' != m.hostname and predicate(m, e)",
- Group="lambda m, e:'%(name)s' not in m.groups and predicate(m, e)")
- containers = ['Group', 'Client']
- ignore = []
-
- def __init__(self, data, idict, parent=None):
- self.data = data
- self.contents = {}
- if parent is None:
- self.predicate = lambda m, e: True
- else:
- predicate = parent.predicate
- if data.get('negate', 'false').lower() == 'true':
- psrc = self.nraw
- else:
- psrc = self.raw
- if data.tag in list(psrc.keys()):
- self.predicate = eval(psrc[data.tag] %
- {'name': data.get('name')},
- {'predicate': predicate})
- else:
- raise PluginExecutionError("Unknown tag: %s" % data.tag)
- self.children = []
- self._load_children(data, idict)
-
- def _load_children(self, data, idict):
- for item in data.getchildren():
- if item.tag in self.ignore:
- continue
- elif item.tag in self.containers:
- self.children.append(self.__class__(item, idict, self))
- else:
- try:
- self.contents[item.tag][item.get('name')] = \
- dict(item.attrib)
- except KeyError:
- self.contents[item.tag] = \
- {item.get('name'): dict(item.attrib)}
- if item.text:
- self.contents[item.tag][item.get('name')]['__text__'] = \
- item.text
- if item.getchildren():
- self.contents[item.tag][item.get('name')]['__children__'] =\
- item.getchildren()
- try:
- idict[item.tag].append(item.get('name'))
- except KeyError:
- idict[item.tag] = [item.get('name')]
-
- def Match(self, metadata, data, entry=lxml.etree.Element("None")):
- """Return a dictionary of package mappings."""
- if self.predicate(metadata, entry):
- for key in self.contents:
- try:
- data[key].update(self.contents[key])
- except:
- data[key] = {}
- data[key].update(self.contents[key])
- for child in self.children:
- child.Match(metadata, data, entry=entry)
-
-
-class InfoNode (INode):
- """ INode implementation that includes <Path> tags """
- raw = {'Client': "lambda m, e:'%(name)s' == m.hostname and predicate(m, e)",
- 'Group': "lambda m, e:'%(name)s' in m.groups and predicate(m, e)",
- 'Path': "lambda m, e:('%(name)s' == e.get('name') or '%(name)s' == e.get('realname')) and predicate(m, e)"}
- nraw = {'Client': "lambda m, e:'%(name)s' != m.hostname and predicate(m, e)",
- 'Group': "lambda m, e:'%(name)s' not in m.groups and predicate(m, e)",
- 'Path': "lambda m, e:('%(name)s' != e.get('name') and '%(name)s' != e.get('realname')) and predicate(m, e)"}
- containers = ['Group', 'Client', 'Path']
-
-
-class XMLSrc(XMLFileBacked):
- """XMLSrc files contain a LNode hierarchy that returns matching entries."""
- __node__ = INode
- __cacheobj__ = dict
- __priority_required__ = True
-
- def __init__(self, filename, fam=None, should_monitor=False):
- XMLFileBacked.__init__(self, filename, fam, should_monitor)
- self.items = {}
- self.cache = None
- self.pnode = None
- self.priority = -1
-
- def HandleEvent(self, _=None):
- """Read file upon update."""
- try:
- data = open(self.name).read()
- except IOError:
- msg = "Failed to read file %s: %s" % (self.name, sys.exc_info()[1])
- logger.error(msg)
- raise PluginExecutionError(msg)
- self.items = {}
- try:
- xdata = lxml.etree.XML(data, parser=Bcfg2.Server.XMLParser)
- except lxml.etree.XMLSyntaxError:
- msg = "Failed to parse file %s" % (self.name, sys.exc_info()[1])
- logger.error(msg)
- raise PluginExecutionError(msg)
- self.pnode = self.__node__(xdata, self.items)
- self.cache = None
- try:
- self.priority = int(xdata.get('priority'))
- except (ValueError, TypeError):
- if self.__priority_required__:
- msg = "Got bogus priority %s for file %s" % \
- (xdata.get('priority'), self.name)
- logger.error(msg)
- raise PluginExecutionError(msg)
-
- del xdata, data
-
- def Cache(self, metadata):
- """Build a package dict for a given host."""
- if self.cache is None or self.cache[0] != metadata:
- cache = (metadata, self.__cacheobj__())
- if self.pnode is None:
- logger.error("Cache method called early for %s; forcing data load" % (self.name))
- self.HandleEvent()
- return
- self.pnode.Match(metadata, cache[1])
- self.cache = cache
-
- def __str__(self):
- return str(self.items)
-
-
-class InfoXML(XMLSrc):
- __node__ = InfoNode
- __priority_required__ = False
-
-
-class XMLDirectoryBacked(DirectoryBacked):
- """Directorybacked for *.xml."""
- patterns = re.compile('^.*\.xml$')
- __child__ = XMLFileBacked
-
-
-class PrioDir(Plugin, Generator, XMLDirectoryBacked):
- """This is a generator that handles package assignments."""
- name = 'PrioDir'
- __child__ = XMLSrc
-
- def __init__(self, core, datastore):
- Plugin.__init__(self, core, datastore)
- Generator.__init__(self)
- XMLDirectoryBacked.__init__(self, self.data, self.core.fam)
-
- def HandleEvent(self, event):
- """Handle events and update dispatch table."""
- XMLDirectoryBacked.HandleEvent(self, event)
- self.Entries = {}
- for src in list(self.entries.values()):
- for itype, children in list(src.items.items()):
- for child in children:
- try:
- self.Entries[itype][child] = self.BindEntry
- except KeyError:
- self.Entries[itype] = {child: self.BindEntry}
-
- def _matches(self, entry, metadata, rules):
- return entry.get('name') in rules
-
- def BindEntry(self, entry, metadata):
- attrs = self.get_attrs(entry, metadata)
- for key, val in list(attrs.items()):
- entry.attrib[key] = val
-
- def get_attrs(self, entry, metadata):
- """ get a list of attributes to add to the entry during the bind """
- for src in self.entries.values():
- src.Cache(metadata)
-
- matching = [src for src in list(self.entries.values())
- if (src.cache and
- entry.tag in src.cache[1] and
- self._matches(entry, metadata,
- src.cache[1][entry.tag]))]
- if len(matching) == 0:
- raise PluginExecutionError('No matching source for entry when retrieving attributes for %s(%s)' % (entry.tag, entry.attrib.get('name')))
- elif len(matching) == 1:
- index = 0
- else:
- prio = [int(src.priority) for src in matching]
- if prio.count(max(prio)) > 1:
- msg = "Found conflicting sources with same priority for " + \
- "%s:%s for %s" % (entry.tag, entry.get("name"),
- metadata.hostname)
- self.logger.error(msg)
- self.logger.error([item.name for item in matching])
- self.logger.error("Priority was %s" % max(prio))
- raise PluginExecutionError(msg)
- index = prio.index(max(prio))
-
- for rname in list(matching[index].cache[1][entry.tag].keys()):
- if self._matches(entry, metadata, [rname]):
- data = matching[index].cache[1][entry.tag][rname]
- break
- else:
- # Fall back on __getitem__. Required if override used
- data = matching[index].cache[1][entry.tag][entry.get('name')]
- if '__text__' in data:
- entry.text = data['__text__']
- if '__children__' in data:
- [entry.append(copy.copy(item)) for item in data['__children__']]
-
- return dict([(key, data[key])
- for key in list(data.keys())
- if not key.startswith('__')])
-
-
-# new unified EntrySet backend
-class SpecificityError(Exception):
- """Thrown in case of filename parse failure."""
- pass
-
-
-class Specificity(CmpMixin):
- def __init__(self, all=False, group=False, hostname=False, prio=0,
- delta=False):
- CmpMixin.__init__(self)
- self.hostname = hostname
- self.all = all
- self.group = group
- self.prio = prio
- self.delta = delta
-
- def matches(self, metadata):
- return self.all or \
- self.hostname == metadata.hostname or \
- self.group in metadata.groups
-
- def __cmp__(self, other):
- """Sort most to least specific."""
- if self.all:
- if other.all:
- return 0
- else:
- return 1
- elif other.all:
- return -1
- elif self.group:
- if other.hostname:
- return 1
- if other.group and other.prio > self.prio:
- return 1
- if other.group and other.prio == self.prio:
- return 0
- elif other.group:
- return -1
- elif self.hostname and other.hostname:
- return 0
- return -1
-
- def __str__(self):
- rv = [self.__class__.__name__, ': ']
- if self.all:
- rv.append("all")
- elif self.group:
- rv.append("Group %s, priority %s" % (self.group, self.prio))
- elif self.hostname:
- rv.append("Host %s" % self.hostname)
- if self.delta:
- rv.append(", delta=%s" % self.delta)
- return "".join(rv)
-
-
-class SpecificData(object):
- def __init__(self, name, specific, encoding):
- self.name = name
- self.specific = specific
-
- def handle_event(self, event):
- if event.code2str() == 'deleted':
- return
- try:
- self.data = open(self.name).read()
- except UnicodeDecodeError:
- self.data = open(self.name, mode='rb').read()
- except:
- logger.error("Failed to read file %s" % self.name)
-
-
-class EntrySet(Debuggable):
- """Entry sets deal with the host- and group-specific entries."""
- ignore = re.compile("^(\.#.*|.*~|\\..*\\.(sw[px])|.*\\.genshi_include)$")
- basename_is_regex=False
-
- def __init__(self, basename, path, entry_type, encoding):
- Debuggable.__init__(self, name=basename)
- self.path = path
- self.entry_type = entry_type
- self.entries = {}
- self.metadata = default_file_metadata.copy()
- self.infoxml = None
- self.encoding = encoding
-
- if self.basename_is_regex:
- base_pat = basename
- else:
- base_pat = re.escape(basename)
- pattern = '(.*/)?%s(\.((H_(?P<hostname>\S+))|' % base_pat
- pattern += '(G(?P<prio>\d+)_(?P<group>\S+))))?$'
- self.specific = re.compile(pattern)
-
- def sort_by_specific(self, one, other):
- return cmp(one.specific, other.specific)
-
- def get_matching(self, metadata):
- return [item for item in list(self.entries.values())
- if item.specific.matches(metadata)]
-
- def best_matching(self, metadata, matching=None):
- """ Return the appropriate interpreted template from the set of
- available templates. """
- if matching is None:
- matching = self.get_matching(metadata)
-
- if matching:
- matching.sort(key=operator.attrgetter("specific"))
- return matching[0]
- else:
- raise PluginExecutionError("No matching entries available for %s "
- "for %s" % (self.path,
- metadata.hostname))
-
- def handle_event(self, event):
- """Handle FAM events for the TemplateSet."""
- action = event.code2str()
-
- if event.filename in ['info', 'info.xml', ':info']:
- if action in ['exists', 'created', 'changed']:
- self.update_metadata(event)
- elif action == 'deleted':
- self.reset_metadata(event)
- return
-
- if action in ['exists', 'created']:
- self.entry_init(event)
- else:
- if event.filename not in self.entries:
- logger.warning("Got %s event for unknown file %s" %
- (action, event.filename))
- if action == 'changed':
- # received a bogus changed event; warn, but treat
- # it like a created event
- self.entry_init(event)
- return
- if action == 'changed':
- self.entries[event.filename].handle_event(event)
- elif action == 'deleted':
- del self.entries[event.filename]
-
- def entry_init(self, event, entry_type=None, specific=None):
- """Handle template and info file creation."""
- if entry_type is None:
- entry_type = self.entry_type
-
- if event.filename in self.entries:
- logger.warn("Got duplicate add for %s" % event.filename)
- else:
- fpath = os.path.join(self.path, event.filename)
- try:
- spec = self.specificity_from_filename(event.filename,
- specific=specific)
- except SpecificityError:
- if not self.ignore.match(event.filename):
- logger.error("Could not process filename %s; ignoring" %
- fpath)
- return
- self.entries[event.filename] = entry_type(fpath, spec,
- self.encoding)
- self.entries[event.filename].handle_event(event)
-
- def specificity_from_filename(self, fname, specific=None):
- """Construct a specificity instance from a filename and regex."""
- if specific is None:
- specific = self.specific
- data = specific.match(fname)
- if not data:
- raise SpecificityError(fname)
- kwargs = {}
- if data.group('hostname'):
- kwargs['hostname'] = data.group('hostname')
- elif data.group('group'):
- kwargs['group'] = data.group('group')
- kwargs['prio'] = int(data.group('prio'))
- else:
- kwargs['all'] = True
- if 'delta' in data.groupdict():
- kwargs['delta'] = data.group('delta')
- return Specificity(**kwargs)
-
- def update_metadata(self, event):
- """Process info and info.xml files for the templates."""
- fpath = os.path.join(self.path, event.filename)
- if event.filename == 'info.xml':
- if not self.infoxml:
- self.infoxml = InfoXML(fpath)
- self.infoxml.HandleEvent(event)
- elif event.filename in [':info', 'info']:
- for line in open(fpath).readlines():
- match = info_regex.match(line)
- if not match:
- logger.warning("Failed to match line in %s: %s" % (fpath,
- line))
- continue
- else:
- mgd = match.groupdict()
- for key, value in list(mgd.items()):
- if value:
- self.metadata[key] = value
- if len(self.metadata['perms']) == 3:
- self.metadata['perms'] = "0%s" % self.metadata['perms']
-
- def reset_metadata(self, event):
- """Reset metadata to defaults if info or info.xml removed."""
- if event.filename == 'info.xml':
- self.infoxml = None
- elif event.filename in [':info', 'info']:
- self.metadata = default_file_metadata.copy()
-
- def bind_info_to_entry(self, entry, metadata):
- bind_info(entry, metadata, infoxml=self.infoxml, default=self.metadata)
-
- def bind_entry(self, entry, metadata):
- """Return the appropriate interpreted template from the set of
- available templates."""
- self.bind_info_to_entry(entry, metadata)
- return self.best_matching(metadata).bind_entry(entry, metadata)
-
-
-class GroupSpool(Plugin, Generator):
- """Unified interface for handling group-specific data (e.g. .G## files)."""
- name = 'GroupSpool'
- __author__ = 'bcfg-dev@mcs.anl.gov'
- filename_pattern = ""
- es_child_cls = object
- es_cls = EntrySet
- entry_type = 'Path'
-
- def __init__(self, core, datastore):
- Plugin.__init__(self, core, datastore)
- Generator.__init__(self)
- if self.data[-1] == '/':
- self.data = self.data[:-1]
- self.Entries[self.entry_type] = {}
- self.entries = {}
- self.handles = {}
- self.AddDirectoryMonitor('')
- self.encoding = core.encoding
-
- def add_entry(self, event):
- epath = self.event_path(event)
- ident = self.event_id(event)
- if os.path.isdir(epath):
- self.AddDirectoryMonitor(epath[len(self.data):])
- if ident not in self.entries and os.path.isfile(epath):
- dirpath = self.data + ident
- self.entries[ident] = self.es_cls(self.filename_pattern,
- dirpath,
- self.es_child_cls,
- self.encoding)
- self.Entries[self.entry_type][ident] = \
- self.entries[ident].bind_entry
- if not os.path.isdir(epath):
- # do not pass through directory events
- self.entries[ident].handle_event(event)
-
- def event_path(self, event):
- return os.path.join(self.data,
- self.handles[event.requestID].lstrip("/"),
- event.filename)
-
- def event_id(self, event):
- epath = self.event_path(event)
- if os.path.isdir(epath):
- return os.path.join(self.handles[event.requestID].lstrip("/"),
- event.filename)
- else:
- return self.handles[event.requestID].rstrip("/")
-
- def toggle_debug(self):
- for entry in self.entries.values():
- if hasattr(entry, "toggle_debug"):
- entry.toggle_debug()
- return Plugin.toggle_debug(self)
-
- def HandleEvent(self, event):
- """Unified FAM event handler for GroupSpool."""
- action = event.code2str()
- if event.filename[0] == '/':
- return
- ident = self.event_id(event)
-
- if action in ['exists', 'created']:
- self.add_entry(event)
- elif action == 'changed':
- if ident in self.entries:
- self.entries[ident].handle_event(event)
- else:
- # got a changed event for a file we didn't know
- # about. go ahead and process this as a 'created', but
- # warn
- self.logger.warning("Got changed event for unknown file %s" %
- ident)
- self.add_entry(event)
- elif action == 'deleted':
- fbase = self.handles[event.requestID] + event.filename
- if fbase in self.entries:
- # a directory was deleted
- del self.entries[fbase]
- del self.Entries[self.entry_type][fbase]
- elif ident in self.entries:
- self.entries[ident].handle_event(event)
- elif ident not in self.entries:
- self.logger.warning("Got deleted event for unknown file %s" %
- ident)
-
- def AddDirectoryMonitor(self, relative):
- """Add new directory to FAM structures."""
- if not relative.endswith('/'):
- relative += '/'
- name = self.data + relative
- if relative not in list(self.handles.values()):
- if not os.path.isdir(name):
- self.logger.error("Failed to open directory %s" % name)
- return
- reqid = self.core.fam.AddMonitor(name, self)
- self.handles[reqid] = relative