"""Debconf Support for Bcfg2""" import subprocess import Bcfg2.Options import Bcfg2.Client.Tools class Debconf(Bcfg2.Client.Tools.Tool): """Debconf Support for Bcfg2.""" name = 'Debconf' __execs__ = ['/usr/bin/debconf-communicate'] __handles__ = [('Conf', 'debconf')] __req__ = {'Conf': ['name']} def __init__(self, config): Bcfg2.Client.Tools.Tool.__init__(self, config) #: This is the referrence to the Popen object of the #: running debconf-communicate process. If this is None, #: no process is runnning. self.debconf = None def _start_debconf(self): if self.debconf is None: self.debconf = subprocess.Popen( ['/usr/bin/debconf-communicate'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) def _stop_debconf(self): if self.debconf is not None: self.debconf.stdin.close() self.debconf.stdout.close() self.debconf = None def _debconf_reply(self, msg): self.logger.debug('Debconf: %s' % msg.strip()) self.debconf.stdin.write(msg) line = self.debconf.stdout.readline().rstrip('\n') self.logger.debug('< %s' % line) reply = line.split(' ', 1) result = None if len(reply) > 1: result = reply[1] return (reply[0] == '0', result) def debconf_get(self, key): (success, value) = self._debconf_reply('GET %s\n' % key) if not success: return (False, None) (_, seen) = self._debconf_reply('FGET %s seen\n' % key) return (seen, value) def debconf_set(self, key, value): (success, _) = self._debconf_reply('SET %s %s\n' % (key, value)) if success: self._debconf_reply('FSET %s seen true\n' % key) return success def debconf_reset(self, key): (success, _) = self._debconf_reply('RESET %s\n' % key) return success def VerifyConf(self, entry, _modlist): """ Verify the given Debconf entry. """ (seen, value) = self.debconf_get(entry.get('name')) if 'value' not in entry.attrib: if seen == 'true': return False else: if value != entry.get('value'): return False return True def InstallConf(self, entry): """ Install the given Debconf entry. """ if 'value' not in entry.attrib: return self.debconf_reset(entry.get('name')) return self.debconf_set(entry.get('name'), entry.get('value')) def Inventory(self, structures=None): try: self._start_debconf() result = Bcfg2.Client.Tools.Tool.Inventory(self, structures) finally: self._stop_debconf() return result Inventory.__doc__ = Bcfg2.Client.Tools.Tool.Inventory.__doc__ def Install(self, entries): try: self._start_debconf() result = Bcfg2.Client.Tools.Tool.Install(self, entries) finally: self._stop_debconf() return result Install.__doc__ = Bcfg2.Client.Tools.Tool.Install.__doc__