summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2')
-rw-r--r--src/lib/Bcfg2/Client/Tools/Chkconfig.py12
-rw-r--r--src/lib/Bcfg2/Client/Tools/DebInit.py2
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/base.py17
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIXUsers.py6
-rw-r--r--src/lib/Bcfg2/Client/Tools/RcUpdate.py2
-rw-r--r--src/lib/Bcfg2/Client/Tools/YUM.py4
-rw-r--r--src/lib/Bcfg2/Client/Tools/__init__.py2
-rw-r--r--src/lib/Bcfg2/Client/__init__.py34
-rw-r--r--src/lib/Bcfg2/Logger.py10
-rw-r--r--src/lib/Bcfg2/Options/OptionGroups.py2
-rw-r--r--src/lib/Bcfg2/Options/Types.py22
-rw-r--r--src/lib/Bcfg2/Reporting/templates/base.html2
-rw-r--r--src/lib/Bcfg2/Server/Admin.py123
-rw-r--r--src/lib/Bcfg2/Server/BuiltinCore.py3
-rw-r--r--src/lib/Bcfg2/Server/Core.py102
-rwxr-xr-xsrc/lib/Bcfg2/Server/Encryption.py550
-rw-r--r--src/lib/Bcfg2/Server/Info.py51
-rw-r--r--src/lib/Bcfg2/Server/Lint/Cfg.py33
-rw-r--r--src/lib/Bcfg2/Server/Lint/Comments.py3
-rw-r--r--src/lib/Bcfg2/Server/Lint/__init__.py2
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py439
-rw-r--r--src/lib/Bcfg2/Server/Plugin/__init__.py8
-rw-r--r--src/lib/Bcfg2/Server/Plugin/base.py16
-rw-r--r--src/lib/Bcfg2/Server/Plugin/helpers.py87
-rw-r--r--src/lib/Bcfg2/Server/Plugin/interfaces.py19
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py5
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Guppy.py1
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Metadata.py41
-rw-r--r--src/lib/Bcfg2/Server/Plugins/NagiosGen.py4
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/Collection.py4
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py12
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/Yum.py111
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/YumHelper.py106
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/__init__.py135
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Probes.py43
-rw-r--r--src/lib/Bcfg2/Server/Plugins/PuppetENC.py2
-rw-r--r--src/lib/Bcfg2/Server/Plugins/SSHbase.py6
-rw-r--r--src/lib/Bcfg2/Server/Test.py11
-rw-r--r--src/lib/Bcfg2/Utils.py8
-rw-r--r--src/lib/Bcfg2/settings.py20
-rw-r--r--src/lib/Bcfg2/version.py2
41 files changed, 1336 insertions, 726 deletions
diff --git a/src/lib/Bcfg2/Client/Tools/Chkconfig.py b/src/lib/Bcfg2/Client/Tools/Chkconfig.py
index 0d2269a3f..c2c7e21c1 100644
--- a/src/lib/Bcfg2/Client/Tools/Chkconfig.py
+++ b/src/lib/Bcfg2/Client/Tools/Chkconfig.py
@@ -84,16 +84,16 @@ class Chkconfig(Bcfg2.Client.Tools.SvcTool):
"""Install Service entry."""
self.cmd.run("/sbin/chkconfig --add %s" % (entry.get('name')))
self.logger.info("Installing Service %s" % (entry.get('name')))
- bootstatus = entry.get('bootstatus')
+ bootstatus = self.get_bootstatus(entry)
if bootstatus is not None:
if bootstatus == 'on':
# make sure service is enabled on boot
- bootcmd = '/sbin/chkconfig %s %s --level 0123456' % \
- (entry.get('name'), entry.get('bootstatus'))
+ bootcmd = '/sbin/chkconfig %s %s' % \
+ (entry.get('name'), bootstatus)
elif bootstatus == 'off':
# make sure service is disabled on boot
bootcmd = '/sbin/chkconfig %s %s' % (entry.get('name'),
- entry.get('bootstatus'))
+ bootstatus)
bootcmdrv = self.cmd.run(bootcmd).success
if Bcfg2.Options.setup.servicemode == 'disabled':
# 'disabled' means we don't attempt to modify running svcs
@@ -115,8 +115,8 @@ class Chkconfig(Bcfg2.Client.Tools.SvcTool):
def FindExtra(self):
"""Locate extra chkconfig Services."""
allsrv = [line.split()[0]
- for line in self.cmd.run("/sbin/chkconfig",
- "--list").stdout.splitlines()
+ for line in
+ self.cmd.run("/sbin/chkconfig --list").stdout.splitlines()
if ":on" in line]
self.logger.debug('Found active services:')
self.logger.debug(allsrv)
diff --git a/src/lib/Bcfg2/Client/Tools/DebInit.py b/src/lib/Bcfg2/Client/Tools/DebInit.py
index 761c51db7..b544e44d4 100644
--- a/src/lib/Bcfg2/Client/Tools/DebInit.py
+++ b/src/lib/Bcfg2/Client/Tools/DebInit.py
@@ -108,7 +108,7 @@ class DebInit(Bcfg2.Client.Tools.SvcTool):
def InstallService(self, entry):
"""Install Service entry."""
self.logger.info("Installing Service %s" % (entry.get('name')))
- bootstatus = entry.get('bootstatus')
+ bootstatus = self.get_bootstatus(entry)
# check if init script exists
try:
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
index fad458003..c9164cb88 100644
--- a/src/lib/Bcfg2/Client/Tools/POSIX/base.py
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
@@ -686,7 +686,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
""" os.makedirs helpfully creates all parent directories for
us, but it sets permissions according to umask, which is
probably wrong. we need to find out which directories were
- created and set permissions on those
+ created and try to set permissions on those
(http://trac.mcs.anl.gov/projects/bcfg2/ticket/1125 and
http://trac.mcs.anl.gov/projects/bcfg2/ticket/1134) """
created = []
@@ -706,22 +706,17 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
(path, err))
rv = False
- # we need to make sure that we give +x to everyone who needs
- # it. E.g., if the file that's been distributed is 0600, we
- # can't make the parent directories 0600 also; that'd be
- # pretty useless. They need to be 0700.
+ # set auto-created directories to mode 755 and use best effort for
+ # permissions. If you need something else, you should specify it in
+ # your config.
tmpentry = copy.deepcopy(entry)
- newmode = int(entry.get('mode'), 8)
- for i in range(0, 3):
- if newmode & (6 * pow(8, i)):
- newmode |= 1 * pow(8, i)
- tmpentry.set('mode', oct_mode(newmode))
+ tmpentry.set('mode', '0755')
for acl in tmpentry.findall('ACL'):
acl.set('perms',
oct_mode(self._norm_acl_perms(acl.get('perms')) |
ACL_MAP['x']))
for cpath in created:
- rv &= self._set_perms(tmpentry, path=cpath)
+ self._set_perms(tmpentry, path=cpath)
return rv
diff --git a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
index 7a076e680..19657f12a 100644
--- a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
+++ b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
@@ -208,7 +208,10 @@ class POSIXUsers(Bcfg2.Client.Tools.Tool):
# automatically determine one -- i.e., it always
# verifies
continue
- if val != entry.get(attr):
+ entval = entry.get(attr)
+ if not isinstance(entval, str):
+ entval = entval.encode('utf-8')
+ if val != entval:
errors.append("%s for %s %s is incorrect. Current %s is "
"%s, but should be %s" %
(attr.title(), entry.tag, entry.get("name"),
@@ -263,7 +266,6 @@ class POSIXUsers(Bcfg2.Client.Tools.Tool):
if entry.get('gid'):
cmd.extend(['-g', entry.get('gid')])
elif entry.tag == 'POSIXUser':
- cmd.append('-m')
if entry.get('uid'):
cmd.extend(['-u', entry.get('uid')])
cmd.extend(['-g', entry.get('group')])
diff --git a/src/lib/Bcfg2/Client/Tools/RcUpdate.py b/src/lib/Bcfg2/Client/Tools/RcUpdate.py
index 8e9626521..e0c913dcd 100644
--- a/src/lib/Bcfg2/Client/Tools/RcUpdate.py
+++ b/src/lib/Bcfg2/Client/Tools/RcUpdate.py
@@ -89,7 +89,7 @@ class RcUpdate(Bcfg2.Client.Tools.SvcTool):
def InstallService(self, entry):
"""Install Service entry."""
self.logger.info('Installing Service %s' % entry.get('name'))
- bootstatus = entry.get('bootstatus')
+ bootstatus = self.get_bootstatus(entry)
if bootstatus is not None:
if bootstatus == 'on':
# make sure service is enabled on boot
diff --git a/src/lib/Bcfg2/Client/Tools/YUM.py b/src/lib/Bcfg2/Client/Tools/YUM.py
index c9b74dcd0..ae238174b 100644
--- a/src/lib/Bcfg2/Client/Tools/YUM.py
+++ b/src/lib/Bcfg2/Client/Tools/YUM.py
@@ -954,10 +954,10 @@ class YUM(Bcfg2.Client.Tools.PkgTool):
Bcfg2.Options.setup.yum_install_missing):
queue_pkg(pkg, inst, install_pkgs)
elif (status.get('version_fail', False) and
- Bcfg2.Options.yum_fix_version):
+ Bcfg2.Options.setup.yum_fix_version):
queue_pkg(pkg, inst, upgrade_pkgs)
elif (status.get('verify_fail', False) and
- Bcfg2.Options.yum_reinstall_broken):
+ Bcfg2.Options.setup.yum_reinstall_broken):
queue_pkg(pkg, inst, reinstall_pkgs)
else:
# Either there was no Install/Version/Verify
diff --git a/src/lib/Bcfg2/Client/Tools/__init__.py b/src/lib/Bcfg2/Client/Tools/__init__.py
index 5f59e8160..ce75005fe 100644
--- a/src/lib/Bcfg2/Client/Tools/__init__.py
+++ b/src/lib/Bcfg2/Client/Tools/__init__.py
@@ -498,7 +498,7 @@ class SvcTool(Tool):
options = Tool.options + [
Bcfg2.Options.Option(
'-s', '--service-mode', default='default',
- choices = ['default', 'disabled', 'build'],
+ choices=['default', 'disabled', 'build'],
help='Set client service mode')]
def __init__(self, config):
diff --git a/src/lib/Bcfg2/Client/__init__.py b/src/lib/Bcfg2/Client/__init__.py
index dd32fc45c..2761fcddb 100644
--- a/src/lib/Bcfg2/Client/__init__.py
+++ b/src/lib/Bcfg2/Client/__init__.py
@@ -12,9 +12,9 @@ import argparse
import tempfile
import Bcfg2.Logger
import Bcfg2.Options
-import XML
-import Proxy
-import Tools
+import XML # pylint: disable=W0403
+import Proxy # pylint: disable=W0403
+import Tools # pylint: disable=W0403
from Bcfg2.Utils import locked, Executor, safe_input
from Bcfg2.version import __version__
# pylint: disable=W0622
@@ -66,6 +66,9 @@ def prompt(msg):
try:
ans = safe_input(msg)
return ans in ['y', 'Y']
+ except UnicodeEncodeError:
+ ans = input(msg.encode('utf-8'))
+ return ans in ['y', 'Y']
except EOFError:
# handle ^C on rhel-based platforms
raise SystemExit(1)
@@ -75,6 +78,7 @@ def prompt(msg):
class ClientDriverAction(Bcfg2.Options.ComponentAction):
+ """ Action to load client drivers """
bases = ['Bcfg2.Client.Tools']
fail_silently = True
@@ -129,8 +133,8 @@ class Client(object):
Bcfg2.Options.BooleanOption(
"-O", "--no-lock", help='Omit lock check'),
Bcfg2.Options.PathOption(
- cf=('components', 'lockfile'), default='/var/lock/bcfg2.run',
- help='Client lock file'),
+ cf=('components', 'lockfile'), default='/var/lock/bcfg2.run',
+ help='Client lock file'),
Bcfg2.Options.BooleanOption(
"-n", "--dry-run", help='Do not actually change the system'),
Bcfg2.Options.Option(
@@ -171,6 +175,7 @@ class Client(object):
self.whitelist = []
self.blacklist = []
self.removal = []
+ self.unhandled = []
self.logger = logging.getLogger(__name__)
def _probe_failure(self, probename, msg):
@@ -342,6 +347,7 @@ class Client(object):
return rawconfig
def parse_config(self, rawconfig):
+ """ Parse the XML configuration received from the Bcfg2 server """
try:
self.config = XML.XML(rawconfig)
except XML.ParseError:
@@ -448,6 +454,7 @@ class Client(object):
self.logger.info("Finished Bcfg2 client run at %s" % time.time())
def load_tools(self):
+ """ Load all applicable client tools """
for tool in Bcfg2.Options.setup.drivers:
try:
self.tools.append(tool(self.config))
@@ -546,7 +553,8 @@ class Client(object):
elif Bcfg2.Options.setup.decision == 'blacklist':
b_to_rem = \
[e for e in self.whitelist
- if not passes_black_list(e, Bcfg2.Options.setup.decision_list)]
+ if not
+ passes_black_list(e, Bcfg2.Options.setup.decision_list)]
if b_to_rem:
self.logger.info("In blacklist mode: "
"suppressing installation of:")
@@ -579,7 +587,7 @@ class Client(object):
self.states[cfile] = tools[0].InstallPath(cfile)
if self.states[cfile]:
tools[0].modified.append(cfile)
- except:
+ except: # pylint: disable=W0702
self.logger.error("Unexpected tool failure",
exc_info=1)
cfile.set('qtext', '')
@@ -600,7 +608,7 @@ class Client(object):
for tool in self.tools:
try:
self.states.update(tool.Inventory())
- except:
+ except: # pylint: disable=W0702
self.logger.error("%s.Inventory() call failed:" % tool.name,
exc_info=1)
@@ -715,7 +723,7 @@ class Client(object):
continue
try:
self.states.update(tool.Install(handled))
- except:
+ except: # pylint: disable=W0702
self.logger.error("%s.Install() call failed:" % tool.name,
exc_info=1)
@@ -735,7 +743,7 @@ class Client(object):
for tool, bundle in tbm:
try:
self.states.update(tool.Inventory(structures=[bundle]))
- except:
+ except: # pylint: disable=W0702
self.logger.error("%s.Inventory() call failed:" %
tool.name,
exc_info=1)
@@ -765,7 +773,7 @@ class Client(object):
for tool in self.tools:
try:
self.states.update(getattr(tool, func)(bundle))
- except:
+ except: # pylint: disable=W0702
self.logger.error("%s.%s(%s:%s) call failed:" %
(tool.name, func, bundle.tag,
bundle.get("name")), exc_info=1)
@@ -774,7 +782,7 @@ class Client(object):
for tool in self.tools:
try:
self.states.update(tool.BundleNotUpdated(indep))
- except:
+ except: # pylint: disable=W0702
self.logger.error("%s.BundleNotUpdated(%s:%s) call failed:"
% (tool.name, indep.tag,
indep.get("name")), exc_info=1)
@@ -787,7 +795,7 @@ class Client(object):
if extras:
try:
tool.Remove(extras)
- except:
+ except: # pylint: disable=W0702
self.logger.error("%s.Remove() failed" % tool.name,
exc_info=1)
diff --git a/src/lib/Bcfg2/Logger.py b/src/lib/Bcfg2/Logger.py
index d2e0ff957..f9fd42d33 100644
--- a/src/lib/Bcfg2/Logger.py
+++ b/src/lib/Bcfg2/Logger.py
@@ -193,6 +193,7 @@ def add_file_handler(level=logging.DEBUG):
def default_log_level():
+ """ Get the default log level, according to the configuration """
if Bcfg2.Options.setup.debug:
return logging.DEBUG
elif Bcfg2.Options.setup.verbose:
@@ -248,6 +249,10 @@ class Debuggable(object):
#: applicable to the child class
__rmi__ = ['toggle_debug', 'set_debug']
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes.
+ __child_rmi__ = __rmi__[:]
+
def __init__(self, name=None):
"""
:param name: The name of the logger object to get. If none is
@@ -267,9 +272,6 @@ class Debuggable(object):
:returns: bool - The new value of the debug flag
"""
self.debug_flag = debug
- self.debug_log("%s: debug = %s" % (self.__class__.__name__,
- self.debug_flag),
- flag=True)
return debug
def toggle_debug(self):
@@ -293,6 +295,8 @@ class Debuggable(object):
class _OptionContainer(object):
+ """ Container for options loaded at import-time to configure
+ logging """
options = [
Bcfg2.Options.BooleanOption(
'-d', '--debug', help='Enable debugging output',
diff --git a/src/lib/Bcfg2/Options/OptionGroups.py b/src/lib/Bcfg2/Options/OptionGroups.py
index b14c523f4..d77c39878 100644
--- a/src/lib/Bcfg2/Options/OptionGroups.py
+++ b/src/lib/Bcfg2/Options/OptionGroups.py
@@ -115,7 +115,7 @@ class Subparser(OptionContainer):
if parser not in self._subparsers:
self._subparsers[parser] = parser.add_subparsers(dest='subcommand')
subparser = self._subparsers[parser].add_parser(self.name,
- help=self.help)
+ help=self.help)
OptionContainer.add_to_parser(self, subparser)
diff --git a/src/lib/Bcfg2/Options/Types.py b/src/lib/Bcfg2/Options/Types.py
index 329c671ea..5769d674a 100644
--- a/src/lib/Bcfg2/Options/Types.py
+++ b/src/lib/Bcfg2/Options/Types.py
@@ -28,6 +28,28 @@ def colon_list(value):
return value.split(':')
+def comma_dict(value):
+ """ Split an option string on commas, optionally surrounded by
+ whitespace, and split the resulting items again on equals signs,
+ returning a dict """
+ result = dict()
+ if value:
+ items = comma_list(value)
+ for item in items:
+ if '=' in item:
+ key, value = item.split(r'=', 1)
+ try:
+ result[key] = bool(value)
+ except ValueError:
+ try:
+ result[key] = int(value)
+ except ValueError:
+ result[key] = value
+ else:
+ result[item] = True
+ return result
+
+
def octal(value):
""" Given an octal string, get an integer representation. """
return int(value, 8)
diff --git a/src/lib/Bcfg2/Reporting/templates/base.html b/src/lib/Bcfg2/Reporting/templates/base.html
index 7f1fcba3b..0b2b7dd36 100644
--- a/src/lib/Bcfg2/Reporting/templates/base.html
+++ b/src/lib/Bcfg2/Reporting/templates/base.html
@@ -93,7 +93,7 @@ This is needed for Django versions less than 1.5
<div style='clear:both'></div>
</div><!-- document -->
<div id="footer">
- <span>Bcfg2 Version 1.3.1</span>
+ <span>Bcfg2 Version 1.3.2</span>
</div>
<div id="calendar_div" style='position:absolute; visibility:hidden; background-color:white; layer-background-color:white;'></div>
diff --git a/src/lib/Bcfg2/Server/Admin.py b/src/lib/Bcfg2/Server/Admin.py
index b88aa837f..7c2241f58 100644
--- a/src/lib/Bcfg2/Server/Admin.py
+++ b/src/lib/Bcfg2/Server/Admin.py
@@ -1,6 +1,5 @@
""" Subcommands and helpers for bcfg2-admin """
-import re
import os
import sys
import time
@@ -20,7 +19,6 @@ import Bcfg2.Server.Core
import Bcfg2.Client.Proxy
from Bcfg2.Server.Plugin import PullSource, Generator, MetadataConsistencyError
from Bcfg2.Utils import hostnames2ranges, Executor, safe_input
-from Bcfg2.Compat import xmlrpclib
import Bcfg2.Server.Plugins.Metadata
try:
@@ -413,13 +411,15 @@ class Init(AdminCmd):
config = '''[server]
repository = %s
plugins = %s
+# Uncomment the following to listen on all interfaces
+#listen_all = true
[database]
#engine = sqlite3
# 'postgresql', 'mysql', 'mysql_old', 'sqlite3' or 'ado_mssql'.
#name =
# Or path to database file if using sqlite3.
-#<repository>/bcfg2.sqlite is default path if left empty
+#<repository>/etc/bcfg2.sqlite is default path if left empty
#user =
# Not used with sqlite3.
#password =
@@ -830,6 +830,49 @@ class _ReportsCmd(AdminCmd):
Bcfg2.Reporting.models.Performance)
+if HAS_DJANGO:
+ class _DjangoProxyCmd(AdminCmd):
+ command = None
+ args = []
+
+ def run(self, _):
+ '''Call a django command'''
+ if self.command is not None:
+ command = self.command
+ else:
+ command = self.__class__.__name__.lower()
+ args = [command] + self.args
+ management.call_command(*args)
+
+ class DBShell(_DjangoProxyCmd):
+ """ Call the Django 'dbshell' command on the database """
+
+ class Shell(_DjangoProxyCmd):
+ """ Call the Django 'shell' command on the database """
+
+ class ValidateDB(_DjangoProxyCmd):
+ """ Call the Django 'validate' command on the database """
+ command = "validate"
+
+ class Syncdb(AdminCmd):
+ """ Sync the Django ORM with the configured database """
+
+ def run(self, setup):
+ management.setup_environ(Bcfg2.settings)
+ Bcfg2.Server.models.load_models()
+ try:
+ management.call_command("syncdb", interactive=False,
+ verbosity=setup.verbose + setup.debug)
+ except ImproperlyConfigured:
+ err = sys.exc_info()[1]
+ self.logger.error("Django configuration problem: %s" % err)
+ raise SystemExit(1)
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("Database update failed: %s" % err)
+ raise SystemExit(1)
+
+
if HAS_REPORTS:
import datetime
@@ -875,11 +918,9 @@ if HAS_REPORTS:
(self.__class__.__name__.title(),
sys.exc_info()[1]))
-
class UpdateReports(InitReports):
""" Apply updates to the reporting database """
-
class ReportsStats(_ReportsCmd):
""" Print Reporting database statistics """
def run(self, _):
@@ -887,7 +928,6 @@ if HAS_REPORTS:
print("%s has %s records" % (cls.__name__,
cls.objects.count()))
-
class PurgeReports(_ReportsCmd):
""" Purge records from the Reporting database """
@@ -969,12 +1009,12 @@ if HAS_REPORTS:
self.logger.debug("Deleted %s of %s" % (rnum, count))
except: # pylint: disable=W0702
self.logger.error("Failed to remove interactions: %s" %
- sys.exc_info()[1])
+ sys.exc_info()[1])
# Prune any orphaned ManyToMany relations
for m2m in self.reports_entries:
- self.logger.debug("Pruning any orphaned %s objects" % \
- m2m.__name__)
+ self.logger.debug("Pruning any orphaned %s objects" %
+ m2m.__name__)
m2m.prune_orphans()
if client and not filtered:
@@ -984,7 +1024,7 @@ if HAS_REPORTS:
cobj.delete()
except: # pylint: disable=W0702
self.logger.error("Failed to delete client %s: %s" %
- (client, sys.exc_info()[1]))
+ (client, sys.exc_info()[1]))
def purge_expired(self, maxdate=None):
""" Purge expired clients from the Reporting database """
@@ -1005,63 +1045,11 @@ if HAS_REPORTS:
client=client).delete()
client.delete()
-
- class _DjangoProxyCmd(AdminCmd):
- command = None
- args = []
- _reports_re = re.compile(r'^(?:Reports)?(?P<command>.*?)(?:Reports)?$')
-
- def run(self, _):
- '''Call a django command'''
- if self.command is not None:
- command = self.command
- else:
- match = self._reports_re.match(self.__class__.__name__)
- if match:
- command = match.group("command").lower()
- else:
- command = self.__class__.__name__.lower()
- args = [command] + self.args
- management.call_command(*args)
-
-
- class ReportsDBShell(_DjangoProxyCmd):
- """ Call the Django 'dbshell' command on the Reporting database """
-
-
- class ReportsShell(_DjangoProxyCmd):
- """ Call the Django 'shell' command on the Reporting database """
-
-
- class ValidateReports(_DjangoProxyCmd):
- """ Call the Django 'validate' command on the Reporting database """
-
-
class ReportsSQLAll(_DjangoProxyCmd):
""" Call the Django 'sqlall' command on the Reporting database """
args = ["Reporting"]
-if HAS_DJANGO:
- class Syncdb(AdminCmd):
- """ Sync the Django ORM with the configured database """
-
- def run(self, setup):
- management.setup_environ(Bcfg2.settings)
- Bcfg2.Server.models.load_models()
- try:
- management.call_command("syncdb", interactive=False,
- verbosity=setup.verbose + setup.debug)
- except ImproperlyConfigured:
- err = sys.exc_info()[1]
- self.logger.error("Django configuration problem: %s" % err)
- raise SystemExit(1)
- except:
- err = sys.exc_info()[1]
- self.logger.error("Database update failed: %s" % err)
- raise SystemExit(1)
-
-
class Viz(_ServerAdminCmd):
""" Produce graphviz diagrams of metadata structures """
@@ -1101,10 +1089,12 @@ class Viz(_ServerAdminCmd):
if setup.outfile:
cmd.extend(["-o", setup.outfile])
inputlist = ["digraph groups {",
- '\trankdir="LR";',
- self.metadata.viz(setup.includehosts, setup.includebundles,
- setup.includekey, setup.only_client,
- self.colors)]
+ '\trankdir="LR";',
+ self.metadata.viz(setup.includehosts,
+ setup.includebundles,
+ setup.includekey,
+ setup.only_client,
+ self.colors)]
if setup.includekey:
inputlist.extend(
["\tsubgraph cluster_key {",
@@ -1150,6 +1140,7 @@ class Xcmd(_ProxyAdminCmd):
class CLI(Bcfg2.Options.CommandRegistry):
+ """ CLI class for bcfg2-admin """
def __init__(self):
Bcfg2.Options.CommandRegistry.__init__(self)
Bcfg2.Options.register_commands(self.__class__, globals().values(),
diff --git a/src/lib/Bcfg2/Server/BuiltinCore.py b/src/lib/Bcfg2/Server/BuiltinCore.py
index 85f7fa228..179a6aa9f 100644
--- a/src/lib/Bcfg2/Server/BuiltinCore.py
+++ b/src/lib/Bcfg2/Server/BuiltinCore.py
@@ -32,7 +32,8 @@ class BuiltinCore(NetworkCore):
daemon_args = dict(uid=Bcfg2.Options.setup.daemon_uid,
gid=Bcfg2.Options.setup.daemon_gid,
- umask=int(Bcfg2.Options.setup.umask, 8))
+ umask=int(Bcfg2.Options.setup.umask, 8),
+ detach_process=True)
if Bcfg2.Options.setup.daemon:
daemon_args['pidfile'] = TimeoutPIDLockFile(
Bcfg2.Options.setup.daemon, acquire_timeout=5)
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index 58044447b..360b7868d 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -227,6 +227,20 @@ class Core(object):
self.logger.error("Updating database %s failed: %s" %
(Bcfg2.Options.setup.db_name, err))
+ def expire_caches_by_type(self, base_cls, key=None):
+ """ Expire caches for all
+ :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that
+ are instances of ``base_cls``.
+
+ :param base_cls: The base plugin interface class to match (see
+ :mod:`Bcfg2.Server.Plugin.interfaces`)
+ :type base_cls: type
+ :param key: The cache key to expire
+ """
+ for plugin in self.plugins_by_type(base_cls):
+ if isinstance(plugin, Bcfg2.Server.Plugin.Caching):
+ plugin.expire_cache(key)
+
def plugins_by_type(self, base_cls):
""" Return a list of loaded plugins that match the passed type.
@@ -253,11 +267,12 @@ class Core(object):
self.logger.debug("Performance logging thread starting")
while not self.terminate.isSet():
self.terminate.wait(Bcfg2.Options.setup.performance_interval)
- for name, stats in self.get_statistics(None).items():
- self.logger.info("Performance statistics: "
- "%s min=%.06f, max=%.06f, average=%.06f, "
- "count=%d" % ((name, ) + stats))
- self.logger.debug("Performance logging thread terminated")
+ if not self.terminate.isSet():
+ for name, stats in self.get_statistics(None).items():
+ self.logger.info("Performance statistics: "
+ "%s min=%.06f, max=%.06f, average=%.06f, "
+ "count=%d" % ((name, ) + stats))
+ self.logger.info("Performance logging thread terminated")
def _file_monitor_thread(self):
""" The thread that runs the
@@ -274,11 +289,12 @@ class Core(object):
else:
if not self.fam.pending():
terminate.wait(15)
+ if self.fam.pending():
+ self._update_vcs_revision()
self.fam.handle_event_set(self.lock)
except:
continue
- self._update_vcs_revision()
- self.logger.debug("File monitor thread terminated")
+ self.logger.info("File monitor thread terminated")
@Bcfg2.Server.Statistics.track_statistics()
def _update_vcs_revision(self):
@@ -372,14 +388,14 @@ class Core(object):
def shutdown(self):
""" Perform plugin and FAM shutdown tasks. """
- self.logger.debug("Shutting down core...")
+ self.logger.info("Shutting down core...")
if not self.terminate.isSet():
self.terminate.set()
self.fam.shutdown()
- self.logger.debug("FAM shut down")
+ self.logger.info("FAM shut down")
for plugin in list(self.plugins.values()):
plugin.shutdown()
- self.logger.debug("All plugins shut down")
+ self.logger.info("All plugins shut down")
@property
def metadata_cache_mode(self):
@@ -667,7 +683,27 @@ class Core(object):
if event.code2str() == 'deleted':
return
Bcfg2.Options.get_parser().reparse()
- self.metadata_cache.expire()
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+ def block_for_fam_events(self, handle_events=False):
+ """ Block until all fam events have been handleed, optionally
+ handling events as well. (Setting ``handle_events=True`` is
+ useful for local server cores that don't spawn an event
+ handling thread.)"""
+ slept = 0
+ log_interval = 3
+ if handle_events:
+ self.fam.handle_events_in_interval(1)
+ slept += 1
+ if Bcfg2.Options.setup.fam_blocking:
+ time.sleep(1)
+ slept += 1
+ while self.fam.pending() != 0:
+ time.sleep(1)
+ slept += 1
+ if slept % log_interval == 0:
+ self.logger.debug("Sleeping to handle FAM events...")
+ self.logger.debug("Slept %s seconds while handling FAM events" % slept)
def run(self):
""" Run the server core. This calls :func:`_run`, starts the
@@ -695,11 +731,7 @@ class Core(object):
self.shutdown()
raise
- if Bcfg2.Options.setup.fam_blocking:
- time.sleep(1)
- while self.fam.pending() != 0:
- time.sleep(1)
-
+ self.block_for_fam_events()
self._block()
def _run(self):
@@ -765,7 +797,7 @@ class Core(object):
elif False in ip_checks:
# if any ACL plugin returned False (deny), then deny
self.logger.warning("Client %s failed IP-based ACL checks for %s" %
- (address[0], rmi))
+ (address[0], rmi))
return False
# else, no plugins returned False, but not all plugins
# returned True, so some plugin returned None (defer), so
@@ -808,7 +840,12 @@ class Core(object):
imd = self.metadata_cache.get(client_name, None)
if not imd:
self.logger.debug("Building metadata for %s" % client_name)
- imd = self.metadata.get_initial_metadata(client_name)
+ try:
+ imd = self.metadata.get_initial_metadata(client_name)
+ except MetadataConsistencyError:
+ self.critical_error(
+ "Client metadata resolution error for %s: %s" %
+ (client_name, sys.exc_info()[1]))
connectors = self.plugins_by_type(Connector)
for conn in connectors:
grps = conn.get_additional_groups(imd)
@@ -819,6 +856,9 @@ class Core(object):
imd.query.by_name = self.build_metadata
if self.metadata_cache_mode in ['cautious', 'aggressive']:
self.metadata_cache[client_name] = imd
+ else:
+ self.logger.debug("Using cached metadata object for %s" %
+ client_name)
return imd
def process_statistics(self, client_name, statistics):
@@ -846,6 +886,7 @@ class Core(object):
state.get('state')))
self.client_run_hook("end_statistics", meta)
+ @track_statistics()
def resolve_client(self, address, cleanup_cache=False, metadata=True):
""" Given a client address, get the client hostname and
optionally metadata.
@@ -901,12 +942,10 @@ class Core(object):
def _get_rmi(self):
""" Get a list of RMI calls exposed by plugins """
rmi = dict()
- for pname, pinst in list(self.plugins.items()):
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
for mname in pinst.__rmi__:
rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
- famname = self.fam.__class__.__name__
- for mname in self.fam.__rmi__:
- rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname)
return rmi
def _resolve_exposed_method(self, method_name):
@@ -999,6 +1038,7 @@ class Core(object):
for plugin in self.plugins_by_type(Probing):
for probe in plugin.GetProbes(metadata):
resp.append(probe)
+ self.logger.debug("Sending probe list to %s" % client)
return lxml.etree.tostring(resp,
xml_declaration=False).decode('UTF-8')
except:
@@ -1024,7 +1064,7 @@ class Core(object):
# that's created for RecvProbeData doesn't get cached.
# I.e., the next metadata object that's built, after probe
# data is processed, is cached.
- self.metadata_cache.expire(client)
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
try:
xpdata = lxml.etree.XML(probedata.encode('utf-8'),
parser=Bcfg2.Server.XMLParser)
@@ -1199,9 +1239,14 @@ class Core(object):
self.logger.info("Core: debug = %s" % debug)
levels = self._loglevels[self.debug_flag]
for handler in logging.root.handlers:
- level = levels.get(handler.name, levels['default'])
- self.logger.debug("Setting %s log handler to %s" %
- (handler.name, logging.getLevelName(level)))
+ try:
+ level = levels.get(handler.name, levels['default'])
+ self.logger.debug("Setting %s log handler to %s" %
+ (handler.name, logging.getLevelName(level)))
+ except AttributeError:
+ level = levels['default']
+ self.logger.debug("Setting unknown log handler %s to %s" %
+ (handler, logging.getLevelName(level)))
handler.setLevel(level)
return self.debug_flag
@@ -1271,7 +1316,7 @@ class NetworkCore(Core):
self.logger.error("Failed to set ownership of database "
"at %s: %s" % (db_settings['NAME'], err))
__init__.__doc__ = Core.__init__.__doc__.split(".. -----")[0] + \
-"\n.. automethod:: _daemonize\n"
+ "\n.. automethod:: _daemonize\n"
def run(self):
""" Run the server core. This calls :func:`_daemonize` before
@@ -1296,7 +1341,8 @@ class NetworkCore(Core):
# rewrite $HOME. pulp stores its auth creds in ~/.pulp, so
# this is necessary to make that work when privileges are
# dropped
- os.environ['HOME'] = pwd.getpwuid(self.setup['daemon_uid'])[5]
+ os.environ['HOME'] = \
+ pwd.getpwuid(Bcfg2.Options.setup.daemon_uid)[5]
else:
os.umask(int(Bcfg2.Options.setup.umask, 8))
diff --git a/src/lib/Bcfg2/Server/Encryption.py b/src/lib/Bcfg2/Server/Encryption.py
index 5c200410e..7e1294587 100755
--- a/src/lib/Bcfg2/Server/Encryption.py
+++ b/src/lib/Bcfg2/Server/Encryption.py
@@ -195,278 +195,148 @@ def bruteforce_decrypt(crypted, passphrases=None, algorithm=None):
raise EVPError("Failed to decrypt")
-class EncryptionChunkingError(Exception):
- """ error raised when Encryptor cannot break a file up into chunks
- to be encrypted, or cannot reassemble the chunks """
- pass
+class PassphraseError(Exception):
+ """ Exception raised when there's a problem determining the
+ passphrase to encrypt or decrypt with """
-class Encryptor(object):
- """ Generic encryptor for all files """
+class CryptoTool(object):
+ """ Generic decryption/encryption interface base object """
- def __init__(self):
- self.passphrase = None
- self.pname = None
+ def __init__(self, filename):
self.logger = logging.getLogger(self.__class__.__name__)
+ self.filename = filename
+ self.data = open(self.filename).read()
+ self.pname, self.passphrase = self._get_passphrase()
- def get_encrypted_filename(self, plaintext_filename):
- """ get the name of the file encrypted data should be written to """
- return plaintext_filename
-
- def get_plaintext_filename(self, encrypted_filename):
- """ get the name of the file decrypted data should be written to """
- return encrypted_filename
-
- def chunk(self, data):
- """ generator to break the file up into smaller chunks that
- will each be individually encrypted or decrypted """
- yield data
-
- def unchunk(self, data, original): # pylint: disable=W0613
- """ given chunks of a file, reassemble then into the whole file """
- try:
- return data[0]
- except IndexError:
- raise EncryptionChunkingError("No data to unchunk")
-
- def set_passphrase(self):
- """ set the passphrase for the current file """
+ def _get_passphrase(self):
+ """ get the passphrase for the current file """
if not Bcfg2.Options.setup.passphrases:
- self.logger.error("No passphrases available in %s" %
- Bcfg2.Options.setup.config)
- return False
-
- if self.passphrase:
- self.logger.debug("Using previously determined passphrase %s" %
- self.pname)
- return True
+ raise PassphraseError("No passphrases available in %s" %
+ Bcfg2.Options.setup.configfile)
+ pname = None
if Bcfg2.Options.setup.passphrase:
- self.pname = Bcfg2.Options.setup.passphrase
+ pname = Bcfg2.Options.setup.passphrase
- if self.pname:
+ if pname:
try:
- self.passphrase = Bcfg2.Options.setup.passphrases[self.pname]
+ passphrase = Bcfg2.Options.setup.passphrases[pname]
self.logger.debug("Using passphrase %s specified on command "
- "line" % self.pname)
- return True
+ "line" % pname)
+ return (pname, passphrase)
except KeyError:
- self.logger.error("Could not find passphrase %s in %s" %
- (self.pname, Bcfg2.Options.setup.config))
- return False
+ raise PassphraseError("Could not find passphrase %s in %s" %
+ (pname, Bcfg2.Options.setup.configfile))
else:
- pnames = Bcfg2.Options.setup.passphrases
- if len(pnames) == 1:
- self.pname = pnames.keys()[0]
- self.passphrase = pnames[self.pname]
- self.logger.info("Using passphrase %s" % self.pname)
- return True
- elif len(pnames) > 1:
- self.logger.warning("Multiple passphrases found in %s, "
- "specify one on the command line with -p" %
- Bcfg2.Options.setup.config)
- self.logger.info("No passphrase could be determined")
- return False
-
- def encrypt(self, fname):
- """ encrypt the given file, returning the encrypted data """
+ if len(Bcfg2.Options.setup.passphrases) == 1:
+ pname, passphrase = Bcfg2.Options.setup.passphrases.items()[0]
+ self.logger.info("Using passphrase %s" % pname)
+ return (pname, passphrase)
+ elif len(Bcfg2.Options.setup.passphrases) > 1:
+ return (None, None)
+ raise PassphraseError("No passphrase could be determined")
+
+ def get_destination_filename(self, original_filename):
+ """ Get the filename where data should be written """
+ return original_filename
+
+ def write(self, data):
+ """ write data to disk """
+ new_fname = self.get_destination_filename(self.filename)
try:
- plaintext = open(fname).read()
+ self._write(new_fname, data)
+ self.logger.info("Wrote data to %s" % new_fname)
+ return True
except IOError:
err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
+ self.logger.error("Error writing data from %s to %s: %s" %
+ (self.filename, new_fname, err))
return False
- if not self.set_passphrase():
- return False
+ def _write(self, filename, data):
+ """ Perform the actual write of data. This is separate from
+ :func:`CryptoTool.write` so it can be easily
+ overridden. """
+ open(filename, "wb").write(data)
- crypted = []
- try:
- for chunk in self.chunk(plaintext):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- except TypeError:
- return False
- crypted.append(self._encrypt(chunk, passphrase, name=pname))
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting data to encrypt from %s: %s" %
- (fname, err))
- return False
- return self.unchunk(crypted, plaintext)
+class Decryptor(CryptoTool):
+ """ Decryptor interface """
+ def decrypt(self):
+ """ decrypt the file, returning the encrypted data """
+ raise NotImplementedError
- # pylint: disable=W0613
- def _encrypt(self, plaintext, passphrase, name=None):
- """ encrypt a single chunk of a file """
- return ssl_encrypt(plaintext, passphrase)
- # pylint: enable=W0613
- def decrypt(self, fname):
- """ decrypt the given file, returning the plaintext data """
- try:
- crypted = open(fname).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
- return False
+class Encryptor(CryptoTool):
+ """ encryptor interface """
+ def encrypt(self):
+ """ encrypt the file, returning the encrypted data """
+ raise NotImplementedError
- self.set_passphrase()
- plaintext = []
- try:
- for chunk in self.chunk(crypted):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- try:
- plaintext.append(self._decrypt(chunk, passphrase))
- except EVPError:
- self.logger.info("Could not decrypt %s with the "
- "specified passphrase" % fname)
- continue
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- continue
- except TypeError:
- pchunk = None
- for pname, passphrase in \
- Bcfg2.Options.setup.passphrases.items():
- self.logger.debug("Trying passphrase %s" % pname)
- try:
- pchunk = self._decrypt(chunk, passphrase)
- break
- except EVPError:
- pass
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- if pchunk is not None:
- plaintext.append(pchunk)
- else:
- self.logger.error("Could not decrypt %s with any "
- "passphrase in %s" %
- (fname, Bcfg2.Options.setup.config))
- continue
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting encrypted data from %s: %s" %
- (fname, err))
- return False
+class CfgEncryptor(Encryptor):
+ """ encryptor class for Cfg files """
- try:
- return self.unchunk(plaintext, crypted)
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling plaintext data from %s: %s" %
- (fname, err))
- return False
+ def __init__(self, filename):
+ Encryptor.__init__(self, filename)
+ if self.passphrase is None:
+ raise PassphraseError("Multiple passphrases found in %s, "
+ "specify one on the command line with -p" %
+ Bcfg2.Options.setup.configfile)
- def _decrypt(self, crypted, passphrase):
- """ decrypt a single chunk """
- return ssl_decrypt(crypted, passphrase)
+ def encrypt(self):
+ return ssl_encrypt(self.data, self.passphrase)
- def write_encrypted(self, fname, data=None):
- """ write encrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_encrypted_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote encrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling encrypted data from %s: %s" %
- (fname, err))
- return False
+ def get_destination_filename(self, original_filename):
+ return original_filename + ".crypt"
- def write_decrypted(self, fname, data=None):
- """ write decrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_plaintext_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote decrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
- def get_passphrase(self, chunk):
- """ get the passphrase for a chunk of a file """
- pname = self._get_passphrase(chunk)
- if not self.pname:
- if not pname:
- self.logger.info("No passphrase given on command line or "
- "found in file")
+class CfgDecryptor(Decryptor):
+ """ Decrypt Cfg files """
+
+ def decrypt(self):
+ """ decrypt the given file, returning the plaintext data """
+ if self.passphrase:
+ try:
+ return ssl_decrypt(self.data, self.passphrase)
+ except EVPError:
+ self.logger.info("Could not decrypt %s with the "
+ "specified passphrase" % self.filename)
return False
- elif pname in Bcfg2.Options.setup.passphrases:
- passphrase = Bcfg2.Options.setup.passphrases[pname]
- else:
- self.logger.error("Could not find passphrase %s in %s" %
- (pname, Bcfg2.Options.setup.config))
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("Error decrypting %s: %s" %
+ (self.filename, err))
+ return False
+ else: # no passphrase given, brute force
+ try:
+ return bruteforce_decrypt(self.data)
+ except EVPError:
+ self.logger.info("Could not decrypt %s with any passphrase" %
+ self.filename)
return False
- else:
- pname = self.pname
- passphrase = self.passphrase
- if self.pname != pname:
- self.logger.warning("Passphrase given on command line (%s) "
- "differs from passphrase embedded in "
- "file (%s), using command-line option" %
- (self.pname, pname))
- return (passphrase, pname)
-
- def _get_passphrase(self, chunk): # pylint: disable=W0613
- """ get the passphrase for a chunk of a file """
- return None
-
-
-class CfgEncryptor(Encryptor):
- """ encryptor class for Cfg files """
-
- def get_encrypted_filename(self, plaintext_filename):
- return plaintext_filename + ".crypt"
- def get_plaintext_filename(self, encrypted_filename):
- if encrypted_filename.endswith(".crypt"):
- return encrypted_filename[:-6]
+ def get_destination_filename(self, original_filename):
+ if original_filename.endswith(".crypt"):
+ return original_filename[:-6]
else:
- return Encryptor.get_plaintext_filename(self, encrypted_filename)
+ return Decryptor.get_destination_filename(self, original_filename)
-class PropertiesEncryptor(Encryptor):
- """ encryptor class for Properties files """
+class PropertiesCryptoMixin(object):
+ """ Mixin to provide some common methods for Properties crypto """
+ default_xpath = '//*'
- def _encrypt(self, plaintext, passphrase, name=None):
- # plaintext is an lxml.etree._Element
- if name is None:
- name = "true"
- if plaintext.text and plaintext.text.strip():
- plaintext.text = ssl_encrypt(plaintext.text, passphrase).strip()
- plaintext.set("encrypted", name)
- return plaintext
-
- def chunk(self, data):
- xdata = lxml.etree.XML(data, parser=XMLParser)
+ def _get_elements(self, xdata):
+ """ Get the list of elements to encrypt or decrypt """
if Bcfg2.Options.setup.xpath:
elements = xdata.xpath(Bcfg2.Options.setup.xpath)
if not elements:
- raise EncryptionChunkingError(
- "XPath expression %s matched no elements" %
- Bcfg2.Options.setup.xpath)
+ self.logger.warning("XPath expression %s matched no elements" %
+ Bcfg2.Options.setup.xpath)
else:
- elements = xdata.xpath('//*[@encrypted]')
+ elements = xdata.xpath(self.default_xpath)
if not elements:
elements = list(xdata.getiterator(tag=lxml.etree.Element))
@@ -489,47 +359,81 @@ class PropertiesEncryptor(Encryptor):
ans = safe_input("Encrypt this element? [y/N] ")
if not ans.lower().startswith("y"):
elements.remove(element)
+ return elements
+
+ def _get_element_passphrase(self, element):
+ """ Get the passphrase to use to encrypt or decrypt a given
+ element """
+ pname = element.get("encrypted")
+ if pname in self.passphrases:
+ passphrase = self.passphrases[pname]
+ elif self.passphrase:
+ if pname:
+ self.logger.warning("Passphrase %s not found in %s, "
+ "using passphrase given on command line" %
+ (pname, Bcfg2.Option.setup.configfile))
+ passphrase = self.passphrase
+ pname = self.pname
+ else:
+ raise PassphraseError("Multiple passphrases found in %s, "
+ "specify one on the command line with -p" %
+ Bcfg2.Options.setup.configfile)
+ return (pname, passphrase)
- # this is not a good use of a generator, but we need to
- # generate the full list of elements in order to ensure that
- # some exist before we know what to return
- for elt in elements:
- yield elt
-
- def unchunk(self, data, original):
- # Properties elements are modified in-place, so we don't
- # actually need to unchunk anything
- xdata = Encryptor.unchunk(self, data, original)
- # find root element
- while xdata.getparent() is not None:
- xdata = xdata.getparent()
- return lxml.etree.tostring(xdata,
- xml_declaration=False,
- pretty_print=True).decode('UTF-8')
-
- def _get_passphrase(self, chunk):
- pname = chunk.get("encrypted")
- if pname and pname.lower() != "true":
- return pname
- return None
-
- def _decrypt(self, crypted, passphrase):
- # crypted is in lxml.etree._Element
- if not crypted.text or not crypted.text.strip():
- self.logger.warning("Skipping empty element %s" % crypted.tag)
- return crypted
- decrypted = ssl_decrypt(crypted.text, passphrase).strip()
- try:
- crypted.text = decrypted.encode('ascii', 'xmlcharrefreplace')
- except UnicodeDecodeError:
- # we managed to decrypt the value, but it contains content
- # that can't even be encoded into xml entities. what
- # probably happened here is that we coincidentally could
- # decrypt a value encrypted with a different key, and
- # wound up with gibberish.
- self.logger.warning("Decrypted %s to gibberish, skipping" %
- crypted.tag)
- return crypted
+ def _write(self, filename, data):
+ """ Write the data """
+ data.getroottree().write(filename,
+ xml_declaration=False,
+ pretty_print=True)
+
+
+class PropertiesEncryptor(Encryptor, PropertiesCryptoMixin):
+ """ encryptor class for Properties files """
+
+ def encrypt(self):
+ xdata = lxml.etree.XML(self.data, parser=XMLParser)
+ for elt in self._get_elements(xdata):
+ try:
+ pname, passphrase = self._get_element_passphrase(elt)
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ return False
+ elt.text = ssl_encrypt(elt.text, passphrase).strip()
+ elt.set("encrypted", pname)
+ return xdata
+
+ def _write(self, filename, data):
+ PropertiesCryptoMixin._write(self, filename, data)
+
+
+class PropertiesDecryptor(Decryptor, PropertiesCryptoMixin):
+ """ decryptor class for Properties files """
+ default_xpath = '//*[@encrypted]'
+
+ def decrypt(self):
+ xdata = lxml.etree.XML(self.data, parser=XMLParser)
+ for elt in self._get_elements(xdata):
+ try:
+ pname, passphrase = self._get_element_passphrase(elt)
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ return False
+ decrypted = ssl_decrypt(elt.text, passphrase).strip()
+ try:
+ elt.text = decrypted.encode('ascii', 'xmlcharrefreplace')
+ elt.set("encrypted", pname)
+ except UnicodeDecodeError:
+ # we managed to decrypt the value, but it contains
+ # content that can't even be encoded into xml
+ # entities. what probably happened here is that we
+ # coincidentally could decrypt a value encrypted with
+ # a different key, and wound up with gibberish.
+ self.logger.warning("Decrypted %s to gibberish, skipping" %
+ elt.tag)
+ return xdata
+
+ def _write(self, filename, data):
+ PropertiesCryptoMixin._write(self, filename, data)
class CLI(object):
@@ -569,7 +473,7 @@ class CLI(object):
def __init__(self):
parser = Bcfg2.Options.get_parser(
description="Encrypt and decrypt Bcfg2 data",
- components=[self, OptionContainer])
+ components=[self, _OptionContainer])
parser.parse()
self.logger = logging.getLogger(parser.prog)
@@ -582,33 +486,15 @@ class CLI(object):
self.logger.error("Cannot decrypt interactively")
Bcfg2.Options.setup.interactive = False
- if Bcfg2.Options.setup.cfg:
- if Bcfg2.Options.setup.xpath:
- self.logger.error("Specifying --xpath with --cfg is "
- "nonsensical, ignoring --xpath")
- Bcfg2.Options.setup.xpath = None
- if Bcfg2.Options.setup.interactive:
- self.logger.error("Cannot use interactive mode with --cfg, "
- "ignoring --interactive")
- Bcfg2.Options.setup.interactive = False
- elif Bcfg2.Options.setup.properties:
- if Bcfg2.Options.setup.remove:
- self.logger.error("--remove cannot be used with --properties, "
- "ignoring --remove")
- Bcfg2.Options.setup.remove = False
-
- self.props_crypt = PropertiesEncryptor()
- self.cfg_crypt = CfgEncryptor()
-
def _is_properties(self, filename):
""" Determine if a given file is a Properties file or not """
if Bcfg2.Options.setup.properties:
return True
elif Bcfg2.Options.setup.cfg:
return False
- elif fname.endswith(".xml"):
+ elif filename.endswith(".xml"):
try:
- xroot = lxml.etree.parse(fname).getroot()
+ xroot = lxml.etree.parse(filename).getroot()
return xroot.tag == "Properties"
except lxml.etree.XMLSyntaxError:
return False
@@ -632,46 +518,66 @@ class CLI(object):
continue
if props:
- encryptor = self.props_crypt
if Bcfg2.Options.setup.remove:
- self.logger.warning("Cannot use --remove with Properties "
- "file %s, ignoring for this file" %
- fname)
+ self.logger.info("Cannot use --remove with Properties "
+ "file %s, ignoring for this file" % fname)
+ try:
+ tools = (PropertiesEncryptor(fname),
+ PropertiesDecryptor(fname))
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ continue
+ except IOError:
+ self.logger.error("Error reading %s, skipping: %s" %
+ (fname, err))
+ continue
else:
if Bcfg2.Options.setup.xpath:
- self.logger.warning("Cannot use xpath with Cfg file %s, "
- "ignoring xpath for this file" % fname)
+ self.logger.error("Specifying --xpath with --cfg is "
+ "nonsensical, ignoring --xpath")
+ Bcfg2.Options.setup.xpath = None
if Bcfg2.Options.setup.interactive:
- self.logger.warning("Cannot use interactive mode with Cfg "
- "file %s, ignoring --interactive for "
- "this file" % fname)
- encryptor = self.cfg_crypt
+ self.logger.error("Cannot use interactive mode with "
+ "--cfg, ignoring --interactive")
+ Bcfg2.Options.setup.interactive = False
+ try:
+ tools = (CfgEncryptor(fname), CfgDecryptor(fname))
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ continue
+ except IOError:
+ self.logger.error("Error reading %s, skipping: %s" %
+ (fname, err))
+ continue
data = None
+ mode = None
if Bcfg2.Options.setup.encrypt:
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
+ tool = tools[0]
+ mode = "encrypt"
elif Bcfg2.Options.setup.decrypt:
- xform = encryptor.decrypt
- write = encryptor.write_decrypted
+ tool = tools[1]
+ mode = "decrypt"
else:
- self.logger.warning("Neither --encrypt nor --decrypt "
- "specified, determining mode")
- data = encryptor.decrypt(fname)
- if data:
- write = encryptor.write_decrypted
- else:
- self.logger.warning("Failed to decrypt %s, trying "
- "encryption" % fname)
+ self.logger.info("Neither --encrypt nor --decrypt specified, "
+ "determining mode")
+ tool = tools[1]
+ try:
+ data = tool.decrypt()
+ mode = "decrypt"
+ except: # pylint: disable=W0702
+ pass
+ if data is False:
data = None
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
+ self.logger.info("Failed to decrypt %s, trying encryption"
+ % fname)
+ tool = tools[0]
+ mode = "encrypt"
if data is None:
- data = xform(fname)
+ data = getattr(tool, mode)()
if not data:
- self.logger.error("Failed to %s %s, skipping" %
- (xform.__name__, fname))
+ self.logger.error("Failed to %s %s, skipping" % (mode, fname))
continue
if Bcfg2.Options.setup.stdout:
if len(Bcfg2.Options.setup.files) > 1:
@@ -680,10 +586,10 @@ class CLI(object):
if len(Bcfg2.Options.setup.files) > 1:
print("")
else:
- write(fname, data=data)
+ tool.write(data)
if (Bcfg2.Options.setup.remove and
- encryptor.get_encrypted_filename(fname) != fname):
+ tool.get_destination_filename(fname) != fname):
try:
os.unlink(fname)
except IOError:
diff --git a/src/lib/Bcfg2/Server/Info.py b/src/lib/Bcfg2/Server/Info.py
index 76da861ba..24d7cc637 100644
--- a/src/lib/Bcfg2/Server/Info.py
+++ b/src/lib/Bcfg2/Server/Info.py
@@ -163,9 +163,10 @@ class Build(InfoCmd):
type=argparse.FileType('w'))]
def run(self, setup):
+ etree = lxml.etree.ElementTree(
+ self.core.BuildConfiguration(setup.hostname))
try:
- lxml.etree.ElementTree(
- self.core.BuildConfiguration(setup.hostname)).write(
+ etree.write(
setup.filename,
encoding='UTF-8', xml_declaration=True,
pretty_print=True)
@@ -367,6 +368,23 @@ class Automatch(InfoCmd):
pretty_print=True).decode('UTF-8'))
+class ExpireCache(InfoCmd):
+ """ Expire the metadata cache """
+
+ options = [
+ Bcfg2.Options.PositionalArgument(
+ "hostname", nargs="*", default=[],
+ help="Expire cache for the given host(s)")]
+
+ def run(self, setup):
+ if setup.clients:
+ for client in self.get_client_list(setup.clients):
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
+ key=client)
+ else:
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+
class Bundles(InfoCmd):
""" Print out group/bundle info """
@@ -572,7 +590,7 @@ class Mappings(InfoCmd):
print_tabular(data)
-class Packageresolve(InfoCmd):
+class PackageResolve(InfoCmd):
""" Resolve packages for the given host"""
options = [Bcfg2.Options.PositionalArgument("hostname"),
@@ -658,20 +676,12 @@ class Shell(InfoCmd):
interactive = False
def run(self, setup):
- loop = True
- while loop:
- try:
- self.core.cmdloop('Welcome to bcfg2-info\n'
- 'Type "help" for more information')
- except SystemExit:
- raise
- except Bcfg2.Server.Plugin.PluginExecutionError:
- continue
- except KeyboardInterrupt:
- print("Ctrl-C pressed, exiting...")
- loop = False
- except:
- self.core.logger.error("Command failure", exc_info=1)
+ try:
+ self.core.cmdloop('Welcome to bcfg2-info\n'
+ 'Type "help" for more information')
+ except KeyboardInterrupt:
+ print("Ctrl-C pressed, exiting...")
+ loop = False
class ProfileTemplates(InfoCmd):
@@ -735,7 +745,7 @@ class ProfileTemplates(InfoCmd):
def stdev(self, nums):
mean = float(sum(nums)) / len(nums)
- return math.sqrt(sum((n - mean)**2 for n in nums) / float(len(nums)))
+ return math.sqrt(sum((n - mean) ** 2 for n in nums) / float(len(nums)))
def run(self, setup):
clients = self.get_client_list(setup.clients)
@@ -762,7 +772,6 @@ class ProfileTemplates(InfoCmd):
std = self.stdev(ptimes)
if mean > 0.01 or median > 0.01 or std > 1 or setup.templates:
tmpltimes.append((tmpl, mean, median, std))
- else:
print("%-50s %-9s %-11s %6s" %
("Template", "Mean Time", "Median Time", "σ"))
for info in reversed(sorted(tmpltimes, key=operator.itemgetter(1))):
@@ -792,7 +801,7 @@ class InfoCore(cmd.Cmd,
cmd.Cmd.__init__(self)
Bcfg2.Server.Core.Core.__init__(self)
Bcfg2.Options.CommandRegistry.__init__(self)
- self.prompt = '> '
+ self.prompt = 'bcfg2-info> '
def get_locals(self):
return locals()
@@ -816,7 +825,7 @@ class InfoCore(cmd.Cmd,
def run(self):
self.load_plugins()
- self.fam.handle_events_in_interval(1)
+ self.block_for_fam_events(handle_events=True)
def _daemonize(self):
pass
diff --git a/src/lib/Bcfg2/Server/Lint/Cfg.py b/src/lib/Bcfg2/Server/Lint/Cfg.py
index 933e677e0..4cdf5c48a 100644
--- a/src/lib/Bcfg2/Server/Lint/Cfg.py
+++ b/src/lib/Bcfg2/Server/Lint/Cfg.py
@@ -37,22 +37,41 @@ class Cfg(ServerPlugin):
"%s has no corresponding pubkey.xml at %s" %
(basename, pubkey))
+ def _list_path_components(self, path):
+ """ Get a list of all components of a path. E.g.,
+ ``self._list_path_components("/foo/bar/foobaz")`` would return
+ ``["foo", "bar", "foo", "baz"]``. The list is not guaranteed
+ to be in order."""
+ rv = []
+ remaining, component = os.path.split(path)
+ while component != '':
+ rv.append(component)
+ remaining, component = os.path.split(remaining)
+ return rv
+
def check_missing_files(self):
""" check that all files on the filesystem are known to Cfg """
cfg = self.core.plugins['Cfg']
# first, collect ignore patterns from handlers
- ignore = []
- for hdlr in cfg.handlers:
- ignore.extend(hdlr.__ignore__)
+ ignore = set()
+ for hdlr in handlers():
+ ignore.update(hdlr.__ignore__)
# next, get a list of all non-ignored files on the filesystem
all_files = set()
for root, _, files in os.walk(cfg.data):
- all_files.update(os.path.join(root, fname)
- for fname in files
- if not any(fname.endswith("." + i)
- for i in ignore))
+ for fname in files:
+ fpath = os.path.join(root, fname)
+ # check against the handler ignore patterns and the
+ # global FAM ignore list
+ if (not any(fname.endswith("." + i) for i in ignore) and
+ not any(fnmatch(fpath, p)
+ for p in self.config['ignore']) and
+ not any(fnmatch(c, p)
+ for p in self.config['ignore']
+ for c in self._list_path_components(fpath))):
+ all_files.add(fpath)
# next, get a list of all files known to Cfg
cfg_files = set()
diff --git a/src/lib/Bcfg2/Server/Lint/Comments.py b/src/lib/Bcfg2/Server/Lint/Comments.py
index c9a34a75f..e2d1ec597 100644
--- a/src/lib/Bcfg2/Server/Lint/Comments.py
+++ b/src/lib/Bcfg2/Server/Lint/Comments.py
@@ -90,8 +90,7 @@ class Comments(Bcfg2.Server.Lint.ServerPlugin):
Bcfg2.Options.Option(
cf=("Comments", "probe_comments"),
type=Bcfg2.Options.Types.comma_list, default=[],
- help="Required comments for probes")
- ]
+ help="Required comments for probes")]
def __init__(self, *args, **kwargs):
Bcfg2.Server.Lint.ServerPlugin.__init__(self, *args, **kwargs)
diff --git a/src/lib/Bcfg2/Server/Lint/__init__.py b/src/lib/Bcfg2/Server/Lint/__init__.py
index 26de28e7c..4f64fd006 100644
--- a/src/lib/Bcfg2/Server/Lint/__init__.py
+++ b/src/lib/Bcfg2/Server/Lint/__init__.py
@@ -429,7 +429,7 @@ class CLI(object):
""" run plugins that require a running server to run """
core = Bcfg2.Server.Core.Core()
core.load_plugins()
- core.fam.handle_events_in_interval(0.1)
+ core.block_for_fam_events(handle_events=True)
try:
self.logger.debug("Running server plugins: %s" %
[p.__name__ for p in self.serverplugins])
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 7e04b1eae..678a1c95d 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -2,15 +2,134 @@
:mod:`Bcfg2.Server.BuiltinCore` that uses the Python
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
+
+The parent communicates with the children over
+:class:`multiprocessing.Queue` objects via a
+:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object.
+
+A method being called via the RPCQueue must be exposed by the child by
+decorating it with :func:`Bcfg2.Server.Core.exposed`.
"""
+import time
import threading
import lxml.etree
import multiprocessing
import Bcfg2.Options
-from Bcfg2.Compat import Queue
+import Bcfg2.Server.Plugin
+from itertools import cycle
+from Bcfg2.Cache import Cache
+from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import Core, exposed
from Bcfg2.Server.BuiltinCore import BuiltinCore
+from multiprocessing.connection import Listener, Client
+
+
+class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
+ """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
+ cache expiration events to child nodes. """
+
+ #: The method to send over the pipe to expire the cache
+ method = "expire_metadata_cache"
+
+ def __init__(self, *args, **kwargs):
+ self.rpc_q = kwargs.pop("queue")
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
+ Cache.__init__(self, *args, **kwargs)
+
+ def expire(self, key=None):
+ self.rpc_q.publish(self.method, args=[key])
+ Cache.expire(self, key=key)
+
+
+class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
+ """ An implementation of a :class:`multiprocessing.Queue` designed
+ for several additional use patterns:
+
+ * Random-access reads, based on a key that identifies the data;
+ * Publish-subscribe, where a datum is sent to all hosts.
+
+ The subscribers can deal with this as a normal Queue with no
+ special handling.
+ """
+ poll_wait = 3.0
+
+ def __init__(self):
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
+ self._terminate = threading.Event()
+ self._queues = dict()
+ self._available_listeners = Queue()
+ self._blocking_listeners = []
+
+ def add_subscriber(self, name):
+ """ Add a subscriber to the queue. This returns the
+ :class:`multiprocessing.Queue` object that the subscriber
+ should read from. """
+ self._queues[name] = multiprocessing.Queue()
+ return self._queues[name]
+
+ def publish(self, method, args=None, kwargs=None):
+ """ Publish an RPC call to the queue for consumption by all
+ subscribers. """
+ for queue in self._queues.values():
+ queue.put((None, (method, args or [], kwargs or dict())))
+
+ def rpc(self, dest, method, args=None, kwargs=None):
+ """ Make an RPC call to the named subscriber, expecting a
+ response. This opens a
+ :class:`multiprocessing.connection.Listener` and passes the
+ Listener address to the child as part of the RPC call, so that
+ the child can connect to the Listener to submit its results.
+
+ Listeners are reused when possible to minimize overhead.
+ """
+ try:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Reusing existing RPC listener at %s" %
+ listener.address)
+ except Empty:
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" %
+ listener.address)
+ self._blocking_listeners.append(listener)
+ try:
+ self._queues[dest].put((listener.address,
+ (method, args or [], kwargs or dict())))
+ conn = listener.accept()
+ self._blocking_listeners.remove(listener)
+ try:
+ while not self._terminate.is_set():
+ if conn.poll(self.poll_wait):
+ return conn.recv()
+ finally:
+ conn.close()
+ finally:
+ self._available_listeners.put(listener)
+
+ def close(self):
+ """ Close queues and connections. """
+ self._terminate.set()
+ self.logger.debug("Closing RPC queues")
+ for name, queue in self._queues.items():
+ self.logger.debug("Closing RPC queue to %s" % name)
+ queue.close()
+
+ # close any listeners that are waiting for connections
+ self.logger.debug("Closing RPC connections")
+ for listener in self._blocking_listeners:
+ self.logger.debug("Closing RPC connection at %s" %
+ listener.address)
+ listener.close()
+
+ self.logger.debug("Closing RPC listeners")
+ try:
+ while True:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Closing RPC listener at %s" %
+ listener.address)
+ listener.close()
+ except Empty:
+ pass
class DualEvent(object):
@@ -61,66 +180,152 @@ class ChildCore(Core):
those, though, if the pipe communication "protocol" were made more
robust. """
- #: How long to wait while polling for new clients to build. This
- #: doesn't affect the speed with which a client is built, but
+ #: How long to wait while polling for new RPC commands. This
+ #: doesn't affect the speed with which a command is processed, but
#: setting it too high will result in longer shutdown times, since
#: we only check for the termination event from the main process
#: every ``poll_wait`` seconds.
- poll_wait = 5.0
+ poll_wait = 3.0
- def __init__(self, pipe, terminate):
+ def __init__(self, name, rpc_q, terminate):
"""
- :param pipe: The pipe to which client hostnames are added for
- ChildCore objects to build configurations, and to
- which client configurations are added after
- having been built by ChildCore objects.
- :type pipe: multiprocessing.Pipe
+ :param name: The name of this child
+ :type name: string
+ :param read_q: The queue the child will read from for RPC
+ communications from the parent process.
+ :type read_q: multiprocessing.Queue
+ :param write_q: The queue the child will write the results of
+ RPC calls to.
+ :type write_q: multiprocessing.Queue
:param terminate: An event that flags ChildCore objects to shut
themselves down.
:type terminate: multiprocessing.Event
"""
Core.__init__(self)
- #: The pipe to which client hostnames are added for ChildCore
- #: objects to build configurations, and to which client
- #: configurations are added after having been built by
- #: ChildCore objects.
- self.pipe = pipe
+ #: The name of this child
+ self.name = name
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
- def _daemonize(self):
- return True
+ #: The queue used for RPC communication
+ self.rpc_q = rpc_q
+
+ # override this setting so that the child doesn't try to write
+ # the pidfile
+ Bcfg2.Options.setup.daemon = False
+
+ # ensure that the child doesn't start a perflog thread
+ self.perflog_thread = None
+
+ self._rmi = dict()
def _run(self):
return True
+ def _daemonize(self):
+ return True
+
+ def _dispatch(self, address, data):
+ """ Method dispatcher used for commands received from
+ the RPC queue. """
+ if address is not None:
+ # if the key is None, then no response is expected. we
+ # make the return connection before dispatching the actual
+ # RPC call so that the parent is blocking for a connection
+ # as briefly as possible
+ self.logger.debug("Connecting to parent via %s" % address)
+ client = Client(address)
+ method, args, kwargs = data
+ func = None
+ rv = None
+ if "." in method:
+ if method in self._rmi:
+ func = self._rmi[method]
+ else:
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ elif not hasattr(self, method):
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ else: # method is not a plugin RMI, and exists
+ func = getattr(self, method)
+ if not func.exposed:
+ self.logger.error("%s: Method %s is not exposed" % (self.name,
+ method))
+ func = None
+ if func is not None:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ rv = func(*args, **kwargs)
+ if address is not None:
+ # if the key is None, then no response is expected
+ self.logger.debug("Returning data to parent via %s" % address)
+ client.send(rv)
+
def _block(self):
- while not self.terminate.isSet():
+ self._rmi = self._get_rmi()
+ while not self.terminate.is_set():
try:
- if self.pipe.poll(self.poll_wait):
- if not self.metadata.use_database:
- # handle FAM events, in case (for instance) the
- # client has just been added to clients.xml, or a
- # profile has just been asserted. but really, you
- # should be using the metadata database if you're
- # using this core.
- self.fam.handle_events_in_interval(0.1)
- client = self.pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- config = \
- lxml.etree.tostring(self.BuildConfiguration(client))
- self.logger.debug("Returning configuration for %s to main "
- "process" % client)
- self.pipe.send(config)
- self.logger.debug("Returned configuration for %s to main "
- "process" % client)
+ address, data = self.rpc_q.get(timeout=self.poll_wait)
+ threadname = "-".join(str(i) for i in data)
+ rpc_thread = threading.Thread(name=threadname,
+ target=self._dispatch,
+ args=[address, data])
+ rpc_thread.start()
+ except Empty:
+ pass
except KeyboardInterrupt:
break
self.shutdown()
+ def shutdown(self):
+ Core.shutdown(self)
+ self.logger.info("%s: Closing RPC command queue" % self.name)
+ self.rpc_q.close()
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("%s: Waiting for %d thread(s): %s" %
+ (self.name, len(threads),
+ [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("%s: All threads stopped" % self.name)
+
+ def _get_rmi(self):
+ rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ mname = crmi[1]
+ else:
+ mname = crmi
+ rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
+ return rmi
+
+ @exposed
+ def expire_metadata_cache(self, client=None):
+ """ Expire the metadata cache for a client """
+ self.metadata_cache.expire(client)
+
+ @exposed
+ def RecvProbeData(self, address, _):
+ """ Expire the probe cache for a client """
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
+ key=self.resolve_client(address,
+ metadata=False)[0])
+
+ @exposed
+ def GetConfig(self, client):
+ """ Render the configuration for a client """
+ self.logger.debug("%s: Building configuration for %s" %
+ (self.name, client))
+ return lxml.etree.tostring(self.BuildConfiguration(client))
+
class MultiprocessingCore(BuiltinCore):
""" A multiprocessing core that delegates building the actual
@@ -137,7 +342,6 @@ class MultiprocessingCore(BuiltinCore):
default=multiprocessing.cpu_count(),
help='Spawn this number of children for the multiprocessing core')]
-
#: How long to wait for a child process to shut down cleanly
#: before it is terminated.
shutdown_timeout = 10.0
@@ -160,51 +364,162 @@ class MultiprocessingCore(BuiltinCore):
self.available_children = \
Queue(maxsize=Bcfg2.Options.setup.core_children)
- # sigh. multiprocessing was added in py2.6, which is when the
- # camelCase methods for threading objects were deprecated in
- # favor of the Pythonic under_score methods. So
- # multiprocessing.Event *only* has is_set(), while
- # threading.Event has *both* isSet() and is_set(). In order
- # to make the core work with Python 2.4+, and with both
- # multiprocessing and threading Event objects, we just
- # monkeypatch self.terminate to have isSet().
+ #: The flag that indicates when to stop child threads and
+ #: processes
self.terminate = DualEvent(threading_event=self.terminate)
+ #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object
+ #: used to send or publish commands to children.
+ self.rpc_q = RPCQueue()
+
+ self.metadata_cache = DispatchingCache(queue=self.rpc_q)
+
+ #: A list of children that will be cycled through
+ self._all_children = []
+
+ #: An iterator that each child will be taken from in sequence,
+ #: to provide a round-robin distribution of render requests
+ self.children = None
+
def _run(self):
for cnum in range(Bcfg2.Options.setup.core_children):
name = "Child-%s" % cnum
- (mainpipe, childpipe) = multiprocessing.Pipe()
- self.pipes[name] = mainpipe
+
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(childpipe, self.terminate)
+ child_q = self.rpc_q.add_subscriber(name)
+ childcore = ChildCore(name, child_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
child.pid))
- self.available_children.put(name)
+ self._all_children.append(name)
+ self.logger.debug("Started %s children: %s" % (len(self._all_children),
+ self._all_children))
+ self.children = cycle(self._all_children)
return BuiltinCore._run(self)
def shutdown(self):
BuiltinCore.shutdown(self)
- for child in multiprocessing.active_children():
- self.logger.debug("Shutting down child %s" % child.name)
- child.join(self.shutdown_timeout)
- if child.is_alive():
+ self.logger.info("Closing RPC command queues")
+ self.rpc_q.close()
+
+ def term_children():
+ """ Terminate all remaining multiprocessing children. """
+ for child in multiprocessing.active_children():
self.logger.error("Waited %s seconds to shut down %s, "
"terminating" % (self.shutdown_timeout,
child.name))
child.terminate()
- else:
- self.logger.debug("Child %s shut down" % child.name)
- self.logger.debug("All children shut down")
+
+ timer = threading.Timer(self.shutdown_timeout, term_children)
+ timer.start()
+ while len(multiprocessing.active_children()):
+ self.logger.info("Waiting for %s child(ren): %s" %
+ (len(multiprocessing.active_children()),
+ [c.name
+ for c in multiprocessing.active_children()]))
+ time.sleep(1)
+ timer.cancel()
+ self.logger.info("All children shut down")
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("Waiting for %s thread(s): %s" %
+ (len(threads), [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("Shutdown complete")
+
+ def _get_rmi(self):
+ child_rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ parentname, childname = crmi
+ else:
+ parentname = childname = crmi
+ child_rmi["%s.%s" % (pname, parentname)] = \
+ "%s.%s" % (pname, childname)
+
+ rmi = BuiltinCore._get_rmi(self)
+ for method in rmi.keys():
+ if method in child_rmi:
+ rmi[method] = self._child_rmi_wrapper(method,
+ rmi[method],
+ child_rmi[method])
+ return rmi
+
+ def _child_rmi_wrapper(self, method, parent_rmi, child_rmi):
+ """ Returns a callable that dispatches a call to the given
+ child RMI to child processes, and calls the parent RMI locally
+ (i.e., in the parent process). """
+ @wraps(parent_rmi)
+ def inner(*args, **kwargs):
+ self.logger.debug("Dispatching RMI call to %s to children: %s" %
+ (method, child_rmi))
+ self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs)
+ return parent_rmi(*args, **kwargs)
+
+ return inner
+
+ @exposed
+ def set_debug(self, address, debug):
+ self.rpc_q.set_debug(debug)
+ self.rpc_q.publish("set_debug", args=[address, debug])
+ self.metadata_cache.set_debug(debug)
+ return BuiltinCore.set_debug(self, address, debug)
+
+ @exposed
+ def RecvProbeData(self, address, probedata):
+ rv = BuiltinCore.RecvProbeData(self, address, probedata)
+ # we don't want the children to actually process probe data,
+ # so we don't send the data, just the fact that we got some.
+ self.rpc_q.publish("RecvProbeData", args=[address, None])
+ return rv
@exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
- childname = self.available_children.get()
- self.logger.debug("Building configuration on child %s" % childname)
- pipe = self.pipes[childname]
- pipe.send(client)
- config = pipe.recv()
- self.available_children.put_nowait(childname)
- return config
+ childname = self.children.next()
+ self.logger.debug("Building configuration for %s on %s" % (client,
+ childname))
+ return self.rpc_q.rpc(childname, "GetConfig", args=[client])
+
+ @exposed
+ def get_statistics(self, address):
+ stats = dict()
+
+ def _aggregate_statistics(newstats, prefix=None):
+ """ Aggregate a set of statistics from a child or parent
+ server core. This adds the statistics to the overall
+ statistics dict (optionally prepending a prefix, such as
+ "Child-1", to uniquely identify this set of statistics),
+ and aggregates it with the set of running totals that are
+ kept from all cores. """
+ for statname, vals in newstats.items():
+ if statname.startswith("ChildCore:"):
+ statname = statname[5:]
+ if prefix:
+ prettyname = "%s:%s" % (prefix, statname)
+ else:
+ prettyname = statname
+ stats[prettyname] = vals
+ totalname = "Total:%s" % statname
+ if totalname not in stats:
+ stats[totalname] = vals
+ else:
+ newmin = min(stats[totalname][0], vals[0])
+ newmax = max(stats[totalname][1], vals[1])
+ newcount = stats[totalname][3] + vals[3]
+ newmean = ((stats[totalname][2] * stats[totalname][3]) +
+ (vals[2] * vals[3])) / newcount
+ stats[totalname] = (newmin, newmax, newmean, newcount)
+
+ stats = dict()
+ for childname in self._all_children:
+ _aggregate_statistics(
+ self.rpc_q.rpc(childname, "get_statistics", args=[address]),
+ prefix=childname)
+ _aggregate_statistics(BuiltinCore.get_statistics(self, address))
+ return stats
diff --git a/src/lib/Bcfg2/Server/Plugin/__init__.py b/src/lib/Bcfg2/Server/Plugin/__init__.py
index 0c7d111f3..a85867134 100644
--- a/src/lib/Bcfg2/Server/Plugin/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugin/__init__.py
@@ -28,11 +28,11 @@ class _OptionContainer(object):
options = [
Bcfg2.Options.Common.default_paranoid,
Bcfg2.Options.Option(
- cf=('mdata', 'owner'), dest="default_owner", default='root',
- help='Default Path owner'),
+ cf=('mdata', 'owner'), dest="default_owner", default='root',
+ help='Default Path owner'),
Bcfg2.Options.Option(
- cf=('mdata', 'group'), dest="default_group", default='root',
- help='Default Path group'),
+ cf=('mdata', 'group'), dest="default_group", default='root',
+ help='Default Path group'),
Bcfg2.Options.Option(
cf=('mdata', 'important'), dest="default_important",
default='false', choices=['true', 'false'],
diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py
index e94ab9335..b2d9fa7c8 100644
--- a/src/lib/Bcfg2/Server/Plugin/base.py
+++ b/src/lib/Bcfg2/Server/Plugin/base.py
@@ -39,6 +39,20 @@ class Plugin(Debuggable):
#: List of names of methods to be exposed as XML-RPC functions
__rmi__ = Debuggable.__rmi__
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes, if :mod:`Bcfg2.Server.MultiprocessingCore` is in
+ #: use. Items ``__child_rmi__`` can either be strings (in which
+ #: case the same function is called on child processes as on the
+ #: parent) or 2-tuples, in which case the first element is the
+ #: name of the RPC function called on the parent process, and the
+ #: second element is the name of the function to call on child
+ #: processes. Functions that are not listed in the list will not
+ #: be dispatched to child processes, i.e., they will only be
+ #: called on the parent. A function must be listed in ``__rmi__``
+ #: in order to be exposed; functions listed in ``_child_rmi__``
+ #: but not ``__rmi__`` will be ignored.
+ __child_rmi__ = Debuggable.__child_rmi__
+
def __init__(self, core, datastore):
"""
:param core: The Bcfg2.Server.Core initializing the plugin
@@ -81,6 +95,8 @@ class Plugin(Debuggable):
self.running = False
def set_debug(self, debug):
+ self.debug_log("%s: debug = %s" % (self.name, self.debug_flag),
+ flag=True)
for entry in self.Entries.values():
if isinstance(entry, Debuggable):
entry.set_debug(debug)
diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py
index f52662bde..0266af909 100644
--- a/src/lib/Bcfg2/Server/Plugin/helpers.py
+++ b/src/lib/Bcfg2/Server/Plugin/helpers.py
@@ -33,63 +33,8 @@ try:
except ImportError:
HAS_DJANGO = False
-#: A dict containing default metadata for Path entries from bcfg2.conf
-DEFAULT_FILE_METADATA = Bcfg2.Options.OptionParser(
- dict(configfile=Bcfg2.Options.CFILE,
- owner=Bcfg2.Options.MDATA_OWNER,
- group=Bcfg2.Options.MDATA_GROUP,
- mode=Bcfg2.Options.MDATA_MODE,
- secontext=Bcfg2.Options.MDATA_SECONTEXT,
- important=Bcfg2.Options.MDATA_IMPORTANT,
- paranoid=Bcfg2.Options.MDATA_PARANOID,
- sensitive=Bcfg2.Options.MDATA_SENSITIVE))
-DEFAULT_FILE_METADATA.parse([Bcfg2.Options.CFILE.cmd, Bcfg2.Options.CFILE])
-del DEFAULT_FILE_METADATA['args']
-del DEFAULT_FILE_METADATA['configfile']
-
LOGGER = logging.getLogger(__name__)
-#: a compiled regular expression for parsing info and :info files
-INFO_REGEX = re.compile(r'owner:\s*(?P<owner>\S+)|' +
- r'group:\s*(?P<group>\S+)|' +
- r'mode:\s*(?P<mode>\w+)|' +
- r'secontext:\s*(?P<secontext>\S+)|' +
- r'paranoid:\s*(?P<paranoid>\S+)|' +
- r'sensitive:\s*(?P<sensitive>\S+)|' +
- r'encoding:\s*(?P<encoding>\S+)|' +
- r'important:\s*(?P<important>\S+)|' +
- r'mtime:\s*(?P<mtime>\w+)')
-
-
-def bind_info(entry, metadata, infoxml=None, default=DEFAULT_FILE_METADATA):
- """ Bind the file metadata in the given
- :class:`Bcfg2.Server.Plugin.helpers.InfoXML` object to the given
- entry.
-
- :param entry: The abstract entry to bind the info to
- :type entry: lxml.etree._Element
- :param metadata: The client metadata to get info for
- :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param infoxml: The info.xml file to pull file metadata from
- :type infoxml: Bcfg2.Server.Plugin.helpers.InfoXML
- :param default: Default metadata to supply when the info.xml file
- does not include a particular attribute
- :type default: dict
- :returns: None
- :raises: :class:`Bcfg2.Server.Plugin.exceptions.PluginExecutionError`
- """
- for attr, val in list(default.items()):
- entry.set(attr, val)
- if infoxml:
- mdata = dict()
- infoxml.pnode.Match(metadata, mdata, entry=entry)
- if 'Info' not in mdata:
- msg = "Failed to set metadata for file %s" % entry.get('name')
- LOGGER.error(msg)
- raise PluginExecutionError(msg)
- for attr, val in list(mdata['Info'][None].items()):
- entry.set(attr, val)
-
class track_statistics(object): # pylint: disable=C0103
""" Decorator that tracks execution time for the given
@@ -141,6 +86,38 @@ def removecomment(stream):
yield kind, data, pos
+def bind_info(entry, metadata, infoxml=None, default=None):
+ """ Bind the file metadata in the given
+ :class:`Bcfg2.Server.Plugin.helpers.InfoXML` object to the given
+ entry.
+
+ :param entry: The abstract entry to bind the info to
+ :type entry: lxml.etree._Element
+ :param metadata: The client metadata to get info for
+ :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
+ :param infoxml: The info.xml file to pull file metadata from
+ :type infoxml: Bcfg2.Server.Plugin.helpers.InfoXML
+ :param default: Default metadata to supply when the info.xml file
+ does not include a particular attribute
+ :type default: dict
+ :returns: None
+ :raises: :class:`Bcfg2.Server.Plugin.exceptions.PluginExecutionError`
+ """
+ if default is None:
+ default = default_path_metadata()
+ for attr, val in list(default.items()):
+ entry.set(attr, val)
+ if infoxml:
+ mdata = dict()
+ infoxml.pnode.Match(metadata, mdata, entry=entry)
+ if 'Info' not in mdata:
+ msg = "Failed to set metadata for file %s" % entry.get('name')
+ LOGGER.error(msg)
+ raise PluginExecutionError(msg)
+ for attr, val in list(mdata['Info'][None].items()):
+ entry.set(attr, val)
+
+
def default_path_metadata():
""" Get the default Path entry metadata from the config.
diff --git a/src/lib/Bcfg2/Server/Plugin/interfaces.py b/src/lib/Bcfg2/Server/Plugin/interfaces.py
index 619d72afd..30275f6ad 100644
--- a/src/lib/Bcfg2/Server/Plugin/interfaces.py
+++ b/src/lib/Bcfg2/Server/Plugin/interfaces.py
@@ -632,3 +632,22 @@ class ClientACLs(object):
:returns: bool
"""
return True
+
+
+class Caching(object):
+ """ A plugin that caches more than just the data received from the
+ FAM. This presents a unified interface to clear the cache. """
+
+ def expire_cache(self, key=None):
+ """ Expire the cache associated with the given key.
+
+ :param key: The key to expire the cache for. Because cache
+ implementations vary tremendously between plugins,
+ this could be any number of things, but generally
+ a hostname. It also may or may not be possible to
+ expire the cache for a single host; this interface
+ does not require any guarantee about that.
+ :type key: varies
+ :returns: None
+ """
+ raise NotImplementedError
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
index ed349c87c..a7fa92201 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
@@ -414,7 +414,6 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet):
self._handlers = None
__init__.__doc__ = Bcfg2.Server.Plugin.EntrySet.__doc__
-
def set_debug(self, debug):
rv = Bcfg2.Server.Plugin.EntrySet.set_debug(self, debug)
for entry in self.entries.values():
@@ -780,8 +779,8 @@ class Cfg(Bcfg2.Server.Plugin.GroupSpool,
options = Bcfg2.Server.Plugin.GroupSpool.options + [
Bcfg2.Options.BooleanOption(
- '--cfg-validation', cf=('cfg', 'validation'), default=True,
- help='Run validation on Cfg files'),
+ '--cfg-validation', cf=('cfg', 'validation'), default=True,
+ help='Run validation on Cfg files'),
Bcfg2.Options.Option(
cf=("cfg", "handlers"), dest="cfg_handlers",
help="Cfg handlers to load",
diff --git a/src/lib/Bcfg2/Server/Plugins/Guppy.py b/src/lib/Bcfg2/Server/Plugins/Guppy.py
index 6d6df3cc3..c5969f978 100644
--- a/src/lib/Bcfg2/Server/Plugins/Guppy.py
+++ b/src/lib/Bcfg2/Server/Plugins/Guppy.py
@@ -34,6 +34,7 @@ class Guppy(Bcfg2.Server.Plugin.Plugin):
"""Guppy is a debugging plugin to help trace memory leaks"""
__author__ = 'bcfg-dev@mcs.anl.gov'
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Enable', 'Disable']
+ __child_rmi__ = __rmi__[:]
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py
index a2eeffc3d..24adee4f4 100644
--- a/src/lib/Bcfg2/Server/Plugins/Metadata.py
+++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py
@@ -40,7 +40,6 @@ def load_django_models():
hostname = models.CharField(max_length=255, primary_key=True)
version = models.CharField(max_length=31, null=True)
-
class ClientVersions(MutableMapping,
Bcfg2.Server.Plugin.DatabaseBacked):
""" dict-like object to make it easier to access client bcfg2
@@ -495,6 +494,7 @@ class MetadataGroup(tuple): # pylint: disable=E0012,R0924
class Metadata(Bcfg2.Server.Plugin.Metadata,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.ClientRunHooks,
Bcfg2.Server.Plugin.DatabaseBacked):
"""This class contains data for bcfg2 server metadata."""
@@ -513,6 +513,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
def __init__(self, core, datastore, watch_clients=True):
Bcfg2.Server.Plugin.Metadata.__init__(self)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.ClientRunHooks.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
self.watch_clients = watch_clients
@@ -768,7 +769,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
return self._remove_xdata(self.groups_xml, "Bundle", bundle_name)
def remove_client(self, client_name):
- """Remove a bundle."""
+ """Remove a client."""
if self._use_db:
try:
client = MetadataClientModel.objects.get(hostname=client_name)
@@ -953,13 +954,16 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.groups[gname]
self.states['groups.xml'] = True
+ def expire_cache(self, key=None):
+ self.core.metadata_cache.expire(key)
+
def HandleEvent(self, event):
"""Handle update events for data files."""
for handles, event_handler in self.handlers.items():
if handles(event):
# clear the entire cache when we get an event for any
# metadata file
- self.core.metadata_cache.expire()
+ self.expire_cache()
event_handler(event)
if False not in list(self.states.values()) and self.debug_flag:
@@ -997,17 +1001,21 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.logger.error(msg)
raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
- profiles = [g for g in self.clientgroups[client]
- if g in self.groups and self.groups[g].is_profile]
- self.logger.info("Changing %s profile from %s to %s" %
- (client, profiles, profile))
- self.update_client(client, dict(profile=profile))
- if client in self.clientgroups:
- for prof in profiles:
- self.clientgroups[client].remove(prof)
- self.clientgroups[client].append(profile)
+ metadata = self.core.build_metadata(client)
+ if metadata.profile != profile:
+ self.logger.info("Changing %s profile from %s to %s" %
+ (client, metadata.profile, profile))
+ self.update_client(client, dict(profile=profile))
+ if client in self.clientgroups:
+ if metadata.profile in self.clientgroups[client]:
+ self.clientgroups[client].remove(metadata.profile)
+ self.clientgroups[client].append(profile)
+ else:
+ self.clientgroups[client] = [profile]
else:
- self.clientgroups[client] = [profile]
+ self.logger.debug(
+ "Ignoring %s request to change profile from %s to %s"
+ % (client, metadata.profile, profile))
else:
self.logger.info("Creating new client: %s, profile %s" %
(client, profile))
@@ -1023,8 +1031,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.add_client(client, dict(profile=profile))
self.clients.append(client)
self.clientgroups[client] = [profile]
- if not self._use_db:
- self.clients_xml.write()
+ if not self._use_db:
+ self.clients_xml.write()
def set_version(self, client, version):
"""Set version for provided client."""
@@ -1074,7 +1082,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
raise Bcfg2.Server.Plugin.MetadataConsistencyError(err)
return self.addresses[address][0]
try:
- cname = socket.gethostbyaddr(address)[0].lower()
+ cname = socket.getnameinfo(addresspair,
+ socket.NI_NAMEREQD)[0].lower()
if cname in self.aliases:
return self.aliases[cname]
return cname
diff --git a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
index 9603cd518..dcd495d77 100644
--- a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
+++ b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
@@ -39,8 +39,8 @@ class NagiosGen(Plugin, Generator):
def createhostconfig(self, entry, metadata):
"""Build host specific configuration file."""
try:
- host_address = socket.gethostbyname(metadata.hostname)
- except socket.gaierror:
+ host_address = socket.getaddrinfo(metadata.hostname, None)[0][4][0]
+ except socket.error:
self.logger.error()
raise PluginExecutionError("Failed to find IP address for %s" %
metadata.hostname)
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py b/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py
index 1ff097471..0df8624f6 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py
@@ -579,6 +579,10 @@ class Collection(list, Debuggable):
self.filter_unknown(unknown)
return packages, unknown
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__,
+ list.__repr__(self))
+
def get_collection_class(source_type):
""" Given a source type, determine the class of Collection object
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
index 1a56d77c4..1af046ec0 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
@@ -79,13 +79,12 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile):
:type event: Bcfg2.Server.FileMonitor.Event
:returns: None
"""
- Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event)
if event and event.filename != self.name:
for fpath in self.extras:
if fpath == os.path.abspath(event.filename):
self.parsed.add(fpath)
break
-
+ Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event)
if self.loaded:
self.logger.info("Reloading Packages plugin")
self.pkg_obj.Reload()
@@ -102,10 +101,11 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile):
def Index(self):
Bcfg2.Server.Plugin.StructFile.Index(self)
self.entries = []
- for xsource in self.xdata.findall('.//Source'):
- source = self.source_from_xml(xsource)
- if source is not None:
- self.entries.append(source)
+ if self.loaded:
+ for xsource in self.xdata.findall('.//Source'):
+ source = self.source_from_xml(xsource)
+ if source is not None:
+ self.entries.append(source)
Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__ + """
``Index`` is responsible for calling :func:`source_from_xml`
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
index 4bbcc59f7..0d49473c6 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
@@ -53,14 +53,15 @@ The Yum Backend
import os
import re
import sys
+import time
import copy
import errno
import socket
import logging
import lxml.etree
-import Bcfg2.Options
import Bcfg2.Server.Plugin
import Bcfg2.Server.FileMonitor
+from lockfile import FileLock
from Bcfg2.Utils import Executor
# pylint: disable=W0622
from Bcfg2.Compat import StringIO, cPickle, HTTPError, URLError, \
@@ -274,6 +275,8 @@ class YumCollection(Collection):
.. private-include: _add_gpg_instances, _get_pulp_consumer
"""
+ _helper = None
+
#: Options that are included in the [packages:yum] section of the
#: config but that should not be included in the temporary
#: yum.conf we write out
@@ -287,19 +290,25 @@ class YumCollection(Collection):
debug=debug)
self.keypath = os.path.join(self.cachepath, "keys")
- self._helper = None
+ #: A :class:`Bcfg2.Utils.Executor` object to use to run
+ #: external commands
+ self.cmd = Executor()
+
if self.use_yum:
#: Define a unique cache file for this collection to use
#: for cached yum metadata
self.cachefile = os.path.join(self.cachepath,
"cache-%s" % self.cachekey)
- if not os.path.exists(self.cachefile):
- os.mkdir(self.cachefile)
#: The path to the server-side config file used when
#: resolving packages with the Python yum libraries
self.cfgfile = os.path.join(self.cachefile, "yum.conf")
- self.write_config()
+
+ if not os.path.exists(self.cachefile):
+ self.debug_log("Creating common cache %s" % self.cachefile)
+ os.mkdir(self.cachefile)
+ if not self.disableMetaData:
+ self.setup_data()
self.cmd = Executor()
else:
self.cachefile = None
@@ -322,7 +331,27 @@ class YumCollection(Collection):
self.logger.error("Could not create Pulp consumer "
"cert directory at %s: %s" %
(certdir, err))
- self.pulp_cert_set = PulpCertificateSet(certdir)
+ self.__class__.pulp_cert_set = PulpCertificateSet(certdir)
+
+ @property
+ def disableMetaData(self):
+ """ Report whether or not metadata processing is enabled.
+ This duplicates code in Packages/__init__.py, and can probably
+ be removed in Bcfg2 1.4 when we have a module-level setup
+ object. """
+ if self.setup is None:
+ return True
+ try:
+ return not self.setup.cfp.getboolean("packages", "resolver")
+ except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+ return False
+ except ValueError:
+ # for historical reasons we also accept "enabled" and
+ # "disabled"
+ return self.setup.cfp.get(
+ "packages",
+ "metadata",
+ default="enabled").lower() == "disabled"
@property
def __package_groups__(self):
@@ -337,15 +366,17 @@ class YumCollection(Collection):
forking, but apparently not); finally we check in /usr/sbin,
the default location. """
if not self._helper:
- self._helper = Bcfg2.Options.setup.yum_helper
- if not self._helper:
+ # pylint: disable=W0212
+ self.__class__._helper = Bcfg2.Options.setup.yum_helper
+ if not self.__class__._helper:
# first see if bcfg2-yum-helper is in PATH
try:
self.debug_log("Checking for bcfg2-yum-helper in $PATH")
self.cmd.run(['bcfg2-yum-helper'])
- self._helper = 'bcfg2-yum-helper'
+ self.__class__._helper = 'bcfg2-yum-helper'
except OSError:
- self._helper = "/usr/sbin/bcfg2-yum-helper"
+ self.__class__._helper = "/usr/sbin/bcfg2-yum-helper"
+ # pylint: enable=W0212
return self._helper
@property
@@ -382,6 +413,7 @@ class YumCollection(Collection):
# the rpmdb is so hopelessly intertwined with yum that we
# have to totally reinvent the dependency resolver.
mainopts = dict(cachedir='/',
+ persistdir='/',
installroot=self.cachefile,
keepcache="0",
debuglevel="0",
@@ -846,6 +878,17 @@ class YumCollection(Collection):
if not self.use_yum:
return Collection.complete(self, packagelist)
+ lock = FileLock(os.path.join(self.cachefile, "lock"))
+ slept = 0
+ while lock.is_locked():
+ if slept > 30:
+ self.logger.warning("Packages: Timeout waiting for yum cache "
+ "to release its lock")
+ return set(), set()
+ self.logger.debug("Packages: Yum cache is locked, waiting...")
+ time.sleep(3)
+ slept += 3
+
if packagelist:
try:
result = self.call_helper(
@@ -890,28 +933,30 @@ class YumCollection(Collection):
cmd.append("-d")
cmd.append(command)
self.debug_log("Packages: running %s" % " ".join(cmd))
+
if inputdata:
- result = self.cmd.run(cmd, inputdata=json.dumps(inputdata))
+ result = self.cmd.run(cmd, timeout=self.setup['client_timeout'],
+ inputdata=json.dumps(inputdata))
else:
- result = self.cmd.run(cmd)
+ result = self.cmd.run(cmd, timeout=self.setup['client_timeout'])
if not result.success:
- errlines = result.error.splitlines()
self.logger.error("Packages: error running bcfg2-yum-helper: %s" %
- errlines[0])
- for line in errlines[1:]:
- self.logger.error("Packages: %s" % line)
+ result.error)
elif result.stderr:
- errlines = result.stderr.splitlines()
self.debug_log("Packages: debug info from bcfg2-yum-helper: %s" %
- errlines[0])
- for line in errlines[1:]:
- self.debug_log("Packages: %s" % line)
+ result.stderr)
+
try:
return json.loads(result.stdout)
except ValueError:
- err = sys.exc_info()[1]
- self.logger.error("Packages: error reading bcfg2-yum-helper "
- "output: %s" % err)
+ if result.stdout:
+ err = sys.exc_info()[1]
+ self.logger.error("Packages: Error reading bcfg2-yum-helper "
+ "output: %s" % err)
+ self.logger.error("Packages: bcfg2-yum-helper output: %s" %
+ result.stdout)
+ else:
+ self.logger.error("Packages: No bcfg2-yum-helper output")
raise
def setup_data(self, force_update=False):
@@ -924,8 +969,7 @@ class YumCollection(Collection):
If using the yum Python libraries, this cleans up cached yum
metadata, regenerates the server-side yum config (in order to
catch any new sources that have been added to this server),
- and then cleans up cached yum metadata again, in case the new
- config has any preexisting cache.
+ then regenerates the yum cache.
:param force_update: Ignore all local cache and setup data
from its original upstream sources (i.e.,
@@ -936,23 +980,22 @@ class YumCollection(Collection):
return Collection.setup_data(self, force_update)
if force_update:
- # we call this twice: one to clean up data from the old
- # config, and once to clean up data from the new config
+ # clean up data from the old config
try:
self.call_helper("clean")
except ValueError:
# error reported by call_helper
pass
- os.unlink(self.cfgfile)
+ if os.path.exists(self.cfgfile):
+ os.unlink(self.cfgfile)
self.write_config()
- if force_update:
- try:
- self.call_helper("clean")
- except ValueError:
- # error reported by call_helper
- pass
+ try:
+ self.call_helper("makecache")
+ except ValueError:
+ # error reported by call_helper
+ pass
class YumSource(Source):
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/YumHelper.py b/src/lib/Bcfg2/Server/Plugins/Packages/YumHelper.py
index ee0203351..32db0b32d 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/YumHelper.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/YumHelper.py
@@ -10,6 +10,8 @@ import yum
import logging
import Bcfg2.Options
import Bcfg2.Logger
+from Bcfg2.Compat import wraps
+from lockfile import FileLock, LockTimeout
try:
import json
except ImportError:
@@ -41,8 +43,8 @@ def pkgtup_to_string(package):
return ''.join(str(e) for e in rv)
-class DepSolver(object):
- """ Yum dependency solver """
+class YumHelper(object):
+ """ Yum helper base object """
def __init__(self, cfgfile, verbose=1):
self.cfgfile = cfgfile
@@ -56,6 +58,16 @@ class DepSolver(object):
self.yumbase._getConfig(cfgfile, debuglevel=verbose)
# pylint: enable=E1121,W0212
self.logger = logging.getLogger(self.__class__.__name__)
+
+
+class DepSolver(YumHelper):
+ """ Yum dependency solver. This is used for operations that only
+ read from the yum cache, and thus operates in cacheonly mode. """
+
+ def __init__(self, cfgfile, verbose=1):
+ YumHelper.__init__(self, cfgfile, verbose=verbose)
+ # internally, yum uses an integer, not a boolean, for conf.cache
+ self.yumbase.conf.cache = 1
self._groups = None
def get_groups(self):
@@ -180,6 +192,45 @@ class DepSolver(object):
packages.add(txmbr.pkgtup)
return list(packages), list(unknown)
+
+def acquire_lock(func):
+ """ decorator for CacheManager methods that gets and release a
+ lock while the method runs """
+ @wraps(func)
+ def inner(self, *args, **kwargs):
+ """ Get and release a lock while running the function this
+ wraps. """
+ self.logger.debug("Acquiring lock at %s" % self.lockfile)
+ while not self.lock.i_am_locking():
+ try:
+ self.lock.acquire(timeout=60) # wait up to 60 seconds
+ except LockTimeout:
+ self.lock.break_lock()
+ self.lock.acquire()
+ try:
+ func(self, *args, **kwargs)
+ finally:
+ self.lock.release()
+ self.logger.debug("Released lock at %s" % self.lockfile)
+
+ return inner
+
+
+class CacheManager(YumHelper):
+ """ Yum cache manager. Unlike :class:`DepSolver`, this can write
+ to the yum cache, and so is used for operations that muck with the
+ cache. (Technically, :func:`CacheManager.clean_cache` could be in
+ either DepSolver or CacheManager, but for consistency I've put it
+ here.) """
+
+ def __init__(self, cfgfile, verbose=1):
+ YumHelper.__init__(self, cfgfile, verbose=verbose)
+ self.lockfile = \
+ os.path.join(os.path.dirname(self.yumbase.conf.config_file_path),
+ "lock")
+ self.lock = FileLock(self.lockfile)
+
+ @acquire_lock
def clean_cache(self):
""" clean the yum cache """
for mdtype in ["Headers", "Packages", "Sqlite", "Metadata",
@@ -192,6 +243,27 @@ class DepSolver(object):
if not msg.startswith("0 "):
self.logger.info(msg)
+ @acquire_lock
+ def populate_cache(self):
+ """ populate the yum cache """
+ for repo in self.yumbase.repos.findRepos('*'):
+ repo.metadata_expire = 0
+ repo.mdpolicy = "group:all"
+ self.yumbase.doRepoSetup()
+ self.yumbase.repos.doSetup()
+ for repo in self.yumbase.repos.listEnabled():
+ # this populates the cache as a side effect
+ repo.repoXML # pylint: disable=W0104
+ try:
+ repo.getGroups()
+ except yum.Errors.RepoMDError:
+ pass # this repo has no groups
+ self.yumbase.repos.populateSack(mdtype='metadata', cacheonly=1)
+ self.yumbase.repos.populateSack(mdtype='filelists', cacheonly=1)
+ self.yumbase.repos.populateSack(mdtype='otherdata', cacheonly=1)
+ # this does something with the groups cache as a side effect
+ self.yumbase.comps # pylint: disable=W0104
+
class HelperSubcommand(Bcfg2.Options.Subcommand):
# the value to JSON encode and print out if the command fails
@@ -207,8 +279,6 @@ class HelperSubcommand(Bcfg2.Options.Subcommand):
self.verbosity = 5
elif Bcfg2.Options.setup.verbose:
self.verbosity = 1
- self.depsolver = DepSolver(Bcfg2.Options.setup.yum_config,
- self.verbosity)
def run(self, setup):
try:
@@ -233,16 +303,36 @@ class HelperSubcommand(Bcfg2.Options.Subcommand):
raise NotImplementedError
-class Clean(HelperSubcommand):
+class DepSolverSubcommand(HelperSubcommand):
+ def __init__(self):
+ HelperSubcommand.__init__(self)
+ self.depsolver = DepSolver(Bcfg2.Options.setup.yum_config,
+ self.verbosity)
+
+
+class CacheManagerSubcommand(HelperSubcommand):
fallback = False
accept_input = False
+ def __init__(self):
+ HelperSubcommand.__init__(self)
+ self.cachemgr = CacheManager(Bcfg2.Options.setup.yum_config,
+ self.verbosity)
+
+
+class Clean(CacheManagerSubcommand):
+ def _run(self, setup, data): # pylint: disable=W0613
+ self.cachemgr.clean_cache()
+ return True
+
+
+class MakeCache(CacheManagerSubcommand):
def _run(self, setup, data): # pylint: disable=W0613
- self.depsolver.clean_cache()
+ self.cachemgr.populate_cache()
return True
-class Complete(HelperSubcommand):
+class Complete(DepSolverSubcommand):
fallback = dict(packages=[], unknown=[])
def _run(self, _, data):
@@ -253,7 +343,7 @@ class Complete(HelperSubcommand):
return dict(packages=list(packages), unknown=list(unknown))
-class GetGroups(HelperSubcommand):
+class GetGroups(DepSolverSubcommand):
def _run(self, _, data):
rv = dict()
for gdata in data:
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
index 5b7c76765..e6240f39a 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
@@ -9,7 +9,7 @@ import shutil
import lxml.etree
import Bcfg2.Options
import Bcfg2.Server.Plugin
-from Bcfg2.Compat import urlopen, HTTPError, URLError
+from Bcfg2.Compat import urlopen, HTTPError, URLError, MutableMapping
from Bcfg2.Server.Plugins.Packages.Collection import Collection, \
get_collection_class
from Bcfg2.Server.Plugins.Packages.PackagesSources import PackagesSources
@@ -33,7 +33,54 @@ class PackagesBackendAction(Bcfg2.Options.ComponentAction):
module = True
+class OnDemandDict(MutableMapping):
+ """ This maps a set of keys to a set of value-getting functions;
+ the values are populated on-the-fly by the functions as the values
+ are needed (and not before). This is used by
+ :func:`Bcfg2.Server.Plugins.Packages.Packages.get_additional_data`;
+ see the docstring for that function for details on why.
+
+ Unlike a dict, you should not specify values for for the righthand
+ side of this mapping, but functions that get values. E.g.:
+
+ .. code-block:: python
+
+ d = OnDemandDict(foo=load_foo,
+ bar=lambda: "bar");
+ """
+
+ def __init__(self, **getters):
+ self._values = dict()
+ self._getters = dict(**getters)
+
+ def __getitem__(self, key):
+ if key not in self._values:
+ self._values[key] = self._getters[key]()
+ return self._values[key]
+
+ def __setitem__(self, key, getter):
+ self._getters[key] = getter
+
+ def __delitem__(self, key):
+ del self._values[key]
+ del self._getters[key]
+
+ def __len__(self):
+ return len(self._getters)
+
+ def __iter__(self):
+ return iter(self._getters.keys())
+
+ def __repr__(self):
+ rv = dict(self._values)
+ for key in self._getters.keys():
+ if key not in rv:
+ rv[key] = 'unknown'
+ return str(rv)
+
+
class Packages(Bcfg2.Server.Plugin.Plugin,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.StructureValidator,
Bcfg2.Server.Plugin.Generator,
Bcfg2.Server.Plugin.Connector,
@@ -87,8 +134,12 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
#: and :func:`Reload`
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload']
+ __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \
+ [('Refresh', 'expire_cache'), ('Reload', 'expire_cache')]
+
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.StructureValidator.__init__(self)
Bcfg2.Server.Plugin.Generator.__init__(self)
Bcfg2.Server.Plugin.Connector.__init__(self)
@@ -141,8 +192,21 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
#: object when one is requested, so each entry is very
#: short-lived -- it's purged at the end of each client run.
self.clients = dict()
- # pylint: enable=C0301
+ #: groupcache caches group lookups. It maps Collections (via
+ #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`)
+ #: to sets of package groups, and thence to the packages
+ #: indicated by those groups.
+ self.groupcache = dict()
+
+ #: pkgcache caches complete package sets. It maps Collections
+ #: (via
+ #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`)
+ #: to sets of initial packages, and thence to the final
+ #: (complete) package selections resolved from the initial
+ #: packages
+ self.pkgcache = dict()
+ # pylint: enable=C0301
__init__.__doc__ = Bcfg2.Server.Plugin.Plugin.__init__.__doc__
def set_debug(self, debug):
@@ -349,14 +413,24 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
for el in to_remove:
el.getparent().remove(el)
- gpkgs = collection.get_groups(groups)
- for pkgs in gpkgs.values():
+ groups.sort()
+ # check for this set of groups in the group cache
+ gkey = hash(tuple(groups))
+ if gkey not in self.groupcache[collection.cachekey]:
+ self.groupcache[collection.cachekey][gkey] = \
+ collection.get_groups(groups)
+ for pkgs in self.groupcache[collection.cachekey][gkey].values():
base.update(pkgs)
# essential pkgs are those marked as such by the distribution
base.update(collection.get_essential())
- packages, unknown = collection.complete(base)
+ # check for this set of packages in the package cache
+ pkey = hash(tuple(base))
+ if pkey not in self.pkgcache[collection.cachekey]:
+ self.pkgcache[collection.cachekey][pkey] = \
+ collection.complete(base)
+ packages, unknown = self.pkgcache[collection.cachekey][pkey]
if unknown:
self.logger.info("Packages: Got %d unknown entries" % len(unknown))
self.logger.info("Packages: %s" % list(unknown))
@@ -382,6 +456,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
self._load_config()
return True
+ def expire_cache(self, _=None):
+ self.Reload()
+
def _load_config(self, force_update=False):
"""
Load the configuration data and setup sources
@@ -409,9 +486,11 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if not self.disableMetaData:
collection.setup_data(force_update)
- # clear Collection caches
+ # clear Collection and package caches
self.clients = dict()
self.collections = dict()
+ self.groupcache = dict()
+ self.pkgcache = dict()
for source in self.sources.entries:
cachefiles.add(source.cachefile)
@@ -503,7 +582,8 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if len(sclasses) > 1:
self.logger.warning("Packages: Multiple source types found for "
"%s: %s" %
- ",".join([s.__name__ for s in sclasses]))
+ (metadata.hostname,
+ ",".join([s.__name__ for s in sclasses])))
cclass = Collection
elif len(sclasses) == 0:
self.logger.error("Packages: No sources found for %s" %
@@ -523,24 +603,47 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if cclass != Collection:
self.clients[metadata.hostname] = ckey
self.collections[ckey] = collection
+ self.groupcache.setdefault(ckey, dict())
+ self.pkgcache.setdefault(ckey, dict())
return collection
def get_additional_data(self, metadata):
""" Return additional data for the given client. This will be
- a dict containing a single key, ``sources``, whose value is a
- list of data returned from
- :func:`Bcfg2.Server.Plugins.Packages.Collection.Collection.get_additional_data`,
- namely, a list of
- :attr:`Bcfg2.Server.Plugins.Packages.Source.Source.url_map`
- data.
+ an :class:`Bcfg2.Server.Plugins.Packages.OnDemandDict`
+ containing two keys:
+
+ * ``sources``, whose value is a list of data returned from
+ :func:`Bcfg2.Server.Plugins.Packages.Collection.Collection.get_additional_data`,
+ namely, a list of
+ :attr:`Bcfg2.Server.Plugins.Packages.Source.Source.url_map`
+ data; and
+ * ``get_config``, whose value is the
+ :func:`Bcfg2.Server.Plugins.Packages.Packages.get_config`
+ function, which can be used to get the Packages config for
+ other systems.
+
+ This uses an OnDemandDict instead of just a normal dict
+ because loading a source collection can be a fairly
+ time-consuming process, particularly for the first time. As a
+ result, when all metadata objects are built at once (such as
+ after the server is restarted, or far more frequently if
+ Metadata caching is disabled), this function would be a major
+ bottleneck if we tried to build all collections at the same
+ time. Instead, they're merely built on-demand.
:param metadata: The client metadata
:type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
:return: dict of lists of ``url_map`` data
"""
- collection = self.get_collection(metadata)
- return dict(sources=collection.get_additional_data(),
- get_config=self.get_config)
+ def get_sources():
+ """ getter for the 'sources' key of the OnDemandDict
+ returned by this function. This delays calling
+ get_collection() until it's absolutely necessary. """
+ return self.get_collection(metadata).get_additional_data
+
+ return OnDemandDict(
+ sources=get_sources,
+ get_config=lambda: self.get_config)
def end_client_run(self, metadata):
""" Hook to clear the cache for this client in
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index 9b485e29b..0d264a5a6 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -195,14 +195,16 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
class Probes(Bcfg2.Server.Plugin.Probing,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Connector,
Bcfg2.Server.Plugin.DatabaseBacked):
""" A plugin to gather information from a client machine """
__author__ = 'bcfg-dev@mcs.anl.gov'
def __init__(self, core, datastore):
- Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.Probing.__init__(self)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
+ Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
try:
@@ -262,7 +264,7 @@ class Probes(Bcfg2.Server.Plugin.Probing,
ProbesDataModel.objects.filter(
hostname=client.hostname).exclude(
- probe__in=self.probedata[client.hostname]).delete()
+ probe__in=self.probedata[client.hostname]).delete()
for group in self.cgroups[client.hostname]:
try:
@@ -277,14 +279,19 @@ class Probes(Bcfg2.Server.Plugin.Probing,
group=group)
ProbesGroupsModel.objects.filter(
hostname=client.hostname).exclude(
- group__in=self.cgroups[client.hostname]).delete()
+ group__in=self.cgroups[client.hostname]).delete()
+
+ def expire_cache(self, key=None):
+ self.load_data(client=key)
- def load_data(self):
+ def load_data(self, client=None):
""" Load probe data from the appropriate backend (probed.xml
or the database) """
if self._use_db:
- return self._load_data_db()
+ return self._load_data_db(client=client)
else:
+ # the XML backend doesn't support loading data for single
+ # clients, so it reloads all data
return self._load_data_xml()
def _load_data_xml(self):
@@ -309,20 +316,36 @@ class Probes(Bcfg2.Server.Plugin.Probing,
elif pdata.tag == 'Group':
self.cgroups[client.get('name')].append(pdata.get('name'))
- def _load_data_db(self):
+ if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+ def _load_data_db(self, client=None):
""" Load probe data from the database """
- self.probedata = {}
- self.cgroups = {}
- for pdata in ProbesDataModel.objects.all():
+ if client is None:
+ self.probedata = {}
+ self.cgroups = {}
+ probedata = ProbesDataModel.objects.all()
+ groupdata = ProbesGroupsModel.objects.all()
+ else:
+ self.probedata.pop(client, None)
+ self.cgroups.pop(client, None)
+ probedata = ProbesDataModel.objects.filter(hostname=client)
+ groupdata = ProbesGroupsModel.objects.filter(hostname=client)
+
+ for pdata in probedata:
if pdata.hostname not in self.probedata:
self.probedata[pdata.hostname] = ClientProbeDataSet(
timestamp=time.mktime(pdata.timestamp.timetuple()))
self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data)
- for pgroup in ProbesGroupsModel.objects.all():
+ for pgroup in groupdata:
if pgroup.hostname not in self.cgroups:
self.cgroups[pgroup.hostname] = []
self.cgroups[pgroup.hostname].append(pgroup.group)
+ if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
+ key=client)
+
@track_statistics()
def GetProbes(self, meta):
return self.probes.get_probe_data(meta)
diff --git a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
index 3b367573b..a02f012a0 100644
--- a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
+++ b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
@@ -117,7 +117,7 @@ class PuppetENC(Bcfg2.Server.Plugin.Plugin,
self.logger.warning("PuppetENC is incompatible with aggressive "
"client metadata caching, try 'cautious' or "
"'initial' instead")
- self.core.cache.expire()
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
def end_statistics(self, metadata):
self.end_client_run(self, metadata)
diff --git a/src/lib/Bcfg2/Server/Plugins/SSHbase.py b/src/lib/Bcfg2/Server/Plugins/SSHbase.py
index 186d61c6e..c858b881b 100644
--- a/src/lib/Bcfg2/Server/Plugins/SSHbase.py
+++ b/src/lib/Bcfg2/Server/Plugins/SSHbase.py
@@ -93,6 +93,7 @@ class KnownHostsEntrySet(Bcfg2.Server.Plugin.EntrySet):
class SSHbase(Bcfg2.Server.Plugin.Plugin,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Generator,
Bcfg2.Server.Plugin.PullTarget):
"""
@@ -126,6 +127,7 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin,
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.Generator.__init__(self)
Bcfg2.Server.Plugin.PullTarget.__init__(self)
self.ipcache = {}
@@ -150,9 +152,11 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin,
self.entries["/etc/ssh/" + keypattern] = \
HostKeyEntrySet(keypattern, self.data)
self.Entries['Path']["/etc/ssh/" + keypattern] = self.build_hk
-
self.cmd = Executor()
+ def expire_cache(self, key=None):
+ self.__skn = False
+
def get_skn(self):
"""Build memory cache of the ssh known hosts file."""
if not self.__skn:
diff --git a/src/lib/Bcfg2/Server/Test.py b/src/lib/Bcfg2/Server/Test.py
index 72d64b828..912a8f19c 100644
--- a/src/lib/Bcfg2/Server/Test.py
+++ b/src/lib/Bcfg2/Server/Test.py
@@ -197,7 +197,7 @@ class CLI(object):
""" Get a server core, with events handled """
core = Bcfg2.Server.Core.Core()
core.load_plugins()
- core.fam.handle_events_in_interval(0.1)
+ core.block_for_fam_events(handle_events=True)
signal.signal(signal.SIGINT, get_sigint_handler(core))
return core
@@ -264,8 +264,9 @@ class CLI(object):
for client in clients:
yield ClientTest(core, client, ignore)
- TestProgram(argv=sys.argv[:1] + Bcfg2.Options.setup.nose_options,
- suite=LazySuite(generate_tests), exit=False)
+ result = TestProgram(
+ argv=sys.argv[:1] + Bcfg2.Options.setup.nose_options,
+ suite=LazySuite(generate_tests), exit=False)
# block until all children have completed -- should be
# immediate since we've already gotten all the results we
@@ -274,3 +275,7 @@ class CLI(object):
child.join()
core.shutdown()
+ if result.success:
+ os._exit(0) # pylint: disable=W0212
+ else:
+ os._exit(1) # pylint: disable=W0212
diff --git a/src/lib/Bcfg2/Utils.py b/src/lib/Bcfg2/Utils.py
index 5d8204460..ccb79249e 100644
--- a/src/lib/Bcfg2/Utils.py
+++ b/src/lib/Bcfg2/Utils.py
@@ -5,12 +5,11 @@ else. """
import os
import re
import sys
-import shlex
import fcntl
import select
import logging
-import threading
import subprocess
+import threading
from Bcfg2.Compat import input, any # pylint: disable=W0622
@@ -219,7 +218,6 @@ class Executor(object):
"""
if isinstance(command, str):
cmdstr = command
- command = shlex.split(cmdstr)
else:
cmdstr = " ".join(command)
self.logger.debug("Running: %s" % cmdstr)
@@ -241,9 +239,9 @@ class Executor(object):
# py3k fixes
if not isinstance(stdout, str):
- stdout = stdout.decode('utf-8')
+ stdout = stdout.decode('utf-8') # pylint: disable=E1103
if not isinstance(stderr, str):
- stderr = stderr.decode('utf-8')
+ stderr = stderr.decode('utf-8') # pylint: disable=E1103
for line in stdout.splitlines(): # pylint: disable=E1103
self.logger.debug('< %s' % line)
diff --git a/src/lib/Bcfg2/settings.py b/src/lib/Bcfg2/settings.py
index a26330a79..42d415232 100644
--- a/src/lib/Bcfg2/settings.py
+++ b/src/lib/Bcfg2/settings.py
@@ -18,14 +18,6 @@ except ImportError:
DATABASES = dict(default=dict())
-# Django < 1.2 compat
-DATABASE_ENGINE = None
-DATABASE_NAME = None
-DATABASE_USER = None
-DATABASE_PASSWORD = None
-DATABASE_HOST = None
-DATABASE_PORT = None
-
TIME_ZONE = None
TEMPLATE_DEBUG = DEBUG = False
@@ -128,7 +120,9 @@ def read_config():
USER=Bcfg2.Options.setup.db_user,
PASSWORD=Bcfg2.Options.setup.db_password,
HOST=Bcfg2.Options.setup.db_host,
- PORT=Bcfg2.Options.setup.db_port)
+ PORT=Bcfg2.Options.setup.db_port,
+ OPTIONS=Bcfg2.Options.setup.db_opts,
+ SCHEMA=Bcfg2.Options.setup.db_schema)
TIME_ZONE = Bcfg2.Options.setup.timezone
@@ -142,6 +136,8 @@ def read_config():
class _OptionContainer(object):
+ """ Container for options loaded at import-time to configure
+ databases """
options = [
Bcfg2.Options.Common.repository,
Bcfg2.Options.PathOption(
@@ -165,6 +161,12 @@ class _OptionContainer(object):
Bcfg2.Options.Option(
cf=('database', 'port'), help='Database port', dest='db_port'),
Bcfg2.Options.Option(
+ cf=('database', 'schema'), help='Database schema',
+ dest='db_schema'),
+ Bcfg2.Options.Option(
+ cf=('database', 'options'), help='Database options',
+ dest='db_opts', type=Bcfg2.Options.Types.comma_dict),
+ Bcfg2.Options.Option(
cf=('reporting', 'timezone'), help='Django timezone'),
Bcfg2.Options.BooleanOption(
cf=('reporting', 'web_debug'), help='Django debug'),
diff --git a/src/lib/Bcfg2/version.py b/src/lib/Bcfg2/version.py
index 12fc584fe..140fb6937 100644
--- a/src/lib/Bcfg2/version.py
+++ b/src/lib/Bcfg2/version.py
@@ -2,7 +2,7 @@
import re
-__version__ = "1.3.1"
+__version__ = "1.3.2"
class Bcfg2VersionInfo(tuple): # pylint: disable=E0012,R0924