""" Augeas driver """ import sys import augeas import Bcfg2.Client.XML from Bcfg2.Client.Tools.POSIX.base import POSIXTool class AugeasCommand(object): def __init__(self, command, augeas, logger): self.augeas = augeas self.command = command self.entry = self.command.getparent() self.logger = logger def get_path(self, attr="path"): return "/files/%s/%s" % (self.entry.get("name").strip("/"), self.command.get(attr).lstrip("/")) def _exists(self, path): return len(self.augeas.match(path)) > 1 def _verify_exists(self, path=None): if path is None: path = self.get_path() self.logger.debug("Augeas: Verifying that '%s' exists" % path) return self._exists(path) def _verify_not_exists(self, path=None): if path is None: path = self.get_path() self.logger.debug("Augeas: Verifying that '%s' does not exist" % path) return not self._exists(path) def _verify_set(self, expected, path=None): if path is None: path = self.get_path() self.logger.debug("Augeas: Verifying '%s' == '%s'" % (path, expected)) actual = self.augeas.get(path) if actual == expected: return True else: self.logger.debug("Augeas: '%s' failed verification: '%s' != '%s'" % (path, actual, expected)) return False def __str__(self): return Bcfg2.Client.XML.tostring(self.command) class Remove(AugeasCommand): def verify(self): return self._verify_not_exists() def install(self): self.logger.debug("Augeas: Removing %s" % self.get_path()) return self.augeas.remove(self.get_path()) class Move(AugeasCommand): def __init__(self, command, augeas, logger): AugeasCommand.__init__(self, command, augeas, logger) self.source = self.get_path("source") self.dest = self.get_path("destination") def verify(self): return (self._verify_not_exists(self.source), self._verify_exists(self.dest)) def install(self): self.logger.debug("Augeas: Moving %s to %s" % (self.source, self.dest)) return self.augeas.move(self.source, self.dest) class Set(AugeasCommand): def __init__(self, command, augeas, logger): AugeasCommand.__init__(self, command, augeas, logger) self.value = self.command.get("value") def verify(self): return self._verify_set(self.value) def install(self): self.logger.debug("Augeas: Setting %s to %s" % (self.get_path(), self.value)) return self.augeas.set(self.get_path(), self.value) class Clear(Set): def __init__(self, command, augeas, logger): Set.__init__(self, command, augeas, logger) self.value = None class SetMulti(AugeasCommand): def __init__(self, command, augeas, logger): AugeasCommand.__init__(self, command, augeas, logger) self.sub = self.command.get("sub") self.value = self.command.get("value") self.base = self.get_path("base") def verify(self): return all(self._verify_set(self.value, path="%s/%s" % (path, self.sub)) for path in self.augeas.match(self.base)) def install(self): return self.augeas.setm(self.base, self.sub, self.value) class Insert(AugeasCommand): def __init__(self, command, augeas, logger): AugeasCommand.__init__(self, command, augeas, logger) self.label = self.command.get("label") self.where = self.command.get("where", "before") self.before = self.where == "before" def verify(self): return self._verify_exists("%s/../%s" % (self.get_path(), self.label)) def install(self): self.logger.debug("Augeas: Inserting new %s %s %s" % (self.label, self.where, self.get_path())) return self.augeas.insert(self.get_path(), self.label, self.before) class POSIXAugeas(POSIXTool): """ Handle entries. See :ref:`client-tools-augeas`. """ __handles__ = [('Path', 'augeas')] __req__ = {'Path': ['type', 'name', 'setting', 'value']} def __init__(self, logger, setup, config): POSIXTool.__init__(self, logger, setup, config) self._augeas = dict() def get_augeas(self, entry): if entry.get("name") not in self._augeas: aug = augeas.augeas() if entry.get("lens"): self.logger.debug("Augeas: Adding %s to include path for %s" % (entry.get("name"), entry.get("lens"))) incl = "/augeas/load/%s/incl" % entry.get("lens") ilen = len(aug.match(incl)) if ilen == 0: self.logger.error("Augeas: Lens %s does not exist" % entry.get("lens")) else: aug.set("%s[%s]" % (incl, ilen + 1), entry.get("name")) aug.load() self._augeas[entry.get("name")] = aug return self._augeas[entry.get("name")] def fully_specified(self, entry): return entry.text is not None def get_commands(self, entry, unverified=False): rv = [] for cmd in entry.iterchildren(): if unverified and cmd.get("verified", "false") != "false": continue if cmd.tag in globals(): rv.append(globals()[cmd.tag](cmd, self.get_augeas(entry), self.logger)) else: err = "Augeas: Unknown command %s in %s" % (cmd.tag, entry.get("name")) self.logger.error(err) entry.set('qtext', "\n".join([entry.get('qtext', ''), err])) return rv def verify(self, entry, modlist): rv = True for cmd in self.get_commands(entry): try: if not cmd.verify(): err = "Augeas: Command has not been applied to %s: %s" % \ (entry.get("name"), cmd) self.logger.debug(err) entry.set('qtext', "\n".join([entry.get('qtext', ''), err])) rv = False cmd.command.set("verified", "false") else: cmd.command.set("verified", "true") except: # pylint: disable=W0702 err = "Augeas: Unexpected error verifying %s: %s: %s" % \ (entry.get("name"), cmd, sys.exc_info()[1]) self.logger.error(err) entry.set('qtext', "\n".join([entry.get('qtext', ''), err])) rv = False cmd.command.set("verified", "false") return POSIXTool.verify(self, entry, modlist) and rv def install(self, entry): rv = True for cmd in self.get_commands(entry, unverified=True): try: cmd.install() except: # pylint: disable=W0702 self.logger.error( "Failure running Augeas command on %s: %s: %s" % (entry.get("name"), cmd, sys.exc_info()[1])) rv = False try: self.get_augeas(entry).save() except: # pylint: disable=W0702 self.logger.error( "Failure saving Augeas changes to %s: %s" % (entry.get("name"), sys.exc_info()[1])) rv = False return POSIXTool.install(self, entry) and rv