Diffstat (limited to 'src')
-rw-r--r--  src/lib/Bcfg2/Client/Frame.py | 9
-rw-r--r--  src/lib/Bcfg2/Client/Tools/Chkconfig.py | 6
-rw-r--r--  src/lib/Bcfg2/Client/Tools/DebInit.py | 2
-rw-r--r--  src/lib/Bcfg2/Client/Tools/POSIX/File.py | 17
-rw-r--r--  src/lib/Bcfg2/Client/Tools/POSIX/base.py | 25
-rw-r--r--  src/lib/Bcfg2/Client/Tools/POSIXUsers.py | 9
-rw-r--r--  src/lib/Bcfg2/Client/Tools/RcUpdate.py | 2
-rw-r--r--  src/lib/Bcfg2/Client/Tools/__init__.py | 5
-rw-r--r--  src/lib/Bcfg2/Client/__init__.py | 3
-rw-r--r--  src/lib/Bcfg2/Options.py | 40
-rw-r--r--  src/lib/Bcfg2/Proxy.py | 1
-rw-r--r--  src/lib/Bcfg2/Reporting/Collector.py | 43
-rw-r--r--  src/lib/Bcfg2/Reporting/models.py | 2
-rw-r--r--  src/lib/Bcfg2/Reporting/templates/base.html | 2
-rw-r--r--  src/lib/Bcfg2/SSLServer.py | 32
-rw-r--r--  src/lib/Bcfg2/Server/Admin/Compare.py | 1
-rw-r--r--  src/lib/Bcfg2/Server/Admin/Init.py | 11
-rw-r--r--  src/lib/Bcfg2/Server/Admin/Snapshots.py | 1
-rw-r--r--  src/lib/Bcfg2/Server/Admin/Viz.py | 1
-rw-r--r--  src/lib/Bcfg2/Server/BuiltinCore.py | 4
-rw-r--r--  src/lib/Bcfg2/Server/Core.py | 142
-rw-r--r--  src/lib/Bcfg2/Server/Lint/Validate.py | 1
-rw-r--r--  src/lib/Bcfg2/Server/MultiprocessingCore.py | 452
-rw-r--r--  src/lib/Bcfg2/Server/Plugin/base.py | 23
-rw-r--r--  src/lib/Bcfg2/Server/Plugin/helpers.py | 26
-rw-r--r--  src/lib/Bcfg2/Server/Plugin/interfaces.py | 43
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/AWSTags.py | 217
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Bundler.py | 9
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Cfg/CfgAuthorizedKeysGenerator.py | 23
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py | 18
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py | 53
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py | 52
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/GroupLogic.py | 33
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Guppy.py | 1
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Metadata.py | 357
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/NagiosGen.py | 6
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Packages/Collection.py | 4
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py | 12
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 138
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Packages/__init__.py | 141
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Probes.py | 61
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Properties.py | 2
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/PuppetENC.py | 2
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/SSHbase.py | 5
-rw-r--r--  src/lib/Bcfg2/Server/models.py | 2
-rw-r--r--  src/lib/Bcfg2/Utils.py | 4
-rw-r--r--  src/lib/Bcfg2/settings.py | 12
-rw-r--r--  src/lib/Bcfg2/version.py | 2
-rwxr-xr-x  src/sbin/bcfg2-crypt | 532
-rwxr-xr-x  src/sbin/bcfg2-info | 18
-rwxr-xr-x  src/sbin/bcfg2-lint | 2
-rwxr-xr-x  src/sbin/bcfg2-test | 11
-rwxr-xr-x  src/sbin/bcfg2-yum-helper | 92
53 files changed, 1918 insertions, 794 deletions
diff --git a/src/lib/Bcfg2/Client/Frame.py b/src/lib/Bcfg2/Client/Frame.py
index 3254da9e9..ad718749e 100644
--- a/src/lib/Bcfg2/Client/Frame.py
+++ b/src/lib/Bcfg2/Client/Frame.py
@@ -1,6 +1,7 @@
""" Frame is the Client Framework that verifies and installs entries,
and generates statistics. """
+import copy
import time
import fnmatch
import logging
@@ -328,11 +329,13 @@ class Frame(object):
if bundle.tag != 'Bundle':
continue
bmodified = len([item for item in bundle
- if item in self.whitelist])
+ if item in self.whitelist or
+ item in self.modified])
actions = [a for a in bundle.findall('./Action')
if (a.get('timing') != 'post' and
(bmodified or a.get('when') == 'always'))]
- # now we process all "always actions"
+            # now we process all "pre" and "both" actions that either
+            # run always or whose bundle has been modified
if self.setup['interactive']:
self.promptFilter(iprompt, actions)
self.DispatchInstallCalls(actions)
@@ -522,7 +525,7 @@ class Frame(object):
container = Bcfg2.Client.XML.SubElement(stats, ename)
for item in data:
item.set('qtext', '')
- container.append(item)
+ container.append(copy.deepcopy(item))
item.text = None
timeinfo = Bcfg2.Client.XML.Element("OpStamps")
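Note: the deepcopy above works around lxml's move-on-append semantics, in which
appending an element to a second tree detaches it from its original parent. A
minimal sketch of the behavior being avoided:

    import copy
    import lxml.etree

    bundle = lxml.etree.Element("Bundle")
    item = lxml.etree.SubElement(bundle, "Path", name="/etc/motd")
    stats = lxml.etree.Element("Stats")
    stats.append(copy.deepcopy(item))  # bundle keeps its child
    # stats.append(item) would instead *move* item out of bundle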
diff --git a/src/lib/Bcfg2/Client/Tools/Chkconfig.py b/src/lib/Bcfg2/Client/Tools/Chkconfig.py
index ac874c94c..4833f3f68 100644
--- a/src/lib/Bcfg2/Client/Tools/Chkconfig.py
+++ b/src/lib/Bcfg2/Client/Tools/Chkconfig.py
@@ -89,7 +89,7 @@ class Chkconfig(Bcfg2.Client.Tools.SvcTool):
if bootstatus is not None:
if bootstatus == 'on':
# make sure service is enabled on boot
- bootcmd = '/sbin/chkconfig %s %s --level 0123456' % \
+ bootcmd = '/sbin/chkconfig %s %s' % \
(entry.get('name'), bootstatus)
elif bootstatus == 'off':
# make sure service is disabled on boot
@@ -116,8 +116,8 @@ class Chkconfig(Bcfg2.Client.Tools.SvcTool):
def FindExtra(self):
"""Locate extra chkconfig Services."""
allsrv = [line.split()[0]
- for line in self.cmd.run("/sbin/chkconfig",
- "--list").stdout.splitlines()
+ for line in
+ self.cmd.run("/sbin/chkconfig --list").stdout.splitlines()
if ":on" in line]
self.logger.debug('Found active services:')
self.logger.debug(allsrv)
diff --git a/src/lib/Bcfg2/Client/Tools/DebInit.py b/src/lib/Bcfg2/Client/Tools/DebInit.py
index 761c51db7..b544e44d4 100644
--- a/src/lib/Bcfg2/Client/Tools/DebInit.py
+++ b/src/lib/Bcfg2/Client/Tools/DebInit.py
@@ -108,7 +108,7 @@ class DebInit(Bcfg2.Client.Tools.SvcTool):
def InstallService(self, entry):
"""Install Service entry."""
self.logger.info("Installing Service %s" % (entry.get('name')))
- bootstatus = entry.get('bootstatus')
+ bootstatus = self.get_bootstatus(entry)
# check if init script exists
try:
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/File.py b/src/lib/Bcfg2/Client/Tools/POSIX/File.py
index 9f47fb53a..b1bde1057 100644
--- a/src/lib/Bcfg2/Client/Tools/POSIX/File.py
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/File.py
@@ -53,6 +53,10 @@ class POSIXFile(POSIXTool):
def verify(self, entry, modlist):
ondisk = self._exists(entry)
tempdata, is_binary = self._get_data(entry)
+ if isinstance(tempdata, str) and str != unicode:
+ tempdatasize = len(tempdata)
+ else:
+ tempdatasize = len(tempdata.encode(self.setup['encoding']))
different = False
content = None
@@ -61,7 +65,7 @@ class POSIXFile(POSIXTool):
# they're clearly different
different = True
content = ""
- elif len(tempdata) != ondisk[stat.ST_SIZE]:
+ elif tempdatasize != ondisk[stat.ST_SIZE]:
# next, see if the size of the target file is different
# from the size of the desired content
different = True
@@ -72,6 +76,9 @@ class POSIXFile(POSIXTool):
# for everything else
try:
content = open(entry.get('name')).read()
+ except UnicodeDecodeError:
+ content = open(entry.get('name'),
+ encoding=self.setup['encoding']).read()
except IOError:
self.logger.error("POSIX: Failed to read %s: %s" %
(entry.get("name"), sys.exc_info()[1]))
@@ -89,7 +96,7 @@ class POSIXFile(POSIXTool):
def _write_tmpfile(self, entry):
""" Write the file data to a temp file """
- filedata, _ = self._get_data(entry)
+ filedata = self._get_data(entry)[0]
# get a temp file to write to that is in the same directory as
# the existing file in order to preserve any permissions
# protections on that directory, and also to avoid issues with
@@ -105,7 +112,11 @@ class POSIXFile(POSIXTool):
(os.path.dirname(entry.get('name')), err))
return False
try:
- os.fdopen(newfd, 'w').write(filedata)
+ if isinstance(filedata, str) and str != unicode:
+ os.fdopen(newfd, 'w').write(filedata)
+ else:
+ os.fdopen(newfd, 'wb').write(
+ filedata.encode(self.setup['encoding']))
except (OSError, IOError):
err = sys.exc_info()[1]
self.logger.error("POSIX: Failed to open temp file %s for writing "
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
index 16fe0acb5..85da3576b 100644
--- a/src/lib/Bcfg2/Client/Tools/POSIX/base.py
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
@@ -232,6 +232,11 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
else:
defacl = None
+ if not acls:
+ self.logger.debug("POSIX: Removed ACLs from %s" %
+ entry.get("name"))
+ return True
+
for aclkey, perms in acls.items():
atype, scope, qualifier = aclkey
if atype == "default":
@@ -525,7 +530,8 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
if entry.get("secontext") == "__default__":
try:
wanted_secontext = \
- selinux.matchpathcon(path, 0)[1].split(":")[2]
+ selinux.matchpathcon(
+ path, ondisk[stat.ST_MODE])[1].split(":")[2]
except OSError:
errors.append("%s has no default SELinux context" %
entry.get("name"))
@@ -686,7 +692,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
""" os.makedirs helpfully creates all parent directories for
us, but it sets permissions according to umask, which is
probably wrong. we need to find out which directories were
- created and set permissions on those
+ created and try to set permissions on those
(http://trac.mcs.anl.gov/projects/bcfg2/ticket/1125 and
http://trac.mcs.anl.gov/projects/bcfg2/ticket/1134) """
created = []
@@ -706,22 +712,17 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
(path, err))
rv = False
- # we need to make sure that we give +x to everyone who needs
- # it. E.g., if the file that's been distributed is 0600, we
- # can't make the parent directories 0600 also; that'd be
- # pretty useless. They need to be 0700.
+ # set auto-created directories to mode 755 and use best effort for
+ # permissions. If you need something else, you should specify it in
+ # your config.
tmpentry = copy.deepcopy(entry)
- newmode = int(entry.get('mode'), 8)
- for i in range(0, 3):
- if newmode & (6 * pow(8, i)):
- newmode |= 1 * pow(8, i)
- tmpentry.set('mode', oct_mode(newmode))
+ tmpentry.set('mode', '0755')
for acl in tmpentry.findall('ACL'):
acl.set('perms',
oct_mode(self._norm_acl_perms(acl.get('perms')) |
ACL_MAP['x']))
for cpath in created:
- rv &= self._set_perms(tmpentry, path=cpath)
+ self._set_perms(tmpentry, path=cpath)
return rv
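For reference, the removed logic derived the parent-directory mode from the
file's mode by adding an execute bit wherever read or write was present, so a
0600 file produced 0700 directories; the replacement simply uses 0755 and
treats permission-setting on auto-created directories as best-effort:

    # sketch of the removed derivation, shown for a 0600 file
    newmode = int('0600', 8)
    for i in range(0, 3):
        if newmode & (6 * pow(8, i)):
            newmode |= 1 * pow(8, i)
    print(oct(newmode))  # 0700 under the old scheme; now always 0755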
diff --git a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
index 8226392f9..bf23aca6b 100644
--- a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
+++ b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
@@ -189,14 +189,18 @@ class POSIXUsers(Bcfg2.Client.Tools.Tool):
else:
for attr, idx in self.attr_mapping[entry.tag].items():
val = str(self.existing[entry.tag][entry.get("name")][idx])
- entry.set("current_%s" % attr, val)
+ entry.set("current_%s" %
+ attr, val.decode(self.setup['encoding']))
if attr in ["uid", "gid"]:
if entry.get(attr) is None:
# no uid/gid specified, so we let the tool
# automatically determine one -- i.e., it always
# verifies
continue
- if val != entry.get(attr):
+ entval = entry.get(attr)
+ if not isinstance(entval, str):
+ entval = entval.encode('utf-8')
+ if val != entval:
errors.append("%s for %s %s is incorrect. Current %s is "
"%s, but should be %s" %
(attr.title(), entry.tag, entry.get("name"),
@@ -249,7 +253,6 @@ class POSIXUsers(Bcfg2.Client.Tools.Tool):
if entry.get('gid'):
cmd.extend(['-g', entry.get('gid')])
elif entry.tag == 'POSIXUser':
- cmd.append('-m')
if entry.get('uid'):
cmd.extend(['-u', entry.get('uid')])
cmd.extend(['-g', entry.get('group')])
diff --git a/src/lib/Bcfg2/Client/Tools/RcUpdate.py b/src/lib/Bcfg2/Client/Tools/RcUpdate.py
index 8e9626521..e0c913dcd 100644
--- a/src/lib/Bcfg2/Client/Tools/RcUpdate.py
+++ b/src/lib/Bcfg2/Client/Tools/RcUpdate.py
@@ -89,7 +89,7 @@ class RcUpdate(Bcfg2.Client.Tools.SvcTool):
def InstallService(self, entry):
"""Install Service entry."""
self.logger.info('Installing Service %s' % entry.get('name'))
- bootstatus = entry.get('bootstatus')
+ bootstatus = self.get_bootstatus(entry)
if bootstatus is not None:
if bootstatus == 'on':
# make sure service is enabled on boot
diff --git a/src/lib/Bcfg2/Client/Tools/__init__.py b/src/lib/Bcfg2/Client/Tools/__init__.py
index 11fe55bd6..703b8ff57 100644
--- a/src/lib/Bcfg2/Client/Tools/__init__.py
+++ b/src/lib/Bcfg2/Client/Tools/__init__.py
@@ -594,13 +594,14 @@ class SvcTool(Tool):
if not self.handlesEntry(entry):
continue
+ estatus = entry.get('status')
restart = entry.get("restart", "true").lower()
- if (restart == "false" or
+ if (restart == "false" or estatus == 'ignore' or
(restart == "interactive" and not self.setup['interactive'])):
continue
success = False
- if entry.get('status') == 'on':
+ if estatus == 'on':
if self.setup['servicemode'] == 'build':
success = self.stop_service(entry)
elif entry.get('name') not in self.restarted:
diff --git a/src/lib/Bcfg2/Client/__init__.py b/src/lib/Bcfg2/Client/__init__.py
index 25603186e..6d1cb9d40 100644
--- a/src/lib/Bcfg2/Client/__init__.py
+++ b/src/lib/Bcfg2/Client/__init__.py
@@ -21,6 +21,9 @@ def prompt(msg):
try:
ans = input(msg)
return ans in ['y', 'Y']
+ except UnicodeEncodeError:
+ ans = input(msg.encode('utf-8'))
+ return ans in ['y', 'Y']
except EOFError:
# handle ^C on rhel-based platforms
raise SystemExit(1)
diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py
index 243c4ed2a..673fb125c 100644
--- a/src/lib/Bcfg2/Options.py
+++ b/src/lib/Bcfg2/Options.py
@@ -319,6 +319,28 @@ def colon_split(c_string):
return []
+def dict_split(c_string):
+ """ split an option string on commas, optionally surrounded by
+ whitespace and split the resulting items again on equals signs,
+ returning a dict """
+ result = dict()
+ if c_string:
+ items = re.split(r'\s*,\s*', c_string)
+ for item in items:
+ if r'=' in item:
+ key, value = item.split(r'=', 1)
+ try:
+ result[key] = get_bool(value)
+ except ValueError:
+ try:
+ result[key] = get_int(value)
+ except ValueError:
+ result[key] = value
+ else:
+ result[item] = True
+ return result
+
+
def get_bool(val):
""" given a string value of a boolean configuration option, return
an actual bool (True or False) """
@@ -651,6 +673,15 @@ DB_PORT = \
default='',
cf=('database', 'port'),
deprecated_cf=('statistics', 'database_port'))
+DB_OPTIONS = \
+ Option('Database options',
+ default=dict(),
+ cf=('database', 'options'),
+ cook=dict_split)
+DB_SCHEMA = \
+ Option('Database schema',
+ default='',
+ cf=('database', 'schema'))
# Django options
WEB_CFILE = \
@@ -1193,7 +1224,8 @@ SERVER_COMMON_OPTIONS = dict(repo=SERVER_REPOSITORY,
authentication=SERVER_AUTHENTICATION,
perflog=LOG_PERFORMANCE,
perflog_interval=PERFLOG_INTERVAL,
- children=SERVER_CHILDREN)
+ children=SERVER_CHILDREN,
+ client_timeout=CLIENT_TIMEOUT)
CRYPT_OPTIONS = dict(encrypt=ENCRYPT,
decrypt=DECRYPT,
@@ -1233,9 +1265,9 @@ DRIVER_OPTIONS = \
yum_verify_fail_action=CLIENT_YUM_VERIFY_FAIL_ACTION,
yum_verify_flags=CLIENT_YUM_VERIFY_FLAGS,
posix_uid_whitelist=CLIENT_POSIX_UID_WHITELIST,
- posix_gid_whitelist=CLIENT_POSIX_UID_WHITELIST,
+ posix_gid_whitelist=CLIENT_POSIX_GID_WHITELIST,
posix_uid_blacklist=CLIENT_POSIX_UID_BLACKLIST,
- posix_gid_blacklist=CLIENT_POSIX_UID_BLACKLIST)
+ posix_gid_blacklist=CLIENT_POSIX_GID_BLACKLIST)
CLIENT_COMMON_OPTIONS = \
dict(extra=CLIENT_EXTRA_DISPLAY,
@@ -1285,6 +1317,8 @@ DATABASE_COMMON_OPTIONS = dict(web_configfile=WEB_CFILE,
db_password=DB_PASSWORD,
db_host=DB_HOST,
db_port=DB_PORT,
+ db_options=DB_OPTIONS,
+ db_schema=DB_SCHEMA,
time_zone=DJANGO_TIME_ZONE,
django_debug=DJANGO_DEBUG,
web_prefix=DJANGO_WEB_PREFIX)
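A quick illustration of the new ``dict_split`` cook used by DB_OPTIONS. Values
are cooked with ``get_bool``, then ``get_int``, then left as strings, and bare
items default to True; the option string here is hypothetical:

    dict_split("autocommit=true, timeout=30, sslmode=require, use_tz")
    # -> {'autocommit': True, 'timeout': 30,
    #     'sslmode': 'require', 'use_tz': True}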
diff --git a/src/lib/Bcfg2/Proxy.py b/src/lib/Bcfg2/Proxy.py
index f6db66a93..34080da6b 100644
--- a/src/lib/Bcfg2/Proxy.py
+++ b/src/lib/Bcfg2/Proxy.py
@@ -104,7 +104,6 @@ class RetryMethod(xmlrpclib._Method):
err = sys.exc_info()[1]
msg = err
except:
- raise
etype, err = sys.exc_info()[:2]
msg = "Unknown failure: %s (%s)" % (err, etype.__name__)
if msg:
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 3d224432e..b42364d8d 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -20,10 +20,38 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
StorageError, StorageImportError
+
class ReportingError(Exception):
"""Generic reporting exception"""
pass
+
+class ReportingStoreThread(threading.Thread):
+ """Thread for calling the storage backend"""
+ def __init__(self, interaction, storage, group=None, target=None,
+ name=None, args=(), kwargs=None):
+ """Initialize the thread with a reference to the interaction
+ as well as the storage engine to use"""
+ threading.Thread.__init__(self, group, target, name, args,
+ kwargs or dict())
+ self.interaction = interaction
+ self.storage = storage
+ self.logger = logging.getLogger('bcfg2-report-collector')
+
+ def run(self):
+ """Call the database storage procedure (aka import)"""
+ try:
+ start = time.time()
+ self.storage.import_interaction(self.interaction)
+ self.logger.info("Imported interaction for %s in %ss" %
+ (self.interaction.get('hostname', '<unknown>'),
+ time.time() - start))
+ except:
+ #TODO requeue?
+ self.logger.error("Unhandled exception in import thread %s" %
+ traceback.format_exc().splitlines()[-1])
+
+
class ReportingCollector(object):
"""The collecting process for reports"""
@@ -77,12 +105,11 @@ class ReportingCollector(object):
(self.storage.__class__.__name__,
traceback.format_exc().splitlines()[-1]))
-
def run(self):
"""Startup the processing and go!"""
self.terminate = threading.Event()
atexit.register(self.shutdown)
- self.context = daemon.DaemonContext()
+ self.context = daemon.DaemonContext(detach_process=True)
if self.setup['daemon']:
self.logger.debug("Daemonizing")
@@ -103,15 +130,9 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- try:
- start = time.time()
- self.storage.import_interaction(interaction)
- self.logger.info("Imported interaction for %s in %ss" %
- (interaction.get('hostname', '<unknown>'),
- time.time() - start))
- except:
- #TODO requeue?
- raise
+
+ store_thread = ReportingStoreThread(interaction, self.storage)
+ store_thread.start()
except (SystemExit, KeyboardInterrupt):
self.logger.info("Shutting down")
self.shutdown()
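With this change the collector's fetch loop only dequeues interactions; each
import runs on its own ReportingStoreThread, so a slow database import no
longer blocks the transport (the unhandled-exception path still only logs, per
the TODO). The loop's contract reduces to:

    interaction = transport.fetch()
    if interaction:
        ReportingStoreThread(interaction, storage).start()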
diff --git a/src/lib/Bcfg2/Reporting/models.py b/src/lib/Bcfg2/Reporting/models.py
index 598e1c6ec..fc9523067 100644
--- a/src/lib/Bcfg2/Reporting/models.py
+++ b/src/lib/Bcfg2/Reporting/models.py
@@ -88,7 +88,7 @@ class InteractionManager(models.Manager):
Returns the ids of most recent interactions for clients as of a date.
Arguments:
- maxdate -- datetime object. Most recent date to pull. (dafault None)
+ maxdate -- datetime object. Most recent date to pull. (default None)
"""
from django.db import connection
diff --git a/src/lib/Bcfg2/Reporting/templates/base.html b/src/lib/Bcfg2/Reporting/templates/base.html
index 7f1fcba3b..0b2b7dd36 100644
--- a/src/lib/Bcfg2/Reporting/templates/base.html
+++ b/src/lib/Bcfg2/Reporting/templates/base.html
@@ -93,7 +93,7 @@ This is needed for Django versions less than 1.5
<div style='clear:both'></div>
</div><!-- document -->
<div id="footer">
- <span>Bcfg2 Version 1.3.1</span>
+ <span>Bcfg2 Version 1.3.2</span>
</div>
<div id="calendar_div" style='position:absolute; visibility:hidden; background-color:white; layer-background-color:white;'></div>
diff --git a/src/lib/Bcfg2/SSLServer.py b/src/lib/Bcfg2/SSLServer.py
index 316c2f86c..ab7e56f33 100644
--- a/src/lib/Bcfg2/SSLServer.py
+++ b/src/lib/Bcfg2/SSLServer.py
@@ -5,7 +5,6 @@ better. """
import os
import sys
import socket
-import select
import signal
import logging
import ssl
@@ -183,7 +182,6 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
Adds support for HTTP authentication.
"""
-
logger = logging.getLogger("Bcfg2.SSLServer.XMLRPCRequestHandler")
def authenticate(self):
@@ -228,22 +226,22 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
return False
return True
- ### need to override do_POST here
def do_POST(self):
try:
max_chunk_size = 10 * 1024 * 1024
size_remaining = int(self.headers["content-length"])
L = []
while size_remaining:
- try:
- select.select([self.rfile.fileno()], [], [], 3)
- except select.error:
- print("got select timeout")
- raise
chunk_size = min(size_remaining, max_chunk_size)
- L.append(self.rfile.read(chunk_size).decode('utf-8'))
+ chunk = self.rfile.read(chunk_size).decode('utf-8')
+ if not chunk:
+ break
+ L.append(chunk)
size_remaining -= len(L[-1])
data = ''.join(L)
+ if data is None:
+ return # response has been sent
+
response = self.server._marshaled_dispatch(self.client_address,
data)
if sys.hexversion >= 0x03000000:
@@ -251,6 +249,7 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
except: # pylint: disable=W0702
try:
self.send_response(500)
+ self.send_header("Content-length", "0")
self.end_headers()
except:
(etype, msg) = sys.exc_info()[:2]
@@ -306,14 +305,11 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
def finish(self):
# shut down the connection
- if not self.wfile.closed:
- try:
- self.wfile.flush()
- self.wfile.close()
- except socket.error:
- err = sys.exc_info()[1]
- self.logger.warning("Error closing connection: %s" % err)
- self.rfile.close()
+ try:
+ SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.finish(self)
+ except socket.error:
+ err = sys.exc_info()[1]
+ self.logger.warning("Error closing connection: %s" % err)
class XMLRPCServer(SocketServer.ThreadingMixIn, SSLServer,
@@ -431,8 +427,6 @@ class XMLRPCServer(SocketServer.ThreadingMixIn, SSLServer,
self.handle_request()
except socket.timeout:
pass
- except select.error:
- pass
except:
self.logger.error("Got unexpected error in handle_request",
exc_info=1)
diff --git a/src/lib/Bcfg2/Server/Admin/Compare.py b/src/lib/Bcfg2/Server/Admin/Compare.py
index e3648a6d0..d7285284a 100644
--- a/src/lib/Bcfg2/Server/Admin/Compare.py
+++ b/src/lib/Bcfg2/Server/Admin/Compare.py
@@ -115,7 +115,6 @@ class Compare(Bcfg2.Server.Admin.Mode):
return identical
def __call__(self, args):
- Bcfg2.Server.Admin.Mode.__call__(self, args)
if len(args) == 0:
self.errExit("No argument specified.\n"
"Please see bcfg2-admin compare help for usage.")
diff --git a/src/lib/Bcfg2/Server/Admin/Init.py b/src/lib/Bcfg2/Server/Admin/Init.py
index 6175d8ed0..153d7bea6 100644
--- a/src/lib/Bcfg2/Server/Admin/Init.py
+++ b/src/lib/Bcfg2/Server/Admin/Init.py
@@ -20,6 +20,8 @@ from Bcfg2.Compat import input # pylint: disable=W0622
CONFIG = '''[server]
repository = %s
plugins = %s
+# Uncomment the following to listen on all interfaces
+#listen_all = true
[statistics]
sendmailpath = %s
@@ -31,7 +33,7 @@ sendmailpath = %s
# 'postgresql', 'mysql', 'mysql_old', 'sqlite3' or 'ado_mssql'.
#name =
# Or path to database file if using sqlite3.
-#<repository>/bcfg2.sqlite is default path if left empty
+#<repository>/etc/bcfg2.sqlite is default path if left empty
#user =
# Not used with sqlite3.
#password =
@@ -78,7 +80,7 @@ CLIENTS = '''<Clients version="3.0">
'''
# Mapping of operating system names to groups
-OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/Centos', 'redhat'),
+OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/CentOS', 'redhat'),
('SUSE/SLES', 'suse'),
('Mandrake', 'mandrake'),
('Debian', 'debian'),
@@ -234,8 +236,9 @@ class Init(Bcfg2.Server.Admin.Mode):
def _prompt_server(self):
"""Ask for the server name."""
- newserver = safe_input("Input the server location [%s]: " %
- self.data['server_uri'])
+ newserver = safe_input(
+ "Input the server location (the server listens on a single "
+ "interface by default) [%s]: " % self.data['server_uri'])
if newserver != '':
self.data['server_uri'] = newserver
diff --git a/src/lib/Bcfg2/Server/Admin/Snapshots.py b/src/lib/Bcfg2/Server/Admin/Snapshots.py
index c2d279391..fcb240352 100644
--- a/src/lib/Bcfg2/Server/Admin/Snapshots.py
+++ b/src/lib/Bcfg2/Server/Admin/Snapshots.py
@@ -27,7 +27,6 @@ class Snapshots(Bcfg2.Server.Admin.Mode):
self.cfile = self.configfile
def __call__(self, args):
- Bcfg2.Server.Admin.Mode.__call__(self, args)
if len(args) == 0 or args[0] == '-h':
print(self.__usage__)
raise SystemExit(0)
diff --git a/src/lib/Bcfg2/Server/Admin/Viz.py b/src/lib/Bcfg2/Server/Admin/Viz.py
index 1d9d25f16..2cbd7eaf6 100644
--- a/src/lib/Bcfg2/Server/Admin/Viz.py
+++ b/src/lib/Bcfg2/Server/Admin/Viz.py
@@ -102,6 +102,7 @@ class Viz(Bcfg2.Server.Admin.MetadataCore):
dotpipe.stdin.write('\tcolor="lightblue";\n')
dotpipe.stdin.write('\tBundle [ shape="septagon" ];\n')
dotpipe.stdin.write('\tGroup [shape="ellipse"];\n')
+ dotpipe.stdin.write('\tGroup Category [shape="trapezium"];\n')
dotpipe.stdin.write('\tProfile [style="bold", shape="ellipse"];\n')
dotpipe.stdin.write('\tHblock [label="Host1|Host2|Host3", '
'shape="record"];\n')
diff --git a/src/lib/Bcfg2/Server/BuiltinCore.py b/src/lib/Bcfg2/Server/BuiltinCore.py
index e69a92b64..93da767c7 100644
--- a/src/lib/Bcfg2/Server/BuiltinCore.py
+++ b/src/lib/Bcfg2/Server/BuiltinCore.py
@@ -31,7 +31,8 @@ class Core(BaseCore):
daemon_args = dict(uid=self.setup['daemon_uid'],
gid=self.setup['daemon_gid'],
- umask=int(self.setup['umask'], 8))
+ umask=int(self.setup['umask'], 8),
+ detach_process=True)
if self.setup['daemon']:
daemon_args['pidfile'] = TimeoutPIDLockFile(self.setup['daemon'],
acquire_timeout=5)
@@ -109,7 +110,6 @@ class Core(BaseCore):
keyfile=self.setup['key'],
certfile=self.setup['cert'],
register=False,
- timeout=1,
ca=self.setup['ca'],
protocol=self.setup['protocol'])
except: # pylint: disable=W0702
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index ecd68e1e4..5ec1b5bce 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -200,6 +200,10 @@ class BaseCore(object):
# load plugins
Bcfg2.settings.read_config(repo=self.datastore)
+ # mapping of group name => plugin name to record where groups
+ # that are created by Connector plugins came from
+ self._dynamic_groups = dict()
+
#: Whether or not it's possible to use the Django database
#: backend for plugins that have that capability
self._database_available = False
@@ -263,6 +267,20 @@ class BaseCore(object):
#: metadata
self.metadata_cache = Cache()
+ def expire_caches_by_type(self, base_cls, key=None):
+ """ Expire caches for all
+ :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that
+ are instances of ``base_cls``.
+
+ :param base_cls: The base plugin interface class to match (see
+ :mod:`Bcfg2.Server.Plugin.interfaces`)
+ :type base_cls: type
+ :param key: The cache key to expire
+ """
+ for plugin in self.plugins_by_type(base_cls):
+ if isinstance(plugin, Bcfg2.Server.Plugin.Caching):
+ plugin.expire_cache(key)
+
def plugins_by_type(self, base_cls):
""" Return a list of loaded plugins that match the passed type.
@@ -289,11 +307,12 @@ class BaseCore(object):
self.logger.debug("Performance logging thread starting")
while not self.terminate.isSet():
self.terminate.wait(self.setup['perflog_interval'])
- for name, stats in self.get_statistics(None).items():
- self.logger.info("Performance statistics: "
- "%s min=%.06f, max=%.06f, average=%.06f, "
- "count=%d" % ((name, ) + stats))
- self.logger.debug("Performance logging thread terminated")
+ if not self.terminate.isSet():
+ for name, stats in self.get_statistics(None).items():
+ self.logger.info("Performance statistics: "
+ "%s min=%.06f, max=%.06f, average=%.06f, "
+ "count=%d" % ((name, ) + stats))
+ self.logger.info("Performance logging thread terminated")
def _file_monitor_thread(self):
""" The thread that runs the
@@ -310,11 +329,12 @@ class BaseCore(object):
else:
if not self.fam.pending():
terminate.wait(15)
+ if self.fam.pending():
+ self._update_vcs_revision()
self.fam.handle_event_set(self.lock)
except:
continue
- self._update_vcs_revision()
- self.logger.debug("File monitor thread terminated")
+ self.logger.info("File monitor thread terminated")
@track_statistics()
def _update_vcs_revision(self):
@@ -430,14 +450,14 @@ class BaseCore(object):
def shutdown(self):
""" Perform plugin and FAM shutdown tasks. """
- self.logger.debug("Shutting down core...")
+ self.logger.info("Shutting down core...")
if not self.terminate.isSet():
self.terminate.set()
self.fam.shutdown()
- self.logger.debug("FAM shut down")
+ self.logger.info("FAM shut down")
for plugin in list(self.plugins.values()):
plugin.shutdown()
- self.logger.debug("All plugins shut down")
+ self.logger.info("All plugins shut down")
@property
def metadata_cache_mode(self):
@@ -628,10 +648,10 @@ class BaseCore(object):
del entry.attrib['realname']
return ret
except:
- entry.set('name', oldname)
self.logger.error("Failed binding entry %s:%s with altsrc %s" %
- (entry.tag, entry.get('name'),
- entry.get('altsrc')))
+ (entry.tag, entry.get('realname'),
+ entry.get('name')))
+ entry.set('name', oldname)
self.logger.error("Falling back to %s:%s" %
(entry.tag, entry.get('name')))
@@ -726,7 +746,27 @@ class BaseCore(object):
if event.code2str() == 'deleted':
return
self.setup.reparse()
- self.metadata_cache.expire()
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+ def block_for_fam_events(self, handle_events=False):
+        """ Block until all fam events have been handled, optionally
+ handling events as well. (Setting ``handle_events=True`` is
+ useful for local server cores that don't spawn an event
+ handling thread.)"""
+ slept = 0
+ log_interval = 3
+ if handle_events:
+ self.fam.handle_events_in_interval(1)
+ slept += 1
+ if self.setup['fam_blocking']:
+ time.sleep(1)
+ slept += 1
+ while self.fam.pending() != 0:
+ time.sleep(1)
+ slept += 1
+ if slept % log_interval == 0:
+ self.logger.debug("Sleeping to handle FAM events...")
+ self.logger.debug("Slept %s seconds while handling FAM events" % slept)
def run(self):
""" Run the server core. This calls :func:`_daemonize`,
@@ -777,13 +817,9 @@ class BaseCore(object):
self.shutdown()
raise
- if self.setup['fam_blocking']:
- time.sleep(1)
- while self.fam.pending() != 0:
- time.sleep(1)
-
if self.debug_flag:
self.set_debug(None, self.debug_flag)
+ self.block_for_fam_events()
self._block()
def _daemonize(self):
@@ -842,17 +878,52 @@ class BaseCore(object):
imd = self.metadata_cache.get(client_name, None)
if not imd:
self.logger.debug("Building metadata for %s" % client_name)
- imd = self.metadata.get_initial_metadata(client_name)
+ try:
+ imd = self.metadata.get_initial_metadata(client_name)
+ except MetadataConsistencyError:
+ self.critical_error(
+ "Client metadata resolution error for %s: %s" %
+ (client_name, sys.exc_info()[1]))
connectors = self.plugins_by_type(Connector)
for conn in connectors:
- grps = conn.get_additional_groups(imd)
- self.metadata.merge_additional_groups(imd, grps)
+ groups = conn.get_additional_groups(imd)
+ groupnames = []
+ for group in groups:
+ if hasattr(group, "name"):
+ groupname = group.name
+ if groupname in self._dynamic_groups:
+ if self._dynamic_groups[groupname] == conn.name:
+ self.metadata.groups[groupname] = group
+ else:
+ self.logger.warning(
+ "Refusing to clobber dynamic group %s "
+ "defined by %s" %
+                                (groupname,
+                                 self._dynamic_groups[groupname]))
+ elif groupname in self.metadata.groups:
+ # not recorded as a dynamic group, but
+ # present in metadata.groups -- i.e., a
+ # static group
+ self.logger.warning(
+ "Refusing to clobber predefined group %s" %
+ groupname)
+ else:
+ self.metadata.groups[groupname] = group
+ self._dynamic_groups[groupname] = conn.name
+ groupnames.append(groupname)
+ else:
+ groupnames.append(group)
+
+ self.metadata.merge_additional_groups(imd, groupnames)
for conn in connectors:
data = conn.get_additional_data(imd)
self.metadata.merge_additional_data(imd, conn.name, data)
imd.query.by_name = self.build_metadata
if self.metadata_cache_mode in ['cautious', 'aggressive']:
self.metadata_cache[client_name] = imd
+ else:
+ self.logger.debug("Using cached metadata object for %s" %
+ client_name)
return imd
def process_statistics(self, client_name, statistics):
@@ -880,6 +951,7 @@ class BaseCore(object):
state.get('state')))
self.client_run_hook("end_statistics", meta)
+ @track_statistics()
def resolve_client(self, address, cleanup_cache=False, metadata=True):
""" Given a client address, get the client hostname and
optionally metadata.
@@ -932,15 +1004,19 @@ class BaseCore(object):
raise xmlrpclib.Fault(xmlrpclib.APPLICATION_ERROR,
"Critical failure: %s" % message)
+ def _get_rmi_objects(self):
+ """ Get a dict (name: object) of all objects that may have RMI
+ calls. Currently, that includes all plugins and the FAM. """
+ rv = {self.fam.__class__.__name__: self.fam}
+ rv.update(self.plugins)
+ return rv
+
def _get_rmi(self):
""" Get a list of RMI calls exposed by plugins """
rmi = dict()
- for pname, pinst in list(self.plugins.items()):
+ for pname, pinst in self._get_rmi_objects().items():
for mname in pinst.__rmi__:
rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
- famname = self.fam.__class__.__name__
- for mname in self.fam.__rmi__:
- rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname)
return rmi
def _resolve_exposed_method(self, method_name):
@@ -1031,6 +1107,7 @@ class BaseCore(object):
for plugin in self.plugins_by_type(Probing):
for probe in plugin.GetProbes(metadata):
resp.append(probe)
+ self.logger.debug("Sending probe list to %s" % client)
return lxml.etree.tostring(resp,
xml_declaration=False).decode('UTF-8')
except:
@@ -1056,7 +1133,7 @@ class BaseCore(object):
# that's created for RecvProbeData doesn't get cached.
# I.e., the next metadata object that's built, after probe
# data is processed, is cached.
- self.metadata_cache.expire(client)
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
try:
xpdata = lxml.etree.XML(probedata.encode('utf-8'),
parser=Bcfg2.Server.XMLParser)
@@ -1253,9 +1330,14 @@ class BaseCore(object):
self.logger.info("Core: debug = %s" % debug)
levels = self._loglevels[self.debug_flag]
for handler in logging.root.handlers:
- level = levels.get(handler.name, levels['default'])
- self.logger.debug("Setting %s log handler to %s" %
- (handler.name, logging.getLevelName(level)))
+ try:
+ level = levels.get(handler.name, levels['default'])
+ self.logger.debug("Setting %s log handler to %s" %
+ (handler.name, logging.getLevelName(level)))
+ except AttributeError:
+ level = levels['default']
+ self.logger.debug("Setting unknown log handler %s to %s" %
+ (handler, logging.getLevelName(level)))
handler.setLevel(level)
return self.debug_flag
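The new ``expire_caches_by_type`` helper generalizes cache expiry: every loaded
plugin matching the given interface class that also implements the ``Caching``
interface (see the interfaces.py hunk below) gets ``expire_cache`` called. A
usage sketch, with an illustrative hostname:

    # expire every Metadata-flavored cache for one host
    core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
                               key="client1.example.com")
    # or flush those caches entirely
    core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)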
diff --git a/src/lib/Bcfg2/Server/Lint/Validate.py b/src/lib/Bcfg2/Server/Lint/Validate.py
index 09f3f3d25..c537877a0 100644
--- a/src/lib/Bcfg2/Server/Lint/Validate.py
+++ b/src/lib/Bcfg2/Server/Lint/Validate.py
@@ -47,6 +47,7 @@ class Validate(Bcfg2.Server.Lint.ServerlessPlugin):
"Decisions/*.xml": "decisions.xsd",
"Packages/sources.xml": "packages.xsd",
"GroupPatterns/config.xml": "grouppatterns.xsd",
+ "AWSTags/config.xml": "awstags.xsd",
"NagiosGen/config.xml": "nagiosgen.xsd",
"FileProbes/config.xml": "fileprobes.xsd",
"SSLCA/**/cert.xml": "sslca-cert.xsd",
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 81fba7092..3cc308b1c 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -2,14 +2,133 @@
:mod:`Bcfg2.Server.BuiltinCore` that uses the Python
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
+
+The parent communicates with the children over
+:class:`multiprocessing.Queue` objects via a
+:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object.
+
+A method being called via the RPCQueue must be exposed by the child by
+decorating it with :func:`Bcfg2.Server.Core.exposed`.
"""
+import time
import threading
import lxml.etree
import multiprocessing
-from Bcfg2.Compat import Queue
+import Bcfg2.Server.Plugin
+from itertools import cycle
+from Bcfg2.Cache import Cache
+from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import BaseCore, exposed
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+from multiprocessing.connection import Listener, Client
+
+
+class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
+ """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
+ cache expiration events to child nodes. """
+
+ #: The method to send over the pipe to expire the cache
+ method = "expire_metadata_cache"
+
+ def __init__(self, *args, **kwargs):
+ self.rpc_q = kwargs.pop("queue")
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
+ Cache.__init__(self, *args, **kwargs)
+
+ def expire(self, key=None):
+ self.rpc_q.publish(self.method, args=[key])
+ Cache.expire(self, key=key)
+
+
+class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
+ """ An implementation of a :class:`multiprocessing.Queue` designed
+ for several additional use patterns:
+
+ * Random-access reads, based on a key that identifies the data;
+ * Publish-subscribe, where a datum is sent to all hosts.
+
+ The subscribers can deal with this as a normal Queue with no
+ special handling.
+ """
+ poll_wait = 3.0
+
+ def __init__(self):
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
+ self._terminate = threading.Event()
+ self._queues = dict()
+ self._available_listeners = Queue()
+ self._blocking_listeners = []
+
+ def add_subscriber(self, name):
+ """ Add a subscriber to the queue. This returns the
+ :class:`multiprocessing.Queue` object that the subscriber
+ should read from. """
+ self._queues[name] = multiprocessing.Queue()
+ return self._queues[name]
+
+ def publish(self, method, args=None, kwargs=None):
+ """ Publish an RPC call to the queue for consumption by all
+ subscribers. """
+ for queue in self._queues.values():
+ queue.put((None, (method, args or [], kwargs or dict())))
+
+ def rpc(self, dest, method, args=None, kwargs=None):
+ """ Make an RPC call to the named subscriber, expecting a
+ response. This opens a
+ :class:`multiprocessing.connection.Listener` and passes the
+ Listener address to the child as part of the RPC call, so that
+ the child can connect to the Listener to submit its results.
+
+ Listeners are reused when possible to minimize overhead.
+ """
+ try:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Reusing existing RPC listener at %s" %
+ listener.address)
+ except Empty:
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" %
+ listener.address)
+ self._blocking_listeners.append(listener)
+ try:
+ self._queues[dest].put((listener.address,
+ (method, args or [], kwargs or dict())))
+ conn = listener.accept()
+ self._blocking_listeners.remove(listener)
+ try:
+ while not self._terminate.is_set():
+ if conn.poll(self.poll_wait):
+ return conn.recv()
+ finally:
+ conn.close()
+ finally:
+ self._available_listeners.put(listener)
+
+ def close(self):
+ """ Close queues and connections. """
+ self._terminate.set()
+ self.logger.debug("Closing RPC queues")
+ for name, queue in self._queues.items():
+ self.logger.debug("Closing RPC queue to %s" % name)
+ queue.close()
+
+ # close any listeners that are waiting for connections
+ self.logger.debug("Closing RPC connections")
+ for listener in self._blocking_listeners:
+ self.logger.debug("Closing RPC connection at %s" %
+ listener.address)
+ listener.close()
+
+ self.logger.debug("Closing RPC listeners")
+ try:
+ while True:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Closing RPC listener at %s" %
+ listener.address)
+ listener.close()
+ except Empty:
+ pass
class DualEvent(object):
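The RPCQueue above supports both delivery patterns its docstring names: fan-out
publishes with no reply, and point-to-point ``rpc`` calls that round-trip a
result over a multiprocessing Listener. A minimal sketch using names from this
diff:

    queue = RPCQueue()
    child_q = queue.add_subscriber("Child-0")  # Queue handed to the ChildCore
    queue.publish("expire_metadata_cache", args=[None])   # all children
    config = queue.rpc("Child-0", "GetConfig", args=["client1"])  # one child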
@@ -60,68 +179,153 @@ class ChildCore(BaseCore):
those, though, if the pipe communication "protocol" were made more
robust. """
- #: How long to wait while polling for new clients to build. This
- #: doesn't affect the speed with which a client is built, but
+ #: How long to wait while polling for new RPC commands. This
+ #: doesn't affect the speed with which a command is processed, but
#: setting it too high will result in longer shutdown times, since
#: we only check for the termination event from the main process
#: every ``poll_wait`` seconds.
- poll_wait = 5.0
+ poll_wait = 3.0
- def __init__(self, setup, pipe, terminate):
+ def __init__(self, name, setup, rpc_q, terminate):
"""
+ :param name: The name of this child
+ :type name: string
:param setup: A Bcfg2 options dict
:type setup: Bcfg2.Options.OptionParser
- :param pipe: The pipe to which client hostnames are added for
- ChildCore objects to build configurations, and to
- which client configurations are added after
- having been built by ChildCore objects.
- :type pipe: multiprocessing.Pipe
+        :param rpc_q: The queue the child will read from for RPC
+                      communications from the parent process; results
+                      are returned over a connection to the address
+                      sent with each call.
+        :type rpc_q: multiprocessing.Queue
:param terminate: An event that flags ChildCore objects to shut
themselves down.
:type terminate: multiprocessing.Event
"""
BaseCore.__init__(self, setup)
- #: The pipe to which client hostnames are added for ChildCore
- #: objects to build configurations, and to which client
- #: configurations are added after having been built by
- #: ChildCore objects.
- self.pipe = pipe
+ #: The name of this child
+ self.name = name
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
- def _daemonize(self):
- return True
+ #: The queue used for RPC communication
+ self.rpc_q = rpc_q
+
+ # override this setting so that the child doesn't try to write
+ # the pidfile
+ self.setup['daemon'] = False
+
+ # ensure that the child doesn't start a perflog thread
+ self.perflog_thread = None
+
+ self._rmi = dict()
def _run(self):
return True
+ def _daemonize(self):
+ return True
+
+ def _dispatch(self, address, data):
+ """ Method dispatcher used for commands received from
+ the RPC queue. """
+ if address is not None:
+            # if the address is None, then no response is expected. we
+ # make the return connection before dispatching the actual
+ # RPC call so that the parent is blocking for a connection
+ # as briefly as possible
+ self.logger.debug("Connecting to parent via %s" % address)
+ client = Client(address)
+ method, args, kwargs = data
+ func = None
+ rv = None
+ if "." in method:
+ if method in self._rmi:
+ func = self._rmi[method]
+ else:
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ elif not hasattr(self, method):
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ else: # method is not a plugin RMI, and exists
+ func = getattr(self, method)
+ if not func.exposed:
+ self.logger.error("%s: Method %s is not exposed" % (self.name,
+ method))
+ func = None
+ if func is not None:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ rv = func(*args, **kwargs)
+ if address is not None:
+            # if the address is None, then no response is expected
+ self.logger.debug("Returning data to parent via %s" % address)
+ client.send(rv)
+
def _block(self):
- while not self.terminate.isSet():
+ self._rmi = self._get_rmi()
+ while not self.terminate.is_set():
try:
- if self.pipe.poll(self.poll_wait):
- if not self.metadata.use_database:
- # handle FAM events, in case (for instance) the
- # client has just been added to clients.xml, or a
- # profile has just been asserted. but really, you
- # should be using the metadata database if you're
- # using this core.
- self.fam.handle_events_in_interval(0.1)
- client = self.pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- config = \
- lxml.etree.tostring(self.BuildConfiguration(client))
- self.logger.debug("Returning configuration for %s to main "
- "process" % client)
- self.pipe.send(config)
- self.logger.debug("Returned configuration for %s to main "
- "process" % client)
+ address, data = self.rpc_q.get(timeout=self.poll_wait)
+ threadname = "-".join(str(i) for i in data)
+ rpc_thread = threading.Thread(name=threadname,
+ target=self._dispatch,
+ args=[address, data])
+ rpc_thread.start()
+ except Empty:
+ pass
except KeyboardInterrupt:
break
self.shutdown()
+ def shutdown(self):
+ BaseCore.shutdown(self)
+ self.logger.info("%s: Closing RPC command queue" % self.name)
+ self.rpc_q.close()
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("%s: Waiting for %d thread(s): %s" %
+ (self.name, len(threads),
+ [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("%s: All threads stopped" % self.name)
+
+ def _get_rmi(self):
+ rmi = dict()
+ for pname, pinst in self._get_rmi_objects().items():
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ mname = crmi[1]
+ else:
+ mname = crmi
+ rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
+ return rmi
+
+ @exposed
+ def expire_metadata_cache(self, client=None):
+ """ Expire the metadata cache for a client """
+ self.metadata_cache.expire(client)
+
+ @exposed
+ def RecvProbeData(self, address, _):
+ """ Expire the probe cache for a client """
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
+ key=self.resolve_client(address,
+ metadata=False)[0])
+
+ @exposed
+ def GetConfig(self, client):
+ """ Render the configuration for a client """
+ self.logger.debug("%s: Building configuration for %s" %
+ (self.name, client))
+ return lxml.etree.tostring(self.BuildConfiguration(client))
+
class Core(BuiltinCore):
""" A multiprocessing core that delegates building the actual
@@ -140,65 +344,163 @@ class Core(BuiltinCore):
if setup['children'] is None:
setup['children'] = multiprocessing.cpu_count()
- #: A dict of child name -> one end of the
- #: :class:`multiprocessing.Pipe` object used to communicate
- #: with that child. (The child is given the other end of the
- #: Pipe.)
- self.pipes = dict()
-
- #: A queue that keeps track of which children are available to
- #: render a configuration. A child is popped from the queue
- #: when it starts to render a config, then it's pushed back on
- #: when it's done. This lets us use a blocking call to
- #: :func:`Queue.Queue.get` when waiting for an available
- #: child.
- self.available_children = Queue(maxsize=self.setup['children'])
-
- # sigh. multiprocessing was added in py2.6, which is when the
- # camelCase methods for threading objects were deprecated in
- # favor of the Pythonic under_score methods. So
- # multiprocessing.Event *only* has is_set(), while
- # threading.Event has *both* isSet() and is_set(). In order
- # to make the core work with Python 2.4+, and with both
- # multiprocessing and threading Event objects, we just
- # monkeypatch self.terminate to have isSet().
+ #: The flag that indicates when to stop child threads and
+ #: processes
self.terminate = DualEvent(threading_event=self.terminate)
+ #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object
+ #: used to send or publish commands to children.
+ self.rpc_q = RPCQueue()
+
+ self.metadata_cache = DispatchingCache(queue=self.rpc_q)
+
+ #: A list of children that will be cycled through
+ self._all_children = []
+
+ #: An iterator that each child will be taken from in sequence,
+ #: to provide a round-robin distribution of render requests
+ self.children = None
+
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
- (mainpipe, childpipe) = multiprocessing.Pipe()
- self.pipes[name] = mainpipe
+
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(self.setup, childpipe, self.terminate)
+ child_q = self.rpc_q.add_subscriber(name)
+ childcore = ChildCore(name, self.setup, child_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
child.pid))
- self.available_children.put(name)
+ self._all_children.append(name)
+ self.logger.debug("Started %s children: %s" % (len(self._all_children),
+ self._all_children))
+ self.children = cycle(self._all_children)
return BuiltinCore._run(self)
def shutdown(self):
BuiltinCore.shutdown(self)
- for child in multiprocessing.active_children():
- self.logger.debug("Shutting down child %s" % child.name)
- child.join(self.shutdown_timeout)
- if child.is_alive():
+ self.logger.info("Closing RPC command queues")
+ self.rpc_q.close()
+
+ def term_children():
+ """ Terminate all remaining multiprocessing children. """
+ for child in multiprocessing.active_children():
self.logger.error("Waited %s seconds to shut down %s, "
"terminating" % (self.shutdown_timeout,
child.name))
child.terminate()
- else:
- self.logger.debug("Child %s shut down" % child.name)
- self.logger.debug("All children shut down")
+
+ timer = threading.Timer(self.shutdown_timeout, term_children)
+ timer.start()
+ while len(multiprocessing.active_children()):
+ self.logger.info("Waiting for %s child(ren): %s" %
+ (len(multiprocessing.active_children()),
+ [c.name
+ for c in multiprocessing.active_children()]))
+ time.sleep(1)
+ timer.cancel()
+ self.logger.info("All children shut down")
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("Waiting for %s thread(s): %s" %
+ (len(threads), [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("Shutdown complete")
+
+ def _get_rmi(self):
+ child_rmi = dict()
+ for pname, pinst in self._get_rmi_objects().items():
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ parentname, childname = crmi
+ else:
+ parentname = childname = crmi
+ child_rmi["%s.%s" % (pname, parentname)] = \
+ "%s.%s" % (pname, childname)
+
+ rmi = BuiltinCore._get_rmi(self)
+ for method in rmi.keys():
+ if method in child_rmi:
+ rmi[method] = self._child_rmi_wrapper(method,
+ rmi[method],
+ child_rmi[method])
+ return rmi
+
+ def _child_rmi_wrapper(self, method, parent_rmi, child_rmi):
+ """ Returns a callable that dispatches a call to the given
+ child RMI to child processes, and calls the parent RMI locally
+ (i.e., in the parent process). """
+ @wraps(parent_rmi)
+ def inner(*args, **kwargs):
+ """ Function that dispatches an RMI call to child
+ processes and to the (original) parent function. """
+ self.logger.debug("Dispatching RMI call to %s to children: %s" %
+ (method, child_rmi))
+ self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs)
+ return parent_rmi(*args, **kwargs)
+
+ return inner
+
+ @exposed
+ def set_debug(self, address, debug):
+ self.rpc_q.set_debug(debug)
+ self.rpc_q.publish("set_debug", args=[address, debug])
+ self.metadata_cache.set_debug(debug)
+ return BuiltinCore.set_debug(self, address, debug)
+
+ @exposed
+ def RecvProbeData(self, address, probedata):
+ rv = BuiltinCore.RecvProbeData(self, address, probedata)
+ # we don't want the children to actually process probe data,
+ # so we don't send the data, just the fact that we got some.
+ self.rpc_q.publish("RecvProbeData", args=[address, None])
+ return rv
@exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
- childname = self.available_children.get()
- self.logger.debug("Building configuration on child %s" % childname)
- pipe = self.pipes[childname]
- pipe.send(client)
- config = pipe.recv()
- self.available_children.put_nowait(childname)
- return config
+ childname = self.children.next()
+ self.logger.debug("Building configuration for %s on %s" % (client,
+ childname))
+ return self.rpc_q.rpc(childname, "GetConfig", args=[client])
+
+ @exposed
+ def get_statistics(self, address):
+ stats = dict()
+
+ def _aggregate_statistics(newstats, prefix=None):
+ """ Aggregate a set of statistics from a child or parent
+ server core. This adds the statistics to the overall
+ statistics dict (optionally prepending a prefix, such as
+ "Child-1", to uniquely identify this set of statistics),
+ and aggregates it with the set of running totals that are
+ kept from all cores. """
+ for statname, vals in newstats.items():
+ if statname.startswith("ChildCore:"):
+ statname = statname[5:]
+ if prefix:
+ prettyname = "%s:%s" % (prefix, statname)
+ else:
+ prettyname = statname
+ stats[prettyname] = vals
+ totalname = "Total:%s" % statname
+ if totalname not in stats:
+ stats[totalname] = vals
+ else:
+ newmin = min(stats[totalname][0], vals[0])
+ newmax = max(stats[totalname][1], vals[1])
+ newcount = stats[totalname][3] + vals[3]
+ newmean = ((stats[totalname][2] * stats[totalname][3]) +
+ (vals[2] * vals[3])) / newcount
+ stats[totalname] = (newmin, newmax, newmean, newcount)
+
+ stats = dict()
+ for childname in self._all_children:
+ _aggregate_statistics(
+ self.rpc_q.rpc(childname, "get_statistics", args=[address]),
+ prefix=childname)
+ _aggregate_statistics(BuiltinCore.get_statistics(self, address))
+ return stats
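The ``Total:`` aggregation above keeps a running (min, max, mean, count) tuple
per statistic, where the combined mean is count-weighted. For two cores
reporting the same statistic:

    a = (0.10, 2.0, 0.5, 10)   # (min, max, mean, count) from Child-0
    b = (0.05, 3.0, 1.0, 30)   # from the parent core
    count = a[3] + b[3]                          # 40
    mean = (a[2] * a[3] + b[2] * b[3]) / count   # 0.875
    total = (min(a[0], b[0]), max(a[1], b[1]), mean, count)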
diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py
index c825a57b5..03feceb6f 100644
--- a/src/lib/Bcfg2/Server/Plugin/base.py
+++ b/src/lib/Bcfg2/Server/Plugin/base.py
@@ -12,6 +12,10 @@ class Debuggable(object):
#: List of names of methods to be exposed as XML-RPC functions
__rmi__ = ['toggle_debug', 'set_debug']
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes.
+ __child_rmi__ = __rmi__[:]
+
def __init__(self, name=None):
"""
:param name: The name of the logger object to get. If none is
@@ -34,9 +38,6 @@ class Debuggable(object):
:returns: bool - The new value of the debug flag
"""
self.debug_flag = debug
- self.debug_log("%s: debug = %s" % (self.__class__.__name__,
- self.debug_flag),
- flag=True)
return debug
def toggle_debug(self):
@@ -94,6 +95,20 @@ class Plugin(Debuggable):
#: List of names of methods to be exposed as XML-RPC functions
__rmi__ = Debuggable.__rmi__
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes, if :mod:`Bcfg2.Server.MultiprocessingCore` is in
+ #: use. Items ``__child_rmi__`` can either be strings (in which
+ #: case the same function is called on child processes as on the
+ #: parent) or 2-tuples, in which case the first element is the
+ #: name of the RPC function called on the parent process, and the
+ #: second element is the name of the function to call on child
+ #: processes. Functions that are not listed in the list will not
+ #: be dispatched to child processes, i.e., they will only be
+ #: called on the parent. A function must be listed in ``__rmi__``
+    #: in order to be exposed; functions listed in ``__child_rmi__``
+ #: but not ``__rmi__`` will be ignored.
+ __child_rmi__ = Debuggable.__child_rmi__
+
def __init__(self, core, datastore):
"""
:param core: The Bcfg2.Server.Core initializing the plugin
@@ -136,6 +151,8 @@ class Plugin(Debuggable):
self.running = False
def set_debug(self, debug):
+ self.debug_log("%s: debug = %s" % (self.name, self.debug_flag),
+ flag=True)
for entry in self.Entries.values():
if isinstance(entry, Debuggable):
entry.set_debug(debug)
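As the new docstring describes, a plugin opts its RMI calls into child-process
dispatch via ``__child_rmi__``. A hypothetical plugin exposing one call
verbatim and one under a different child-side name might look like:

    class Frobnicator(Bcfg2.Server.Plugin.Plugin):
        # hypothetical plugin; both lists extend the Plugin defaults
        __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + \
            ['expire_cache', 'rebuild']
        __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \
            ['expire_cache',             # same name on parent and children
             ('rebuild', 'clear_data')]  # parent rebuild() -> child clear_data()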
diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py
index 81dc1d736..d9e208746 100644
--- a/src/lib/Bcfg2/Server/Plugin/helpers.py
+++ b/src/lib/Bcfg2/Server/Plugin/helpers.py
@@ -555,16 +555,12 @@ class XMLFileBacked(FileBacked):
xdata = self.xdata.getroottree()
else:
xdata = lxml.etree.parse(fname)
- included = [el for el in xdata.findall('//' + xinclude)]
- for el in included:
+ for el in xdata.findall('//' + xinclude):
name = el.get("href")
if name.startswith("/"):
fpath = name
else:
- if fname:
- rel = fname
- else:
- rel = self.name
+ rel = fname or self.name
fpath = os.path.join(os.path.dirname(rel), name)
# expand globs in xinclude, a bcfg2-specific extension
@@ -579,12 +575,13 @@ class XMLFileBacked(FileBacked):
parent = el.getparent()
parent.remove(el)
for extra in extras:
- if extra != self.name and extra not in self.extras:
- self.extras.append(extra)
+ if extra != self.name:
lxml.etree.SubElement(parent, xinclude, href=extra)
- self._follow_xincludes(fname=extra)
- if extra not in self.extra_monitors:
- self.add_monitor(extra)
+ if extra not in self.extras:
+ self.extras.append(extra)
+ self._follow_xincludes(fname=extra)
+ if extra not in self.extra_monitors:
+ self.add_monitor(extra)
def Index(self):
self.xdata = lxml.etree.XML(self.data, base_url=self.name,
@@ -606,15 +603,16 @@ class XMLFileBacked(FileBacked):
def add_monitor(self, fpath):
""" Add a FAM monitor to a file that has been XIncluded. This
- is only done if the constructor got both a ``fam`` object and
- ``should_monitor`` set to True.
+ is only done if the constructor got a ``fam`` object,
+ regardless of whether ``should_monitor`` is set to True (i.e.,
+ whether or not the base file is monitored).
:param fpath: The full path to the file to monitor
:type fpath: string
:returns: None
"""
self.extra_monitors.append(fpath)
- if self.fam and self.should_monitor:
+ if self.fam:
self.fam.AddMonitor(fpath, self)
def __iter__(self):
diff --git a/src/lib/Bcfg2/Server/Plugin/interfaces.py b/src/lib/Bcfg2/Server/Plugin/interfaces.py
index 222b94fe3..33f6d338c 100644
--- a/src/lib/Bcfg2/Server/Plugin/interfaces.py
+++ b/src/lib/Bcfg2/Server/Plugin/interfaces.py
@@ -220,10 +220,32 @@ class Connector(object):
def get_additional_groups(self, metadata): # pylint: disable=W0613
""" Return a list of additional groups for the given client.
+ Each group can be either the name of a group (a string), or a
+ :class:`Bcfg2.Server.Plugins.Metadata.MetadataGroup` object
+ that defines other data besides just the name. Note that you
+ cannot return a
+ :class:`Bcfg2.Server.Plugins.Metadata.MetadataGroup` object
+ that clobbers a group defined by another plugin; the original
+ group will be used instead. For instance, assume the
+ following in ``Metadata/groups.xml``:
+
+ .. code-block:: xml
+
+ <Groups>
+ ...
+ <Group name="foo" public="false"/>
+ </Groups>
+
+ You could not subsequently return a
+ :class:`Bcfg2.Server.Plugins.Metadata.MetadataGroup` object
+ with ``public=True``; a warning would be issued, and the
+ original (non-public) ``foo`` group would be used.
:param metadata: The client metadata
:type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :return: list of strings
+ :return: list of strings or
+ :class:`Bcfg2.Server.Plugins.Metadata.MetadataGroup`
+ objects.
"""
return list()
@@ -598,3 +620,22 @@ class ClientRunHooks(object):
:returns: None
"""
pass
+
+
+class Caching(object):
+ """ A plugin that caches more than just the data received from the
+ FAM. This presents a unified interface to clear the cache. """
+
+ def expire_cache(self, key=None):
+ """ Expire the cache associated with the given key.
+
+        :param key: The key to expire the cache for.  Because cache
+                    implementations vary tremendously between plugins,
+                    this could be any number of things, but it is
+                    generally a hostname.  It may or may not be
+                    possible to expire the cache for a single host;
+                    this interface does not guarantee that.
+ :type key: varies
+ :returns: None
+ """
+ raise NotImplementedError
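A minimal sketch of a conforming implementation, assuming a simple
dict-based cache keyed on hostname (hypothetical code):

.. code-block:: python

    class ExampleCachingPlugin(Plugin, Caching):
        def __init__(self, core, datastore):
            Plugin.__init__(self, core, datastore)
            Caching.__init__(self)
            self._cache = dict()   # hostname -> cached data

        def expire_cache(self, key=None):
            if key is None:
                self._cache.clear()         # flush everything
            else:
                self._cache.pop(key, None)  # flush a single host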
diff --git a/src/lib/Bcfg2/Server/Plugins/AWSTags.py b/src/lib/Bcfg2/Server/Plugins/AWSTags.py
new file mode 100644
index 000000000..147f37fbf
--- /dev/null
+++ b/src/lib/Bcfg2/Server/Plugins/AWSTags.py
@@ -0,0 +1,217 @@
+""" Query tags from AWS via boto, optionally setting group membership """
+
+import os
+import re
+import sys
+import Bcfg2.Server.Lint
+import Bcfg2.Server.Plugin
+from boto import connect_ec2
+from Bcfg2.Cache import Cache
+from Bcfg2.Compat import ConfigParser
+
+
+class NoInstanceFound(Exception):
+ """ Raised when there's no AWS instance for a given hostname """
+
+
+class AWSTagPattern(object):
+ """ Handler for a single Tag entry """
+
+ def __init__(self, name, value, groups):
+ self.name = re.compile(name)
+ if value is not None:
+ self.value = re.compile(value)
+ else:
+ self.value = value
+ self.groups = groups
+
+ def get_groups(self, tags):
+ """ Get groups that apply to the given tag set """
+ for key, value in tags.items():
+ name_match = self.name.search(key)
+ if name_match:
+ if self.value is not None:
+ value_match = self.value.search(value)
+ if value_match:
+ return self._munge_groups(value_match)
+ else:
+ return self._munge_groups(name_match)
+ break
+ return []
+
+ def _munge_groups(self, match):
+ """ Replace backreferences (``$1``, ``$2``) in Group tags with
+        the corresponding groups matched by the regex. """
+ rv = []
+ sub = match.groups()
+ for group in self.groups:
+ newg = group
+ for idx in range(len(sub)):
+ newg = newg.replace('$%s' % (idx + 1), sub[idx])
+ rv.append(newg)
+ return rv
+
+ def __str__(self):
+ if self.value:
+ return "%s: %s=%s: %s" % (self.__class__.__name__, self.name,
+ self.value, self.groups)
+ else:
+ return "%s: %s: %s" % (self.__class__.__name__, self.name,
+ self.groups)
+
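A quick illustration of the backreference expansion (hypothetical tag
and group names):

.. code-block:: python

    pat = AWSTagPattern(name=r'^role$', value=r'^(\w+)-(\w+)$',
                        groups=['$1', 'aws-$2'])
    print(pat.get_groups({'role': 'web-prod'}))
    # -> ['web', 'aws-prod']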
+
+class PatternFile(Bcfg2.Server.Plugin.XMLFileBacked):
+ """ representation of AWSTags config.xml """
+ __identifier__ = None
+ create = 'AWSTags'
+
+ def __init__(self, filename, core=None):
+ try:
+ fam = core.fam
+ except AttributeError:
+ fam = None
+ Bcfg2.Server.Plugin.XMLFileBacked.__init__(self, filename, fam=fam,
+ should_monitor=True)
+ self.core = core
+ self.tags = []
+
+ def Index(self):
+ Bcfg2.Server.Plugin.XMLFileBacked.Index(self)
+ if (self.core and
+ self.core.metadata_cache_mode in ['cautious', 'aggressive']):
+ self.core.metadata_cache.expire()
+ self.tags = []
+ for entry in self.xdata.xpath('//Tag'):
+ try:
+ groups = [g.text for g in entry.findall('Group')]
+ self.tags.append(AWSTagPattern(entry.get("name"),
+ entry.get("value"),
+ groups))
+ except: # pylint: disable=W0702
+ self.logger.error("AWSTags: Failed to initialize pattern %s: "
+ "%s" % (entry.get("name"),
+ sys.exc_info()[1]))
+
+ def get_groups(self, hostname, tags):
+ """ return a list of groups that should be added to the given
+ client based on patterns that match the hostname """
+ ret = []
+ for pattern in self.tags:
+ try:
+ ret.extend(pattern.get_groups(tags))
+ except: # pylint: disable=W0702
+ self.logger.error("AWSTags: Failed to process pattern %s for "
+ "%s" % (pattern, hostname),
+ exc_info=1)
+ return ret
+
+
+class AWSTags(Bcfg2.Server.Plugin.Plugin,
+ Bcfg2.Server.Plugin.Caching,
+ Bcfg2.Server.Plugin.ClientRunHooks,
+ Bcfg2.Server.Plugin.Connector):
+ """ Query tags from AWS via boto, optionally setting group membership """
+ __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['expire_cache']
+
+ def __init__(self, core, datastore):
+ Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
+ Bcfg2.Server.Plugin.ClientRunHooks.__init__(self)
+ Bcfg2.Server.Plugin.Connector.__init__(self)
+ try:
+ key_id = self.core.setup.cfp.get("awstags", "access_key_id")
+ secret_key = self.core.setup.cfp.get("awstags",
+ "secret_access_key")
+ except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+ err = sys.exc_info()[1]
+ raise Bcfg2.Server.Plugin.PluginInitError(
+ "AWSTags is not configured in bcfg2.conf: %s" % err)
+ self.debug_log("%s: Connecting to EC2" % self.name)
+ self._ec2 = connect_ec2(aws_access_key_id=key_id,
+ aws_secret_access_key=secret_key)
+ self._tagcache = Cache()
+ try:
+ self._keep_cache = self.core.setup.cfp.getboolean("awstags",
+ "cache")
+ except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+ self._keep_cache = True
+
+ self.config = PatternFile(os.path.join(self.data, 'config.xml'),
+ core=core)
+
+ def _load_instance(self, hostname):
+ """ Load an instance from EC2 whose private DNS name matches
+ the given hostname """
+ self.debug_log("AWSTags: Loading instance with private-dns-name=%s" %
+ hostname)
+ filters = {'private-dns-name': hostname}
+ reservations = self._ec2.get_all_instances(filters=filters)
+ if reservations:
+ res = reservations[0]
+ if res.instances:
+ return res.instances[0]
+ raise NoInstanceFound(
+ "AWSTags: No instance found with private-dns-name=%s" %
+ hostname)
+
+ def _get_tags_from_ec2(self, hostname):
+ """ Get tags for the given host from EC2. This does not use
+ the local caching layer. """
+ self.debug_log("AWSTags: Getting tags for %s from AWS" %
+ hostname)
+ try:
+ return self._load_instance(hostname).tags
+ except NoInstanceFound:
+ self.debug_log(sys.exc_info()[1])
+ return dict()
+
+ def get_tags(self, metadata):
+ """ Get tags for the given host. This caches the tags locally
+ if 'cache' in the ``[awstags]`` section of ``bcfg2.conf`` is
+ true. """
+ if not self._keep_cache:
+            return self._get_tags_from_ec2(metadata.hostname)
+
+ if metadata.hostname not in self._tagcache:
+ self._tagcache[metadata.hostname] = \
+ self._get_tags_from_ec2(metadata.hostname)
+ return self._tagcache[metadata.hostname]
+
+ def expire_cache(self, key=None):
+ self._tagcache.expire(key=key)
+
+ def start_client_run(self, metadata):
+ self.expire_cache(key=metadata.hostname)
+
+ def get_additional_data(self, metadata):
+ return self.get_tags(metadata)
+
+ def get_additional_groups(self, metadata):
+ return self.config.get_groups(metadata.hostname,
+ self.get_tags(metadata))
+
+
+class AWSTagsLint(Bcfg2.Server.Lint.ServerPlugin):
+ """ ``bcfg2-lint`` plugin to check all given :ref:`AWSTags
+ <server-plugins-connectors-awstags>` patterns for validity. """
+
+ def Run(self):
+ cfg = self.core.plugins['AWSTags'].config
+ for entry in cfg.xdata.xpath('//Tag'):
+ self.check(entry, "name")
+ if entry.get("value"):
+ self.check(entry, "value")
+
+ @classmethod
+ def Errors(cls):
+ return {"pattern-fails-to-initialize": "error"}
+
+ def check(self, entry, attr):
+ """ Check a single attribute (``name`` or ``value``) of a
+ single entry for validity. """
+ try:
+ re.compile(entry.get(attr))
+ except re.error:
+ self.LintError("pattern-fails-to-initialize",
+ "'%s' regex could not be compiled: %s\n %s" %
+ (attr, sys.exc_info()[1], entry.get("name")))
diff --git a/src/lib/Bcfg2/Server/Plugins/Bundler.py b/src/lib/Bcfg2/Server/Plugins/Bundler.py
index eef176cca..fb327f7ef 100644
--- a/src/lib/Bcfg2/Server/Plugins/Bundler.py
+++ b/src/lib/Bcfg2/Server/Plugins/Bundler.py
@@ -38,9 +38,9 @@ if HAS_GENSHI:
Bcfg2.Server.Plugin.StructFile):
""" Representation of a Genshi-templated bundle XML file """
- def __init__(self, name, specific, encoding):
+ def __init__(self, name, specific, encoding, fam=None):
TemplateFile.__init__(self, name, specific, encoding)
- Bcfg2.Server.Plugin.StructFile.__init__(self, name)
+ Bcfg2.Server.Plugin.StructFile.__init__(self, name, fam=fam)
self.logger = logging.getLogger(name)
def get_xml_value(self, metadata):
@@ -106,13 +106,14 @@ class Bundler(Bcfg2.Server.Plugin.Plugin,
nsmap['py'] == 'http://genshi.edgewall.org/')):
if HAS_GENSHI:
spec = Bcfg2.Server.Plugin.Specificity()
- return BundleTemplateFile(name, spec, self.encoding)
+ return BundleTemplateFile(name, spec, self.encoding,
+ fam=self.core.fam)
else:
raise Bcfg2.Server.Plugin.PluginExecutionError("Genshi not "
"available: %s"
% name)
else:
- return BundleFile(name, self.fam)
+ return BundleFile(name, fam=self.fam)
def BuildStructures(self, metadata):
"""Build all structures for client (metadata)."""
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgAuthorizedKeysGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgAuthorizedKeysGenerator.py
index 824d01023..41d5588e4 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgAuthorizedKeysGenerator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgAuthorizedKeysGenerator.py
@@ -50,27 +50,36 @@ class CfgAuthorizedKeysGenerator(CfgGenerator, StructFile):
spec = self.XMLMatch(metadata)
rv = []
for allow in spec.findall("Allow"):
- params = ''
+ options = []
if allow.find("Params") is not None:
- params = ",".join("=".join(p)
- for p in allow.find("Params").attrib.items())
+ self.logger.warning("Use of <Params> in authorized_keys.xml "
+ "is deprecated; use <Option> instead")
+ options.extend("=".join(p)
+ for p in allow.find("Params").attrib.items())
+
+ for opt in allow.findall("Option"):
+ if opt.get("value"):
+ options.append("%s=%s" % (opt.get("name"),
+ opt.get("value")))
+ else:
+ options.append(opt.get("name"))
pubkey_name = allow.get("from")
if pubkey_name:
host = allow.get("host")
group = allow.get("group")
+ category = allow.get("category", self.category)
if host:
key_md = self.core.build_metadata(host)
elif group:
key_md = ClientMetadata("dummy", group, [group], [],
set(), set(), dict(), None,
None, None, None)
- elif (self.category and
- not metadata.group_in_category(self.category)):
+ elif category and not metadata.group_in_category(category):
self.logger.warning("Cfg: %s ignoring Allow from %s: "
"No group in category %s" %
(metadata.hostname, pubkey_name,
- self.category))
+ category))
continue
else:
key_md = metadata
@@ -96,6 +105,6 @@ class CfgAuthorizedKeysGenerator(CfgGenerator, StructFile):
(metadata.hostname,
lxml.etree.tostring(allow)))
continue
- rv.append(" ".join([params, pubkey]).strip())
+ rv.append(" ".join([",".join(options), pubkey]).strip())
return "\n".join(rv)
get_data.__doc__ = CfgGenerator.get_data.__doc__
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
index c7b62f352..e890fdecb 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPrivateKeyCreator.py
@@ -159,7 +159,7 @@ class CfgPrivateKeyCreator(CfgCreator, StructFile):
return specificity
# pylint: disable=W0221
- def create_data(self, entry, metadata, return_pair=False):
+ def create_data(self, entry, metadata):
""" Create data for the given entry on the given client
:param entry: The abstract entry to create data for. This
@@ -167,15 +167,7 @@ class CfgPrivateKeyCreator(CfgCreator, StructFile):
:type entry: lxml.etree._Element
:param metadata: The client metadata to create data for
:type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
- :param return_pair: Return a tuple of ``(public key, private
- key)`` instead of just the private key.
- This is used by
- :class:`Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.CfgPublicKeyCreator`
- to create public keys as requested.
- :type return_pair: bool
:returns: string - The private key data
- :returns: tuple - Tuple of ``(public key, private key)``, if
- ``return_pair`` is set to True
"""
spec = self.XMLMatch(metadata)
specificity = self.get_specificity(metadata, spec)
@@ -201,11 +193,7 @@ class CfgPrivateKeyCreator(CfgCreator, StructFile):
specificity['ext'] = '.crypt'
self.write_data(privkey, **specificity)
-
- if return_pair:
- return (pubkey, privkey)
- else:
- return privkey
+ return privkey
finally:
shutil.rmtree(os.path.dirname(filename))
# pylint: enable=W0221
@@ -230,7 +218,7 @@ class CfgPrivateKeyCreator(CfgCreator, StructFile):
if strict:
raise PluginExecutionError(msg)
else:
- self.logger.warning(msg)
+ self.logger.info(msg)
Index.__doc__ = StructFile.Index.__doc__
def _decrypt(self, element):
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py
index 6be438462..4bd8690ed 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPublicKeyCreator.py
@@ -2,7 +2,11 @@
:class:`Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator`
to create SSH keys on the fly. """
+import os
+import sys
+import tempfile
import lxml.etree
+from Bcfg2.Utils import Executor
from Bcfg2.Server.Plugin import StructFile, PluginExecutionError
from Bcfg2.Server.Plugins.Cfg import CfgCreator, CfgCreationError, CFG
@@ -27,7 +31,8 @@ class CfgPublicKeyCreator(CfgCreator, StructFile):
CfgCreator.__init__(self, fname)
StructFile.__init__(self, fname)
self.cfg = CFG
- __init__.__doc__ = CfgCreator.__init__.__doc__
+ self.core = CFG.core
+ self.cmd = Executor()
def create_data(self, entry, metadata):
if entry.get("name").endswith(".pub"):
@@ -37,25 +42,51 @@ class CfgPublicKeyCreator(CfgCreator, StructFile):
"%s: Filename does not end in .pub" %
entry.get("name"))
- if privkey not in self.cfg.entries:
- raise CfgCreationError("Cfg: Could not find Cfg entry for %s "
- "(private key for %s)" % (privkey,
- self.name))
- eset = self.cfg.entries[privkey]
+ privkey_entry = lxml.etree.Element("Path", name=privkey)
try:
+ self.core.Bind(privkey_entry, metadata)
+ except PluginExecutionError:
+ raise CfgCreationError("Cfg: Could not bind %s (private key for "
+ "%s): %s" % (privkey, self.name,
+ sys.exc_info()[1]))
+
+ try:
+ eset = self.cfg.entries[privkey]
creator = eset.best_matching(metadata,
eset.get_handlers(metadata,
CfgCreator))
+ except KeyError:
+ raise CfgCreationError("Cfg: No private key defined for %s (%s)" %
+ (self.name, privkey))
except PluginExecutionError:
raise CfgCreationError("Cfg: No privkey.xml defined for %s "
"(private key for %s)" % (privkey,
self.name))
- privkey_entry = lxml.etree.Element("Path", name=privkey)
- pubkey = creator.create_data(privkey_entry, metadata,
- return_pair=True)[0]
- return pubkey
- create_data.__doc__ = CfgCreator.create_data.__doc__
+ specificity = creator.get_specificity(metadata)
+ fname = self.get_filename(**specificity)
+
+        # if the private key didn't exist, then creating it may have
+        # created the public key, too.  check for it first.
+ if os.path.exists(fname):
+ return open(fname).read()
+ else:
+ # generate public key from private key
+ fd, privfile = tempfile.mkstemp()
+ try:
+ os.fdopen(fd, 'w').write(privkey_entry.text)
+ cmd = ["ssh-keygen", "-y", "-f", privfile]
+ self.debug_log("Cfg: Extracting SSH public key from %s: %s" %
+ (privkey, " ".join(cmd)))
+ result = self.cmd.run(cmd)
+ if not result.success:
+ raise CfgCreationError("Cfg: Failed to extract public key "
+ "from %s: %s" % (privkey,
+ result.error))
+ self.write_data(result.stdout, **specificity)
+ return result.stdout
+ finally:
+ os.unlink(privfile)
def handle_event(self, event):
CfgCreator.handle_event(self, event)
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
index 154cd5e63..c6e2d0acb 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
@@ -10,6 +10,7 @@ import lxml.etree
import Bcfg2.Options
import Bcfg2.Server.Plugin
import Bcfg2.Server.Lint
+from fnmatch import fnmatch
from Bcfg2.Server.Plugin import PluginExecutionError
# pylint: disable=W0622
from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages, \
@@ -100,6 +101,8 @@ class CfgBaseFileMatcher(Bcfg2.Server.Plugin.SpecificData,
experimental = False
def __init__(self, name, specific, encoding):
+ if not self.__specific__ and not specific:
+ specific = Bcfg2.Server.Plugin.Specificity(all=True)
Bcfg2.Server.Plugin.SpecificData.__init__(self, name, specific,
encoding)
Bcfg2.Server.Plugin.Debuggable.__init__(self)
@@ -898,6 +901,7 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin):
self.check_delta(basename, entry)
self.check_pubkey(basename, entry)
self.check_missing_files()
+ self.check_conflicting_handlers()
@classmethod
def Errors(cls):
@@ -905,7 +909,8 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin):
"diff-file-used": "warning",
"no-pubkey-xml": "warning",
"unknown-cfg-files": "error",
- "extra-cfg-files": "error"}
+ "extra-cfg-files": "error",
+ "multiple-global-handlers": "error"}
def check_delta(self, basename, entry):
""" check that no .cat or .diff files are in use """
@@ -940,22 +945,55 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin):
"%s has no corresponding pubkey.xml at %s" %
(basename, pubkey))
+ def _list_path_components(self, path):
+ """ Get a list of all components of a path. E.g.,
+ ``self._list_path_components("/foo/bar/foobaz")`` would return
+ ``["foo", "bar", "foo", "baz"]``. The list is not guaranteed
+ to be in order."""
+ rv = []
+ remaining, component = os.path.split(path)
+ while component != '':
+ rv.append(component)
+ remaining, component = os.path.split(remaining)
+ return rv
+
+ def check_conflicting_handlers(self):
+ """ Check that a single entryset doesn't have multiple
+ non-specific (i.e., 'all') handlers. """
+ cfg = self.core.plugins['Cfg']
+ for eset in cfg.entries.values():
+ alls = [e for e in eset.entries.values()
+ if (e.specific.all and
+ issubclass(e.__class__, CfgGenerator))]
+ if len(alls) > 1:
+ self.LintError("multiple-global-handlers",
+ "%s has multiple global handlers: %s" %
+ (eset.path, ", ".join(os.path.basename(e.name)
+ for e in alls)))
+
def check_missing_files(self):
""" check that all files on the filesystem are known to Cfg """
cfg = self.core.plugins['Cfg']
# first, collect ignore patterns from handlers
- ignore = []
+ ignore = set()
for hdlr in handlers():
- ignore.extend(hdlr.__ignore__)
+ ignore.update(hdlr.__ignore__)
# next, get a list of all non-ignored files on the filesystem
all_files = set()
for root, _, files in os.walk(cfg.data):
- all_files.update(os.path.join(root, fname)
- for fname in files
- if not any(fname.endswith("." + i)
- for i in ignore))
+ for fname in files:
+ fpath = os.path.join(root, fname)
+ # check against the handler ignore patterns and the
+ # global FAM ignore list
+ if (not any(fname.endswith("." + i) for i in ignore) and
+ not any(fnmatch(fpath, p)
+ for p in self.config['ignore']) and
+ not any(fnmatch(c, p)
+ for p in self.config['ignore']
+ for c in self._list_path_components(fpath))):
+ all_files.add(fpath)
# next, get a list of all files known to Cfg
cfg_files = set()
diff --git a/src/lib/Bcfg2/Server/Plugins/GroupLogic.py b/src/lib/Bcfg2/Server/Plugins/GroupLogic.py
index 810b273af..d74c16e8b 100644
--- a/src/lib/Bcfg2/Server/Plugins/GroupLogic.py
+++ b/src/lib/Bcfg2/Server/Plugins/GroupLogic.py
@@ -3,7 +3,9 @@ template to dynamically set additional groups for clients. """
import os
import lxml.etree
+from threading import local
import Bcfg2.Server.Plugin
+from Bcfg2.Server.Plugins.Metadata import MetadataGroup
try:
from Bcfg2.Server.Plugins.Bundler import BundleTemplateFile
except ImportError:
@@ -35,13 +37,40 @@ class GroupLogic(Bcfg2.Server.Plugin.Plugin,
""" GroupLogic is a connector plugin that lets you use an XML
Genshi template to dynamically set additional groups for
clients. """
+ # perform grouplogic later than other Connector plugins, so it can
+ # use groups set by them
+ sort_order = 1000
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
Bcfg2.Server.Plugin.Connector.__init__(self)
self.config = GroupLogicConfig(os.path.join(self.data, "groups.xml"),
core.fam)
+ self._local = local()
def get_additional_groups(self, metadata):
- return [el.get("name")
- for el in self.config.get_xml_value(metadata).findall("Group")]
+ if not hasattr(self._local, "building"):
+ # building is a thread-local set that tracks which
+ # machines GroupLogic is getting additional groups for.
+            # If get_additional_groups() is called twice for a
+ # machine before the first call has completed, the second
+ # call returns an empty list. This is for infinite
+ # recursion protection; without this check, it'd be
+ # impossible to use things like metadata.query.in_group()
+ # in GroupLogic, since that requires building all
+ # metadata, which requires running
+ # GroupLogic.get_additional_groups() for all hosts, which
+ # requires building all metadata...
+ self._local.building = set()
+ if metadata.hostname in self._local.building:
+ return []
+ self._local.building.add(metadata.hostname)
+ rv = []
+ for el in self.config.get_xml_value(metadata).findall("Group"):
+ if el.get("category"):
+ rv.append(MetadataGroup(el.get("name"),
+ category=el.get("category")))
+ else:
+ rv.append(el.get("name"))
+ self._local.building.discard(metadata.hostname)
+ return rv
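The guard reduces to this pattern (a standalone sketch;
``build_groups`` stands in for the real template logic):

.. code-block:: python

    from threading import local

    _local = local()

    def get_additional_groups(hostname, build_groups):
        if not hasattr(_local, 'building'):
            _local.building = set()
        if hostname in _local.building:
            return []   # re-entered mid-build; break the recursion
        _local.building.add(hostname)
        try:
            return build_groups(hostname)
        finally:
            _local.building.discard(hostname)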
diff --git a/src/lib/Bcfg2/Server/Plugins/Guppy.py b/src/lib/Bcfg2/Server/Plugins/Guppy.py
index 4f2601f15..3c9b8a459 100644
--- a/src/lib/Bcfg2/Server/Plugins/Guppy.py
+++ b/src/lib/Bcfg2/Server/Plugins/Guppy.py
@@ -37,6 +37,7 @@ class Guppy(Bcfg2.Server.Plugin.Plugin):
experimental = True
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Enable', 'Disable']
+ __child_rmi__ = __rmi__[:]
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py
index 6934739a3..047dd4f4e 100644
--- a/src/lib/Bcfg2/Server/Plugins/Metadata.py
+++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py
@@ -16,7 +16,9 @@ import Bcfg2.Server.Lint
import Bcfg2.Server.Plugin
import Bcfg2.Server.FileMonitor
from Bcfg2.Utils import locked
-from Bcfg2.Compat import MutableMapping, all, wraps # pylint: disable=W0622
+# pylint: disable=W0622
+from Bcfg2.Compat import MutableMapping, all, any, wraps
+# pylint: enable=W0622
from Bcfg2.version import Bcfg2VersionInfo
try:
@@ -219,6 +221,7 @@ class XMLMetadataConfig(Bcfg2.Server.Plugin.XMLFileBacked):
sys.exc_info()[1])
self.logger.error(msg)
raise Bcfg2.Server.Plugin.MetadataRuntimeError(msg)
+ self.load_xml()
def find_xml_for_xpath(self, xpath):
"""Find and load xml file containing the xpath query"""
@@ -487,6 +490,7 @@ class MetadataGroup(tuple): # pylint: disable=E0012,R0924
class Metadata(Bcfg2.Server.Plugin.Metadata,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.ClientRunHooks,
Bcfg2.Server.Plugin.DatabaseBacked):
"""This class contains data for bcfg2 server metadata."""
@@ -495,6 +499,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
def __init__(self, core, datastore, watch_clients=True):
Bcfg2.Server.Plugin.Metadata.__init__(self)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.ClientRunHooks.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
self.watch_clients = watch_clients
@@ -528,21 +533,24 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.raliases = {}
# mapping of groupname -> MetadataGroup object
self.groups = {}
- # mappings of predicate -> MetadataGroup object
+ # mappings of groupname -> [predicates]
self.group_membership = dict()
self.negated_groups = dict()
+ # list of group names in document order
+ self.ordered_groups = []
# mapping of hostname -> version string
if self._use_db:
self.versions = ClientVersions(core, datastore)
else:
self.versions = dict()
+
self.uuid = {}
self.session_cache = {}
self.default = None
self.pdirty = False
self.password = core.setup['password']
self.query = MetadataQuery(core.build_metadata,
- lambda: list(self.clients),
+ self.list_clients,
self.get_client_names_by_groups,
self.get_client_names_by_profiles,
self.get_all_group_names,
@@ -672,14 +680,15 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
""" Generic method to modify XML data (group, client, etc.) """
node = self._search_xdata(tag, name, config.xdata, alias=alias)
if node is None:
- self.logger.error("%s \"%s\" does not exist" % (tag, name))
- raise Bcfg2.Server.Plugin.MetadataConsistencyError
+ msg = "%s \"%s\" does not exist" % (tag, name)
+ self.logger.error(msg)
+ raise Bcfg2.Server.Plugin.MetadataConsistencyError(msg)
xdict = config.find_xml_for_xpath('.//%s[@name="%s"]' %
(tag, node.get('name')))
if not xdict:
- self.logger.error("Unexpected error finding %s \"%s\"" %
- (tag, name))
- raise Bcfg2.Server.Plugin.MetadataConsistencyError
+ msg = 'Unexpected error finding %s "%s"' % (tag, name)
+ self.logger.error(msg)
+ raise Bcfg2.Server.Plugin.MetadataConsistencyError(msg)
for key, val in list(attribs.items()):
xdict['xquery'][0].set(key, val)
config.write_xml(xdict['filename'], xdict['xmltree'])
@@ -749,7 +758,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
return self._remove_xdata(self.groups_xml, "Bundle", bundle_name)
def remove_client(self, client_name):
- """Remove a bundle."""
+ """Remove a client."""
if self._use_db:
try:
client = MetadataClientModel.objects.get(hostname=client_name)
@@ -830,51 +839,34 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
if self._use_db:
self.clients = self.list_clients()
+ def _get_condition(self, element):
+ """ Return a predicate that returns True if a client meets
+ the condition specified in the given Group or Client
+ element """
+ negate = element.get('negate', 'false').lower() == 'true'
+ pname = element.get("name")
+ if element.tag == 'Group':
+ return lambda c, g, _: negate != (pname in g)
+ elif element.tag == 'Client':
+ return lambda c, g, _: negate != (pname == c)
+
+ def _get_category_condition(self, grpname):
+ """ get a predicate that returns False if a client is already
+ a member of a group in the given group's category, True
+ otherwise"""
+ return lambda client, _, categories: \
+ bool(self._check_category(client, grpname, categories))
+
+ def _aggregate_conditions(self, conditions):
+ """ aggregate all conditions on a given group declaration
+ into a single predicate """
+ return lambda client, groups, cats: \
+ all(cond(client, groups, cats) for cond in conditions)
+
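Concretely, each predicate has the signature ``(client, groups,
categories) -> bool``, and nested conditions are ANDed together; an
illustrative sketch with made-up names:

.. code-block:: python

    in_web = lambda c, g, cats: 'web' in g     # <Group name="web">
    is_host = lambda c, g, cats: c == 'host1'  # <Client name="host1">
    member = lambda c, g, cats: all(p(c, g, cats)
                                    for p in (in_web, is_host))
    print(member('host1', set(['web']), {}))   # True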
def _handle_groups_xml_event(self, _): # pylint: disable=R0912
""" re-read groups.xml on any event on it """
self.groups = {}
- # these three functions must be separate functions in order to
- # ensure that the scope is right for the closures they return
- def get_condition(element):
- """ Return a predicate that returns True if a client meets
- the condition specified in the given Group or Client
- element """
- negate = element.get('negate', 'false').lower() == 'true'
- pname = element.get("name")
- if element.tag == 'Group':
- return lambda c, g, _: negate != (pname in g)
- elif element.tag == 'Client':
- return lambda c, g, _: negate != (pname == c)
-
- def get_category_condition(category, gname):
- """ get a predicate that returns False if a client is
- already a member of a group in the given category, True
- otherwise """
- def in_cat(client, groups, categories): # pylint: disable=W0613
- """ return True if the client is already a member of a
- group in the category given in the enclosing function,
- False otherwise """
- if category in categories:
- if (gname not in self.groups or
- client not in self.groups[gname].warned):
- self.logger.warning("%s: Group %s suppressed by "
- "category %s; %s already a member "
- "of %s" %
- (self.name, gname, category,
- client, categories[category]))
- if gname in self.groups:
- self.groups[gname].warned.append(client)
- return False
- return True
- return in_cat
-
- def aggregate_conditions(conditions):
- """ aggregate all conditions on a given group declaration
- into a single predicate """
- return lambda client, groups, cats: \
- all(cond(client, groups, cats) for cond in conditions)
-
# first, we get a list of all of the groups declared in the
# file. we do this in two stages because the old way of
# parsing groups.xml didn't support nested groups; in the old
@@ -900,6 +892,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.group_membership = dict()
self.negated_groups = dict()
+ self.ordered_groups = []
# confusing loop condition; the XPath query asks for all
# elements under a Group tag under a Groups tag; that is
@@ -910,37 +903,47 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
# XPath. We do the same thing for Client tags.
for el in self.groups_xml.xdata.xpath("//Groups/Group//*") + \
self.groups_xml.xdata.xpath("//Groups/Client//*"):
- if ((el.tag != 'Group' and el.tag != 'Client') or
- el.getchildren()):
+ if (el.tag != 'Group' and el.tag != 'Client') or el.getchildren():
continue
conditions = []
for parent in el.iterancestors():
- cond = get_condition(parent)
+ cond = self._get_condition(parent)
if cond:
conditions.append(cond)
gname = el.get("name")
if el.get("negate", "false").lower() == "true":
- self.negated_groups[aggregate_conditions(conditions)] = \
- self.groups[gname]
+ self.negated_groups.setdefault(gname, [])
+ self.negated_groups[gname].append(
+ self._aggregate_conditions(conditions))
else:
if self.groups[gname].category:
- conditions.append(
- get_category_condition(self.groups[gname].category,
- gname))
+ conditions.append(self._get_category_condition(gname))
- self.group_membership[aggregate_conditions(conditions)] = \
- self.groups[gname]
+ if gname not in self.ordered_groups:
+ self.ordered_groups.append(gname)
+ self.group_membership.setdefault(gname, [])
+ self.group_membership[gname].append(
+ self._aggregate_conditions(conditions))
self.states['groups.xml'] = True
+ def expire_cache(self, key=None):
+ self.core.metadata_cache.expire(key)
+
def HandleEvent(self, event):
"""Handle update events for data files."""
for handles, event_handler in self.handlers.items():
if handles(event):
# clear the entire cache when we get an event for any
# metadata file
- self.core.metadata_cache.expire()
+ self.expire_cache()
+
+ # clear out the list of category suppressions that
+ # have been warned about, since this may change when
+ # clients.xml or groups.xml changes.
+ for group in self.groups.values():
+ group.warned = []
event_handler(event)
if False not in list(self.states.values()) and self.debug_flag:
@@ -978,17 +981,21 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.logger.error(msg)
raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
- profiles = [g for g in self.clientgroups[client]
- if g in self.groups and self.groups[g].is_profile]
- self.logger.info("Changing %s profile from %s to %s" %
- (client, profiles, profile))
- self.update_client(client, dict(profile=profile))
- if client in self.clientgroups:
- for prof in profiles:
- self.clientgroups[client].remove(prof)
- self.clientgroups[client].append(profile)
+ metadata = self.core.build_metadata(client)
+ if metadata.profile != profile:
+ self.logger.info("Changing %s profile from %s to %s" %
+ (client, metadata.profile, profile))
+ self.update_client(client, dict(profile=profile))
+ if client in self.clientgroups:
+ if metadata.profile in self.clientgroups[client]:
+ self.clientgroups[client].remove(metadata.profile)
+ self.clientgroups[client].append(profile)
+ else:
+ self.clientgroups[client] = [profile]
else:
- self.clientgroups[client] = [profile]
+ self.logger.debug(
+ "Ignoring %s request to change profile from %s to %s"
+ % (client, metadata.profile, profile))
else:
self.logger.info("Creating new client: %s, profile %s" %
(client, profile))
@@ -1004,8 +1011,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.add_client(client, dict(profile=profile))
self.clients.append(client)
self.clientgroups[client] = [profile]
- if not self._use_db:
- self.clients_xml.write()
+ if not self._use_db:
+ self.clients_xml.write()
def set_version(self, client, version):
"""Set version for provided client."""
@@ -1060,7 +1067,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
if cname in self.aliases:
return self.aliases[cname]
return cname
- except socket.herror:
+ except (socket.gaierror, socket.herror):
err = "Address resolution error for %s: %s" % (address,
sys.exc_info()[1])
self.logger.error(err)
@@ -1075,22 +1082,77 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
categories = dict()
while numgroups != len(groups):
numgroups = len(groups)
- for predicate, group in self.group_membership.items():
- if group.name in groups:
+ newgroups = set()
+ removegroups = set()
+ for grpname in self.ordered_groups:
+ if grpname in groups:
continue
- if predicate(client, groups, categories):
- groups.add(group.name)
- if group.category:
- categories[group.category] = group.name
- for predicate, group in self.negated_groups.items():
- if group.name not in groups:
+ if any(p(client, groups, categories)
+ for p in self.group_membership[grpname]):
+ newgroups.add(grpname)
+ if (grpname in self.groups and
+ self.groups[grpname].category):
+ categories[self.groups[grpname].category] = grpname
+ groups.update(newgroups)
+ for grpname, predicates in self.negated_groups.items():
+ if grpname not in groups:
continue
- if predicate(client, groups, categories):
- groups.remove(group.name)
- if group.category:
- del categories[group.category]
+ if any(p(client, groups, categories) for p in predicates):
+ removegroups.add(grpname)
+ if (grpname in self.groups and
+ self.groups[grpname].category):
+ del categories[self.groups[grpname].category]
+ groups.difference_update(removegroups)
return (groups, categories)
+ def _check_category(self, client, grpname, categories):
+ """ Determine if the given client is already a member of a
+ group in the same category as the named group.
+
+ The return value is one of three possibilities:
+
+ * If the client is already a member of a group in the same
+ category, then False is returned (i.e., the category check
+ failed);
+ * If the group is not in any categories, then True is returned;
+        * If the client is not a member of a group in the category,
+ then the name of the category is returned. This makes it
+ easy to add the category to the ClientMetadata object (or
+ other category list).
+
+ If a pure boolean value is required, you can do
+ ``bool(self._check_category(...))``.
+ """
+ if grpname not in self.groups:
+ return True
+ category = self.groups[grpname].category
+ if not category:
+ return True
+ if category in categories:
+ if client not in self.groups[grpname].warned:
+ self.logger.warning("%s: Group %s suppressed by category %s; "
+ "%s already a member of %s" %
+ (self.name, grpname, category,
+ client, categories[category]))
+ self.groups[grpname].warned.append(client)
+ return False
+ return category
+
+ def _check_and_add_category(self, client, grpname, categories):
+ """ If the client is not a member of a group in the same
+ category as the named group, then the category is added to
+ ``categories``.
+ :func:`Bcfg2.Server.Plugins.Metadata._check_category` is used
+ to determine if the category can be added.
+
+ If the category check failed, returns False; otherwise,
+ returns True. """
+ rv = self._check_category(client, grpname, categories)
+ if rv and rv is not True:
+ categories[rv] = grpname
+ return True
+ return rv
+
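The three-way return value is easiest to see in isolation (an
illustrative sketch with made-up data):

.. code-block:: python

    def check(category, categories):
        if not category:
            return True               # group is not in any category
        if category in categories:
            return False              # suppressed by an earlier group
        return category               # OK; caller may record it

    categories = {}
    rv = check('os', categories)
    if rv and rv is not True:
        categories[rv] = 'centos-6'   # as in _check_and_add_category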
def get_initial_metadata(self, client): # pylint: disable=R0914,R0912
"""Return the metadata for a given client."""
if False in list(self.states.values()):
@@ -1112,39 +1174,37 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
Handles setting categories and category suppression.
Returns the new profile for the client (which might be
unchanged). """
- groups.add(grpname)
if grpname in self.groups:
- group = self.groups[grpname]
- category = group.category
- if category:
- if category in categories:
- self.logger.warning("%s: Group %s suppressed by "
- "category %s; %s already a member "
- "of %s" %
- (self.name, grpname, category,
- client, categories[category]))
- return
- categories[category] = grpname
- if not profile and group.is_profile:
+ if not self._check_and_add_category(client, grpname,
+ categories):
+ return profile
+ groups.add(grpname)
+ if not profile and self.groups[grpname].is_profile:
return grpname
else:
return profile
+ else:
+ groups.add(grpname)
+ return profile
if client not in self.clients:
pgroup = None
if client in self.clientgroups:
pgroup = self.clientgroups[client][0]
+ self.debug_log("%s: Adding new client with profile %s" %
+ (self.name, pgroup))
elif self.default:
pgroup = self.default
+ self.debug_log("%s: Adding new client with default profile %s"
+ % (self.name, pgroup))
if pgroup:
self.set_profile(client, pgroup, (None, None),
require_public=False)
profile = _add_group(pgroup)
else:
- msg = "Cannot add new client %s; no default group set" % client
- self.logger.error(msg)
- raise Bcfg2.Server.Plugin.MetadataConsistencyError(msg)
+ raise Bcfg2.Server.Plugin.MetadataConsistencyError(
+ "Cannot add new client %s; no default group set" % client)
for cgroup in self.clientgroups.get(client, []):
if cgroup in groups:
@@ -1153,6 +1213,9 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.groups[cgroup] = MetadataGroup(cgroup)
profile = _add_group(cgroup)
+ # we do this before setting the default because there may be
+ # groups set in <Client> tags in groups.xml that we want to
+ # set
groups, categories = self._merge_groups(client, groups,
categories=categories)
@@ -1201,8 +1264,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
""" return a list of all group names """
all_groups = set()
all_groups.update(self.groups.keys())
- all_groups.update([g.name for g in self.group_membership.values()])
- all_groups.update([g.name for g in self.negated_groups.values()])
+ all_groups.update(self.group_membership.keys())
+ all_groups.update(self.negated_groups.keys())
for grp in self.clientgroups.values():
all_groups.update(grp)
return all_groups
@@ -1215,7 +1278,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
def get_client_names_by_profiles(self, profiles):
""" return a list of names of clients in the given profile groups """
rv = []
- for client in list(self.clients):
+ for client in self.list_clients():
mdata = self.core.build_metadata(client)
if mdata.profile in profiles:
rv.append(client)
@@ -1223,34 +1286,33 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
def get_client_names_by_groups(self, groups):
""" return a list of names of clients in the given groups """
- mdata = [self.core.build_metadata(client) for client in self.clients]
- return [md.hostname for md in mdata if md.groups.issuperset(groups)]
+ rv = []
+ for client in self.list_clients():
+ mdata = self.core.build_metadata(client)
+ if mdata.groups.issuperset(groups):
+ rv.append(client)
+ return rv
def get_client_names_by_bundles(self, bundles):
""" given a list of bundles, return a list of names of clients
that use those bundles """
- mdata = [self.core.build_metadata(client) for client in self.clients]
- return [md.hostname for md in mdata if md.bundles.issuperset(bundles)]
+ rv = []
+ for client in self.list_clients():
+ mdata = self.core.build_metadata(client)
+ if mdata.bundles.issuperset(bundles):
+ rv.append(client)
+ return rv
def merge_additional_groups(self, imd, groups):
for group in groups:
if group in imd.groups:
continue
- if group in self.groups and self.groups[group].category:
- category = self.groups[group].category
- if self.groups[group].category in imd.categories:
- self.logger.warning("%s: Group %s suppressed by category "
- "%s; %s already a member of %s" %
- (self.name, group, category,
- imd.hostname,
- imd.categories[category]))
- continue
- imd.categories[category] = group
+ if not self._check_and_add_category(imd.hostname, group,
+ imd.categories):
+ continue
imd.groups.add(group)
- self._merge_groups(imd.hostname, imd.groups,
- categories=imd.categories)
-
+ self._merge_groups(imd.hostname, imd.groups, categories=imd.categories)
for group in imd.groups:
if group in self.groups:
imd.bundles.update(self.groups[group].bundles)
@@ -1398,7 +1460,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
viz_str.extend(self._viz_groups(egroups, bundles, clientmeta))
if key:
for category in categories:
- viz_str.append('"%s" [label="%s", shape="record", '
+ viz_str.append('"%s" [label="%s", shape="trapezium", '
'style="filled", fillcolor="%s"];' %
(category, category, categories[category]))
return "\n".join("\t" + s for s in viz_str)
@@ -1412,8 +1474,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
instances = {}
rv = []
- for client in list(self.clients):
- if include_client(client):
+ for client in list(self.list_clients()):
+ if not include_client(client):
continue
if client in self.clientgroups:
grps = self.clientgroups[client]
@@ -1441,9 +1503,10 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
the graph"""
return not clientmeta or bundle in clientmeta.bundles
- bundles = list(set(bund.get('name'))
- for bund in self.groups_xml.xdata.findall('.//Bundle')
- if include_bundle(bund.get('name')))
+ bundles = \
+ list(set(bund.get('name')
+ for bund in self.groups_xml.xdata.findall('.//Bundle')
+ if include_bundle(bund.get('name'))))
bundles.sort()
return ['"bundle-%s" [ label="%s", shape="septagon"];' % (bundle,
bundle)
@@ -1589,15 +1652,35 @@ class MetadataLint(Bcfg2.Server.Lint.ServerPlugin):
"client")
def duplicate_groups(self):
- """ Check for groups that are defined more than once. We
- count a group tag as a definition if it a) has profile or
- public set; or b) has any children."""
- allgroups = [
- g
- for g in self.metadata.groups_xml.xdata.xpath("//Groups/Group") +
- self.metadata.groups_xml.xdata.xpath("//Groups/Group//Group")
- if g.get("profile") or g.get("public") or g.getchildren()]
- self.duplicate_entries(allgroups, "group")
+ """ Check for groups that are defined more than once. There
+ are two ways this can happen:
+
+ 1. The group is listed twice with contradictory options.
+ 2. The group is listed with no options *first*, and then with
+ options later.
+
+ In this context, 'first' refers to the order in which groups
+ are parsed; see the loop condition below and
+ _handle_groups_xml_event above for details. """
+ groups = dict()
+ duplicates = dict()
+ for grp in self.metadata.groups_xml.xdata.xpath("//Groups/Group") + \
+ self.metadata.groups_xml.xdata.xpath("//Groups/Group//Group"):
+ grpname = grp.get("name")
+ if grpname in duplicates:
+ duplicates[grpname].append(grp)
+ elif len(grp.attrib) > 1: # group has options
+ if grpname in groups:
+ duplicates[grpname] = [grp, groups[grpname]]
+ else:
+ groups[grpname] = grp
+ else: # group has no options
+ groups[grpname] = grp
+ for grpname, grps in duplicates.items():
+ self.LintError("duplicate-group",
+ "Group %s is defined multiple times:\n%s" %
+ (grpname,
+ "\n".join(self.RenderXML(g) for g in grps)))
def duplicate_entries(self, allentries, etype):
""" Generic duplicate entry finder.
diff --git a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
index 466665382..8f1d03586 100644
--- a/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
+++ b/src/lib/Bcfg2/Server/Plugins/NagiosGen.py
@@ -24,9 +24,9 @@ class NagiosGen(Bcfg2.Server.Plugin.Plugin,
'config.xml'),
core.fam, should_monitor=True,
create=self.name)
- self.Entries = {'Path':
- {'/etc/nagiosgen.status': self.createhostconfig,
- '/etc/nagios/nagiosgen.cfg': self.createserverconfig}}
+ self.Entries = {
+ 'Path': {'/etc/nagiosgen.status': self.createhostconfig,
+ '/etc/nagios/nagiosgen.cfg': self.createserverconfig}}
self.client_attrib = {'encoding': 'ascii',
'owner': 'root',
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py b/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py
index b25cb0fc4..39c51f351 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/Collection.py
@@ -614,6 +614,10 @@ class Collection(list, Bcfg2.Server.Plugin.Debuggable):
self.filter_unknown(unknown)
return packages, unknown
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__,
+ list.__repr__(self))
+
def get_collection_class(source_type):
""" Given a source type, determine the class of Collection object
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
index 332f0bbab..c47e18201 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
@@ -88,13 +88,12 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile,
:type event: Bcfg2.Server.FileMonitor.Event
:returns: None
"""
- Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event)
if event and event.filename != self.name:
for fpath in self.extras:
if fpath == os.path.abspath(event.filename):
self.parsed.add(fpath)
break
-
+ Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event)
if self.loaded:
self.logger.info("Reloading Packages plugin")
self.pkg_obj.Reload()
@@ -111,10 +110,11 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile,
def Index(self):
Bcfg2.Server.Plugin.StructFile.Index(self)
self.entries = []
- for xsource in self.xdata.findall('.//Source'):
- source = self.source_from_xml(xsource)
- if source is not None:
- self.entries.append(source)
+ if self.loaded:
+ for xsource in self.xdata.findall('.//Source'):
+ source = self.source_from_xml(xsource)
+ if source is not None:
+ self.entries.append(source)
Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__ + """
``Index`` is responsible for calling :func:`source_from_xml`
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
index 4608bcca5..66f8e9dbe 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
@@ -53,13 +53,15 @@ The Yum Backend
import os
import re
import sys
+import time
import copy
import errno
import socket
import logging
import lxml.etree
-from subprocess import Popen, PIPE
import Bcfg2.Server.Plugin
+from lockfile import FileLock
+from Bcfg2.Utils import Executor
# pylint: disable=W0622
from Bcfg2.Compat import StringIO, cPickle, HTTPError, URLError, \
ConfigParser, any
@@ -102,9 +104,6 @@ FL = '{http://linux.duke.edu/metadata/filelists}'
PULPSERVER = None
PULPCONFIG = None
-#: The path to bcfg2-yum-helper
-HELPER = None
-
def _setup_pulp(setup):
""" Connect to a Pulp server and pass authentication credentials.
@@ -263,6 +262,8 @@ class YumCollection(Collection):
.. private-include: _add_gpg_instances, _get_pulp_consumer
"""
+ _helper = None
+
#: Options that are included in the [packages:yum] section of the
#: config but that should not be included in the temporary
#: yum.conf we write out
@@ -277,18 +278,25 @@ class YumCollection(Collection):
debug=debug)
self.keypath = os.path.join(self.cachepath, "keys")
+ #: A :class:`Bcfg2.Utils.Executor` object to use to run
+ #: external commands
+ self.cmd = Executor()
+
if self.use_yum:
#: Define a unique cache file for this collection to use
#: for cached yum metadata
self.cachefile = os.path.join(self.cachepath,
"cache-%s" % self.cachekey)
- if not os.path.exists(self.cachefile):
- os.mkdir(self.cachefile)
#: The path to the server-side config file used when
#: resolving packages with the Python yum libraries
self.cfgfile = os.path.join(self.cachefile, "yum.conf")
- self.write_config()
+
+ if not os.path.exists(self.cachefile):
+ self.debug_log("Creating common cache %s" % self.cachefile)
+ os.mkdir(self.cachefile)
+ if not self.disableMetaData:
+ self.setup_data()
else:
self.cachefile = None
@@ -309,7 +317,28 @@ class YumCollection(Collection):
self.logger.error("Could not create Pulp consumer "
"cert directory at %s: %s" %
(certdir, err))
- self.pulp_cert_set = PulpCertificateSet(certdir, self.fam)
+ self.__class__.pulp_cert_set = PulpCertificateSet(certdir,
+ self.fam)
+
+ @property
+ def disableMetaData(self):
+ """ Report whether or not metadata processing is enabled.
+ This duplicates code in Packages/__init__.py, and can probably
+ be removed in Bcfg2 1.4 when we have a module-level setup
+ object. """
+ if self.setup is None:
+ return True
+ try:
+ return not self.setup.cfp.getboolean("packages", "resolver")
+ except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+ return False
+ except ValueError:
+ # for historical reasons we also accept "enabled" and
+ # "disabled"
+ return self.setup.cfp.get(
+ "packages",
+ "metadata",
+ default="enabled").lower() == "disabled"
@property
def __package_groups__(self):
@@ -323,20 +352,21 @@ class YumCollection(Collection):
a call to it; I wish there was a way to do this without
forking, but apparently not); finally we check in /usr/sbin,
the default location. """
- global HELPER
- if not HELPER:
+ if not self._helper:
+ # pylint: disable=W0212
try:
- HELPER = self.setup.cfp.get("packages:yum", "helper")
+ self.__class__._helper = self.setup.cfp.get("packages:yum",
+ "helper")
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
# first see if bcfg2-yum-helper is in PATH
try:
self.debug_log("Checking for bcfg2-yum-helper in $PATH")
- Popen(['bcfg2-yum-helper'],
- stdin=PIPE, stdout=PIPE, stderr=PIPE).wait()
- HELPER = 'bcfg2-yum-helper'
+ self.cmd.run(['bcfg2-yum-helper'])
+ self.__class__._helper = 'bcfg2-yum-helper'
except OSError:
- HELPER = "/usr/sbin/bcfg2-yum-helper"
- return HELPER
+ self.__class__._helper = "/usr/sbin/bcfg2-yum-helper"
+ # pylint: enable=W0212
+ return self._helper
@property
def use_yum(self):
@@ -374,6 +404,7 @@ class YumCollection(Collection):
# the rpmdb is so hopelessly intertwined with yum that we
# have to totally reinvent the dependency resolver.
mainopts = dict(cachedir='/',
+ persistdir='/',
installroot=self.cachefile,
keepcache="0",
debuglevel="0",
@@ -840,6 +871,17 @@ class YumCollection(Collection):
if not self.use_yum:
return Collection.complete(self, packagelist)
+ lock = FileLock(os.path.join(self.cachefile, "lock"))
+ slept = 0
+ while lock.is_locked():
+ if slept > 30:
+ self.logger.warning("Packages: Timeout waiting for yum cache "
+ "to release its lock")
+ return set(), set()
+ self.logger.debug("Packages: Yum cache is locked, waiting...")
+ time.sleep(3)
+ slept += 3
+
if packagelist:
try:
result = self.call_helper(
@@ -888,40 +930,30 @@ class YumCollection(Collection):
cmd.append("-v")
cmd.append(command)
self.debug_log("Packages: running %s" % " ".join(cmd))
- try:
- helper = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- except OSError:
- err = sys.exc_info()[1]
- self.logger.error("Packages: Failed to execute %s: %s" %
- (" ".join(cmd), err))
- return None
if inputdata:
- idata = json.dumps(inputdata)
- (stdout, stderr) = helper.communicate(idata)
+ result = self.cmd.run(cmd, timeout=self.setup['client_timeout'],
+ inputdata=json.dumps(inputdata))
else:
- (stdout, stderr) = helper.communicate()
- rv = helper.wait()
- errlines = stderr.splitlines()
- if rv:
- if not errlines:
- errlines.append("No error output")
- self.logger.error("Packages: error running bcfg2-yum-helper "
- "(returned %d): %s" % (rv, errlines[0]))
- for line in errlines[1:]:
- self.logger.error("Packages: %s" % line)
- elif errlines:
+ result = self.cmd.run(cmd, timeout=self.setup['client_timeout'])
+ if not result.success:
+ self.logger.error("Packages: error running bcfg2-yum-helper: %s" %
+ result.error)
+ elif result.stderr:
self.debug_log("Packages: debug info from bcfg2-yum-helper: %s" %
- errlines[0])
- for line in errlines[1:]:
- self.debug_log("Packages: %s" % line)
+ result.stderr)
try:
- return json.loads(stdout)
+ return json.loads(result.stdout)
except ValueError:
- err = sys.exc_info()[1]
- self.logger.error("Packages: error reading bcfg2-yum-helper "
- "output: %s" % err)
+ if result.stdout:
+ err = sys.exc_info()[1]
+ self.logger.error("Packages: Error reading bcfg2-yum-helper "
+ "output: %s" % err)
+ self.logger.error("Packages: bcfg2-yum-helper output: %s" %
+ result.stdout)
+ else:
+ self.logger.error("Packages: No bcfg2-yum-helper output")
raise
def setup_data(self, force_update=False):
@@ -934,8 +966,7 @@ class YumCollection(Collection):
If using the yum Python libraries, this cleans up cached yum
metadata, regenerates the server-side yum config (in order to
catch any new sources that have been added to this server),
- and then cleans up cached yum metadata again, in case the new
- config has any preexisting cache.
+ then regenerates the yum cache.
:param force_update: Ignore all local cache and setup data
from its original upstream sources (i.e.,
@@ -946,23 +977,22 @@ class YumCollection(Collection):
return Collection.setup_data(self, force_update)
if force_update:
- # we call this twice: one to clean up data from the old
- # config, and once to clean up data from the new config
+ # clean up data from the old config
try:
self.call_helper("clean")
except ValueError:
# error reported by call_helper
pass
- os.unlink(self.cfgfile)
+ if os.path.exists(self.cfgfile):
+ os.unlink(self.cfgfile)
self.write_config()
- if force_update:
- try:
- self.call_helper("clean")
- except ValueError:
- # error reported by call_helper
- pass
+ try:
+ self.call_helper("makecache")
+ except ValueError:
+ # error reported by call_helper
+ pass
class YumSource(Source):
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
index f93bd0932..479138ef1 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
@@ -9,7 +9,8 @@ import shutil
import lxml.etree
import Bcfg2.Logger
import Bcfg2.Server.Plugin
-from Bcfg2.Compat import ConfigParser, urlopen, HTTPError, URLError
+from Bcfg2.Compat import ConfigParser, urlopen, HTTPError, URLError, \
+ MutableMapping
from Bcfg2.Server.Plugins.Packages.Collection import Collection, \
get_collection_class
from Bcfg2.Server.Plugins.Packages.PackagesSources import PackagesSources
@@ -22,7 +23,54 @@ APT_CONFIG_DEFAULT = \
"/etc/apt/sources.list.d/bcfg2-packages-generated-sources.list"
+class OnDemandDict(MutableMapping):
+ """ This maps a set of keys to a set of value-getting functions;
+ the values are populated on-the-fly by the functions as the values
+ are needed (and not before). This is used by
+ :func:`Bcfg2.Server.Plugins.Packages.Packages.get_additional_data`;
+ see the docstring for that function for details on why.
+
+    Unlike a dict, you should not specify values for the right-hand
+    side of this mapping, but functions that return values.  E.g.:
+
+ .. code-block:: python
+
+ d = OnDemandDict(foo=load_foo,
+ bar=lambda: "bar");
+ """
+
+ def __init__(self, **getters):
+ self._values = dict()
+ self._getters = dict(**getters)
+
+ def __getitem__(self, key):
+ if key not in self._values:
+ self._values[key] = self._getters[key]()
+ return self._values[key]
+
+ def __setitem__(self, key, getter):
+ self._getters[key] = getter
+
+ def __delitem__(self, key):
+ del self._values[key]
+ del self._getters[key]
+
+ def __len__(self):
+ return len(self._getters)
+
+ def __iter__(self):
+ return iter(self._getters.keys())
+
+ def __repr__(self):
+ rv = dict(self._values)
+ for key in self._getters.keys():
+ if key not in rv:
+ rv[key] = 'unknown'
+ return str(rv)
+
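Usage is easiest to see with a side effect (an illustrative sketch):

.. code-block:: python

    def load_foo():
        print('computing foo...')   # runs once, on first access
        return 42

    d = OnDemandDict(foo=load_foo, bar=lambda: 'bar')
    print(sorted(d))    # ['bar', 'foo'] -- keys known up front
    print(d['foo'])     # prints 'computing foo...' then 42
    print(d['foo'])     # 42, from the memoized value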
+
class Packages(Bcfg2.Server.Plugin.Plugin,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.StructureValidator,
Bcfg2.Server.Plugin.Generator,
Bcfg2.Server.Plugin.Connector,
@@ -45,8 +93,12 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
#: and :func:`Reload`
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload']
+ __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \
+ [('Refresh', 'expire_cache'), ('Reload', 'expire_cache')]
+
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.StructureValidator.__init__(self)
Bcfg2.Server.Plugin.Generator.__init__(self)
Bcfg2.Server.Plugin.Connector.__init__(self)
@@ -110,8 +162,21 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
#: object when one is requested, so each entry is very
#: short-lived -- it's purged at the end of each client run.
self.clients = dict()
- # pylint: enable=C0301
+ #: groupcache caches group lookups. It maps Collections (via
+ #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`)
+ #: to sets of package groups, and thence to the packages
+ #: indicated by those groups.
+ self.groupcache = dict()
+
+ #: pkgcache caches complete package sets. It maps Collections
+ #: (via
+ #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`)
+ #: to sets of initial packages, and thence to the final
+ #: (complete) package selections resolved from the initial
+ #: packages
+ self.pkgcache = dict()
+ # pylint: enable=C0301
__init__.__doc__ = Bcfg2.Server.Plugin.Plugin.__init__.__doc__
def set_debug(self, debug):
@@ -355,14 +420,24 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
for el in to_remove:
el.getparent().remove(el)
- gpkgs = collection.get_groups(groups)
- for pkgs in gpkgs.values():
+ groups.sort()
+ # check for this set of groups in the group cache
+ gkey = hash(tuple(groups))
+ if gkey not in self.groupcache[collection.cachekey]:
+ self.groupcache[collection.cachekey][gkey] = \
+ collection.get_groups(groups)
+ for pkgs in self.groupcache[collection.cachekey][gkey].values():
base.update(pkgs)
# essential pkgs are those marked as such by the distribution
base.update(collection.get_essential())
- packages, unknown = collection.complete(base)
+ # check for this set of packages in the package cache
+ pkey = hash(tuple(base))
+ if pkey not in self.pkgcache[collection.cachekey]:
+ self.pkgcache[collection.cachekey][pkey] = \
+ collection.complete(base)
+ packages, unknown = self.pkgcache[collection.cachekey][pkey]
if unknown:
self.logger.info("Packages: Got %d unknown entries" % len(unknown))
self.logger.info("Packages: %s" % list(unknown))
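
The hunk above memoizes group lookups under a key built by hashing the
sorted group list, so equal group sets always hit the same cache entry
regardless of input order. The same pattern in isolation, with
illustrative names:

.. code-block:: python

    groupcache = {}

    def get_groups_cached(collection, groups):
        """ Memoized front-end to collection.get_groups() (sketch). """
        key = hash(tuple(sorted(groups)))
        if key not in groupcache:
            groupcache[key] = collection.get_groups(groups)
        return groupcache[key]
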
@@ -388,6 +463,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
self._load_config()
return True
+    def expire_cache(self, _=None):
+        """ Expire the cache by reloading the Packages config. """
+        self.Reload()
+
def _load_config(self, force_update=False):
"""
Load the configuration data and setup sources
@@ -415,9 +493,11 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if not self.disableMetaData:
collection.setup_data(force_update)
- # clear Collection caches
+ # clear Collection and package caches
self.clients = dict()
self.collections = dict()
+ self.groupcache = dict()
+ self.pkgcache = dict()
for source in self.sources.entries:
cachefiles.add(source.cachefile)
@@ -493,8 +573,12 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if not self.sources.loaded:
# if sources.xml has not received a FAM event yet, defer;
# instantiate a dummy Collection object
- return Collection(metadata, [], self.cachepath, self.data,
- self.core.fam)
+ collection = Collection(metadata, [], self.cachepath, self.data,
+ self.core.fam)
+ ckey = collection.cachekey
+ self.groupcache.setdefault(ckey, dict())
+ self.pkgcache.setdefault(ckey, dict())
+ return collection
if metadata.hostname in self.clients:
return self.collections[self.clients[metadata.hostname]]
@@ -531,24 +615,47 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if cclass != Collection:
self.clients[metadata.hostname] = ckey
self.collections[ckey] = collection
+ self.groupcache.setdefault(ckey, dict())
+ self.pkgcache.setdefault(ckey, dict())
return collection
def get_additional_data(self, metadata):
""" Return additional data for the given client. This will be
- a dict containing a single key, ``sources``, whose value is a
- list of data returned from
- :func:`Bcfg2.Server.Plugins.Packages.Collection.Collection.get_additional_data`,
- namely, a list of
- :attr:`Bcfg2.Server.Plugins.Packages.Source.Source.url_map`
- data.
+ an :class:`Bcfg2.Server.Plugins.Packages.OnDemandDict`
+ containing two keys:
+
+ * ``sources``, whose value is a list of data returned from
+ :func:`Bcfg2.Server.Plugins.Packages.Collection.Collection.get_additional_data`,
+ namely, a list of
+ :attr:`Bcfg2.Server.Plugins.Packages.Source.Source.url_map`
+ data; and
+ * ``get_config``, whose value is the
+ :func:`Bcfg2.Server.Plugins.Packages.Packages.get_config`
+ function, which can be used to get the Packages config for
+ other systems.
+
+ This uses an OnDemandDict instead of just a normal dict
+ because loading a source collection can be a fairly
+    time-consuming process, particularly the first time. As a
+ result, when all metadata objects are built at once (such as
+ after the server is restarted, or far more frequently if
+ Metadata caching is disabled), this function would be a major
+ bottleneck if we tried to build all collections at the same
+ time. Instead, they're merely built on-demand.
:param metadata: The client metadata
:type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
:return: OnDemandDict of ``sources`` and ``get_config`` data
"""
- collection = self.get_collection(metadata)
- return dict(sources=collection.get_additional_data(),
- get_config=self.get_config)
+ def get_sources():
+ """ getter for the 'sources' key of the OnDemandDict
+ returned by this function. This delays calling
+ get_collection() until it's absolutely necessary. """
+ return self.get_collection(metadata).get_additional_data()
+
+ return OnDemandDict(
+ sources=get_sources,
+ get_config=lambda: self.get_config)
def end_client_run(self, metadata):
""" Hook to clear the cache for this client in
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index 0974184b4..fd6fd3bd1 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -9,6 +9,7 @@ import operator
import lxml.etree
import Bcfg2.Server
import Bcfg2.Server.Plugin
+from Bcfg2.Compat import unicode # pylint: disable=W0622
try:
from django.db import models
@@ -64,7 +65,10 @@ class ProbeData(str): # pylint: disable=E0012,R0924
.json, and .yaml properties to provide convenient ways to use
ProbeData objects as XML, JSON, or YAML data """
def __new__(cls, data):
- return str.__new__(cls, data)
+        if isinstance(data, unicode):
+            # Python 2: encode to a UTF-8 bytestring so str.__new__
+            # does not raise on non-ASCII probe data
+            return str.__new__(cls, data.encode('utf-8'))
+        else:
+            return str.__new__(cls, data)
def __init__(self, data): # pylint: disable=W0613
str.__init__(self)
@@ -181,14 +185,16 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
class Probes(Bcfg2.Server.Plugin.Probing,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Connector,
Bcfg2.Server.Plugin.DatabaseBacked):
""" A plugin to gather information from a client machine """
__author__ = 'bcfg-dev@mcs.anl.gov'
def __init__(self, core, datastore):
- Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.Probing.__init__(self)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
+ Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
try:
@@ -223,9 +229,15 @@ class Probes(Bcfg2.Server.Plugin.Probing,
lxml.etree.SubElement(top, 'Client', name=client,
timestamp=str(int(probedata.timestamp)))
for probe in sorted(probedata):
- lxml.etree.SubElement(
- ctag, 'Probe', name=probe,
- value=self.probedata[client][probe])
+            try:
+                # Python 2: coerce the probe value to unicode so lxml
+                # accepts non-ASCII data
+                lxml.etree.SubElement(
+                    ctag, 'Probe', name=probe,
+                    value=str(
+                        self.probedata[client][probe]).decode('utf-8'))
+            except AttributeError:
+                # Python 3: str has no decode(); the value is already
+                # text
+                lxml.etree.SubElement(
+                    ctag, 'Probe', name=probe,
+                    value=str(self.probedata[client][probe]))
for group in sorted(self.cgroups[client]):
lxml.etree.SubElement(ctag, "Group", name=group)
try:
@@ -249,7 +261,7 @@ class Probes(Bcfg2.Server.Plugin.Probing,
ProbesDataModel.objects.filter(
hostname=client.hostname).exclude(
- probe__in=self.probedata[client.hostname]).delete()
+ probe__in=self.probedata[client.hostname]).delete()
for group in self.cgroups[client.hostname]:
try:
@@ -264,14 +276,19 @@ class Probes(Bcfg2.Server.Plugin.Probing,
group=group)
ProbesGroupsModel.objects.filter(
hostname=client.hostname).exclude(
- group__in=self.cgroups[client.hostname]).delete()
+ group__in=self.cgroups[client.hostname]).delete()
+
+    def expire_cache(self, key=None):
+        """ Expire the probe data cache, for a single client if a
+        key is given, by reloading it from the backend. """
+        self.load_data(client=key)
- def load_data(self):
+ def load_data(self, client=None):
""" Load probe data from the appropriate backend (probed.xml
or the database) """
if self._use_db:
- return self._load_data_db()
+ return self._load_data_db(client=client)
else:
+ # the XML backend doesn't support loading data for single
+ # clients, so it reloads all data
return self._load_data_xml()
def _load_data_xml(self):
@@ -296,20 +313,36 @@ class Probes(Bcfg2.Server.Plugin.Probing,
elif pdata.tag == 'Group':
self.cgroups[client.get('name')].append(pdata.get('name'))
- def _load_data_db(self):
+ if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+ def _load_data_db(self, client=None):
""" Load probe data from the database """
- self.probedata = {}
- self.cgroups = {}
- for pdata in ProbesDataModel.objects.all():
+ if client is None:
+ self.probedata = {}
+ self.cgroups = {}
+ probedata = ProbesDataModel.objects.all()
+ groupdata = ProbesGroupsModel.objects.all()
+ else:
+ self.probedata.pop(client, None)
+ self.cgroups.pop(client, None)
+ probedata = ProbesDataModel.objects.filter(hostname=client)
+ groupdata = ProbesGroupsModel.objects.filter(hostname=client)
+
+ for pdata in probedata:
if pdata.hostname not in self.probedata:
self.probedata[pdata.hostname] = ClientProbeDataSet(
timestamp=time.mktime(pdata.timestamp.timetuple()))
self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data)
- for pgroup in ProbesGroupsModel.objects.all():
+ for pgroup in groupdata:
if pgroup.hostname not in self.cgroups:
self.cgroups[pgroup.hostname] = []
self.cgroups[pgroup.hostname].append(pgroup.group)
+ if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
+ key=client)
+
@Bcfg2.Server.Plugin.track_statistics()
def GetProbes(self, meta):
return self.probes.get_probe_data(meta)
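
A sketch of the selective-reload pattern that _load_data_db() now
uses, reusing the ProbesDataModel and ProbesGroupsModel models from
this diff; reload_client() is an illustrative standalone helper, not
part of the plugin:

.. code-block:: python

    def reload_client(probedata, cgroups, client):
        """ Drop and re-read a single client's probe data (sketch). """
        probedata.pop(client, None)
        cgroups.pop(client, None)
        # filtering by hostname avoids rebuilding every client's data
        for row in ProbesDataModel.objects.filter(hostname=client):
            probedata.setdefault(row.hostname, {})[row.probe] = row.data
        for row in ProbesGroupsModel.objects.filter(hostname=client):
            cgroups.setdefault(row.hostname, []).append(row.group)
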
diff --git a/src/lib/Bcfg2/Server/Plugins/Properties.py b/src/lib/Bcfg2/Server/Plugins/Properties.py
index e97f66675..89f2d21ff 100644
--- a/src/lib/Bcfg2/Server/Plugins/Properties.py
+++ b/src/lib/Bcfg2/Server/Plugins/Properties.py
@@ -223,7 +223,7 @@ class XMLPropertyFile(Bcfg2.Server.Plugin.StructFile, PropertyFile):
if strict:
raise PluginExecutionError(msg)
else:
- LOGGER.warning(msg)
+ LOGGER.info(msg)
Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__
def _decrypt(self, element):
diff --git a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
index 801e7006d..072f3f7e7 100644
--- a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
+++ b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
@@ -127,7 +127,7 @@ class PuppetENC(Bcfg2.Server.Plugin.Plugin,
self.logger.warning("PuppetENC is incompatible with aggressive "
"client metadata caching, try 'cautious' or "
"'initial' instead")
- self.core.cache.expire()
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
def end_statistics(self, metadata):
self.end_client_run(self, metadata)
diff --git a/src/lib/Bcfg2/Server/Plugins/SSHbase.py b/src/lib/Bcfg2/Server/Plugins/SSHbase.py
index d8b3104b7..2deea5f07 100644
--- a/src/lib/Bcfg2/Server/Plugins/SSHbase.py
+++ b/src/lib/Bcfg2/Server/Plugins/SSHbase.py
@@ -92,6 +92,7 @@ class KnownHostsEntrySet(Bcfg2.Server.Plugin.EntrySet):
class SSHbase(Bcfg2.Server.Plugin.Plugin,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Generator,
Bcfg2.Server.Plugin.PullTarget):
"""
@@ -125,6 +126,7 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin,
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.Generator.__init__(self)
Bcfg2.Server.Plugin.PullTarget.__init__(self)
self.ipcache = {}
@@ -149,6 +151,9 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin,
HostKeyEntrySet(keypattern, self.data)
self.Entries['Path']["/etc/ssh/" + keypattern] = self.build_hk
+    def expire_cache(self, key=None):
+        """ Invalidate the in-memory known hosts cache so that it is
+        rebuilt on the next access. """
+        self.__skn = False
+
def get_skn(self):
"""Build memory cache of the ssh known hosts file."""
if not self.__skn:
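
The expire_cache() hook added above is a plain invalidation flag: the
known-hosts data is rebuilt lazily on the next get_skn() call. The
same pattern in isolation, as a generic sketch:

.. code-block:: python

    class LazyCache(object):
        """ Flag-based cache invalidation (sketch). """
        def __init__(self, builder):
            self._builder = builder  # expensive function to memoize
            self._value = None

        def get(self):
            if self._value is None:
                self._value = self._builder()  # rebuild on demand
            return self._value

        def expire(self):
            self._value = None  # the next get() rebuilds
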
diff --git a/src/lib/Bcfg2/Server/models.py b/src/lib/Bcfg2/Server/models.py
index 1f64111e7..7e2f5b09d 100644
--- a/src/lib/Bcfg2/Server/models.py
+++ b/src/lib/Bcfg2/Server/models.py
@@ -57,7 +57,7 @@ def load_models(plugins=None, cfile='/etc/bcfg2.conf', quiet=True):
# the second attempt.
LOGGER.error("Failed to load plugin %s: %s" % (plugin,
err))
- continue
+ continue
for sym in dir(mod):
obj = getattr(mod, sym)
if hasattr(obj, "__bases__") and models.Model in obj.__bases__:
diff --git a/src/lib/Bcfg2/Utils.py b/src/lib/Bcfg2/Utils.py
index 4293f3f69..ab1276178 100644
--- a/src/lib/Bcfg2/Utils.py
+++ b/src/lib/Bcfg2/Utils.py
@@ -244,9 +244,9 @@ class Executor(object):
# py3k fixes
if not isinstance(stdout, str):
- stdout = stdout.decode('utf-8')
+ stdout = stdout.decode('utf-8') # pylint: disable=E1103
if not isinstance(stderr, str):
- stderr = stderr.decode('utf-8')
+ stderr = stderr.decode('utf-8') # pylint: disable=E1103
for line in stdout.splitlines(): # pylint: disable=E1103
self.logger.debug('< %s' % line)
diff --git a/src/lib/Bcfg2/settings.py b/src/lib/Bcfg2/settings.py
index 9adfd66bf..82a3bdb2a 100644
--- a/src/lib/Bcfg2/settings.py
+++ b/src/lib/Bcfg2/settings.py
@@ -26,6 +26,8 @@ DATABASE_USER = None
DATABASE_PASSWORD = None
DATABASE_HOST = None
DATABASE_PORT = None
+DATABASE_OPTIONS = None
+DATABASE_SCHEMA = None
TIME_ZONE = None
@@ -58,8 +60,8 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None, quiet=False):
""" read the config file and set django settings based on it """
# pylint: disable=W0602,W0603
global DATABASE_ENGINE, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, \
- DATABASE_HOST, DATABASE_PORT, DEBUG, TEMPLATE_DEBUG, TIME_ZONE, \
- MEDIA_URL
+ DATABASE_HOST, DATABASE_PORT, DATABASE_OPTIONS, DATABASE_SCHEMA, \
+ DEBUG, TEMPLATE_DEBUG, TIME_ZONE, MEDIA_URL
# pylint: enable=W0602,W0603
if not os.path.exists(cfile) and os.path.exists(DEFAULT_CONFIG):
@@ -86,7 +88,9 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None, quiet=False):
USER=setup['db_user'],
PASSWORD=setup['db_password'],
HOST=setup['db_host'],
- PORT=setup['db_port'])
+ PORT=setup['db_port'],
+ OPTIONS=setup['db_options'],
+ SCHEMA=setup['db_schema'])
if HAS_DJANGO and django.VERSION[0] == 1 and django.VERSION[1] < 2:
DATABASE_ENGINE = setup['db_engine']
@@ -95,6 +99,8 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None, quiet=False):
DATABASE_PASSWORD = DATABASES['default']['PASSWORD']
DATABASE_HOST = DATABASES['default']['HOST']
DATABASE_PORT = DATABASES['default']['PORT']
+ DATABASE_OPTIONS = DATABASES['default']['OPTIONS']
+ DATABASE_SCHEMA = DATABASES['default']['SCHEMA']
# dropping the version check. This was added in 1.1.2
TIME_ZONE = setup['time_zone']
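
For reference, a sketch of the shape of the resulting DATABASES entry;
every concrete value below is illustrative, and SCHEMA is a
Bcfg2-specific key rather than a standard Django one:

.. code-block:: python

    DATABASES = dict(
        default=dict(
            ENGINE='django.db.backends.postgresql_psycopg2',
            NAME='bcfg2',
            USER='bcfg2',
            PASSWORD='secret',               # illustrative only
            HOST='db.example.com',
            PORT='5432',
            OPTIONS={'sslmode': 'require'},  # from db_options
            SCHEMA='bcfg2'))                 # from db_schema
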
diff --git a/src/lib/Bcfg2/version.py b/src/lib/Bcfg2/version.py
index 12fc584fe..140fb6937 100644
--- a/src/lib/Bcfg2/version.py
+++ b/src/lib/Bcfg2/version.py
@@ -2,7 +2,7 @@
import re
-__version__ = "1.3.1"
+__version__ = "1.3.2"
class Bcfg2VersionInfo(tuple): # pylint: disable=E0012,R0924
diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt
index aad89882f..0ba84fa0a 100755
--- a/src/sbin/bcfg2-crypt
+++ b/src/sbin/bcfg2-crypt
@@ -18,291 +18,168 @@ except ImportError:
raise SystemExit(1)
-class EncryptionChunkingError(Exception):
- """ error raised when Encryptor cannot break a file up into chunks
- to be encrypted, or cannot reassemble the chunks """
- pass
+class PassphraseError(Exception):
+ """ Exception raised when there's a problem determining the
+ passphrase to encrypt or decrypt with """
-class Encryptor(object):
- """ Generic encryptor for all files """
-
- def __init__(self, setup):
+class CryptoTool(object):
+ """ Generic decryption/encryption interface base object """
+ def __init__(self, filename, setup):
self.setup = setup
- self.passphrase = None
- self.pname = None
self.logger = logging.getLogger(self.__class__.__name__)
+ self.passphrases = Bcfg2.Encryption.get_passphrases(self.setup)
- def get_encrypted_filename(self, plaintext_filename):
- """ get the name of the file encrypted data should be written to """
- return plaintext_filename
-
- def get_plaintext_filename(self, encrypted_filename):
- """ get the name of the file decrypted data should be written to """
- return encrypted_filename
-
- def chunk(self, data):
- """ generator to break the file up into smaller chunks that
- will each be individually encrypted or decrypted """
- yield data
-
- def unchunk(self, data, original): # pylint: disable=W0613
- """ given chunks of a file, reassemble then into the whole file """
+ self.filename = filename
try:
- return data[0]
- except IndexError:
- raise EncryptionChunkingError("No data to unchunk")
+ self.data = open(self.filename).read()
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("Error reading %s, skipping: %s" % (filename,
+ err))
+            # returning a value from __init__ raises a TypeError;
+            # re-raise the original error so the caller skips the file
+            raise
- def set_passphrase(self):
- """ set the passphrase for the current file """
+ self.pname, self.passphrase = self._get_passphrase()
+
+ def _get_passphrase(self):
+ """ get the passphrase for the current file """
if (not self.setup.cfp.has_section(Bcfg2.Encryption.CFG_SECTION) or
len(Bcfg2.Encryption.get_passphrases(self.setup)) == 0):
- self.logger.error("No passphrases available in %s" %
- self.setup['configfile'])
- return False
-
- if self.passphrase:
- self.logger.debug("Using previously determined passphrase %s" %
- self.pname)
- return True
+ raise PassphraseError("No passphrases available in %s" %
+ self.setup['configfile'])
+ pname = None
if self.setup['passphrase']:
- self.pname = self.setup['passphrase']
+ pname = self.setup['passphrase']
- if self.pname:
+ if pname:
if self.setup.cfp.has_option(Bcfg2.Encryption.CFG_SECTION,
- self.pname):
- self.passphrase = \
- self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION,
- self.pname)
+ pname):
+ passphrase = self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION,
+ pname)
self.logger.debug("Using passphrase %s specified on command "
- "line" % self.pname)
- return True
+ "line" % pname)
+ return (pname, passphrase)
else:
- self.logger.error("Could not find passphrase %s in %s" %
- (self.pname, self.setup['configfile']))
- return False
+ raise PassphraseError("Could not find passphrase %s in %s" %
+ (pname, self.setup['configfile']))
else:
pnames = Bcfg2.Encryption.get_passphrases(self.setup)
if len(pnames) == 1:
- self.pname = pnames.keys()[0]
- self.passphrase = pnames[self.pname]
- self.logger.info("Using passphrase %s" % self.pname)
- return True
+ pname = pnames.keys()[0]
+ passphrase = pnames[pname]
+ self.logger.info("Using passphrase %s" % pname)
+ return (pname, passphrase)
elif len(pnames) > 1:
- self.logger.warning("Multiple passphrases found in %s, "
- "specify one on the command line with -p" %
- self.setup['configfile'])
- self.logger.info("No passphrase could be determined")
- return False
-
- def encrypt(self, fname):
- """ encrypt the given file, returning the encrypted data """
+ return (None, None)
+ raise PassphraseError("No passphrase could be determined")
+
+ def get_destination_filename(self, original_filename):
+ """ Get the filename where data should be written """
+ return original_filename
+
+ def write(self, data):
+ """ write data to disk """
+ new_fname = self.get_destination_filename(self.filename)
try:
- plaintext = open(fname).read()
+ self._write(new_fname, data)
+ self.logger.info("Wrote data to %s" % new_fname)
+ return True
except IOError:
err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
+ self.logger.error("Error writing data from %s to %s: %s" %
+ (self.filename, new_fname, err))
return False
- if not self.set_passphrase():
- return False
+ def _write(self, filename, data):
+ """ Perform the actual write of data. This is separate from
+ :func:`CryptoTool.write` so it can be easily
+ overridden. """
+ open(filename, "wb").write(data)
- crypted = []
- try:
- for chunk in self.chunk(plaintext):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- except TypeError:
- return False
- crypted.append(self._encrypt(chunk, passphrase, name=pname))
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting data to encrypt from %s: %s" %
- (fname, err))
- return False
- return self.unchunk(crypted, plaintext)
+class Decryptor(CryptoTool):
+ """ Decryptor interface """
+ def decrypt(self):
+ """ decrypt the file, returning the encrypted data """
+ raise NotImplementedError
- # pylint: disable=W0613
- def _encrypt(self, plaintext, passphrase, name=None):
- """ encrypt a single chunk of a file """
- return Bcfg2.Encryption.ssl_encrypt(
- plaintext, passphrase,
- Bcfg2.Encryption.get_algorithm(self.setup))
- # pylint: enable=W0613
- def decrypt(self, fname):
- """ decrypt the given file, returning the plaintext data """
- try:
- crypted = open(fname).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error reading %s, skipping: %s" % (fname, err))
- return False
+class Encryptor(CryptoTool):
+ """ encryptor interface """
+ def encrypt(self):
+ """ encrypt the file, returning the encrypted data """
+ raise NotImplementedError
- self.set_passphrase()
- plaintext = []
- try:
- for chunk in self.chunk(crypted):
- try:
- passphrase, pname = self.get_passphrase(chunk)
- try:
- plaintext.append(self._decrypt(chunk, passphrase))
- except Bcfg2.Encryption.EVPError:
- self.logger.info("Could not decrypt %s with the "
- "specified passphrase" % fname)
- continue
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- continue
- except TypeError:
- pchunk = None
- passphrases = Bcfg2.Encryption.get_passphrases(self.setup)
- for pname, passphrase in passphrases.items():
- self.logger.debug("Trying passphrase %s" % pname)
- try:
- pchunk = self._decrypt(chunk, passphrase)
- break
- except Bcfg2.Encryption.EVPError:
- pass
- except:
- err = sys.exc_info()[1]
- self.logger.error("Error decrypting %s: %s" %
- (fname, err))
- if pchunk is not None:
- plaintext.append(pchunk)
- else:
- self.logger.error("Could not decrypt %s with any "
- "passphrase in %s" %
- (fname, self.setup['configfile']))
- continue
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error getting encrypted data from %s: %s" %
- (fname, err))
- return False
+class CfgEncryptor(Encryptor):
+ """ encryptor class for Cfg files """
- try:
- return self.unchunk(plaintext, crypted)
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling plaintext data from %s: %s" %
- (fname, err))
- return False
+ def __init__(self, filename, setup):
+ Encryptor.__init__(self, filename, setup)
+ if self.passphrase is None:
+ raise PassphraseError("Multiple passphrases found in %s, "
+ "specify one on the command line with -p" %
+ self.setup['configfile'])
- def _decrypt(self, crypted, passphrase):
- """ decrypt a single chunk """
- return Bcfg2.Encryption.ssl_decrypt(
- crypted, passphrase,
+ def encrypt(self):
+ return Bcfg2.Encryption.ssl_encrypt(
+ self.data, self.passphrase,
Bcfg2.Encryption.get_algorithm(self.setup))
- def write_encrypted(self, fname, data=None):
- """ write encrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_encrypted_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote encrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
- except EncryptionChunkingError:
- err = sys.exc_info()[1]
- self.logger.error("Error assembling encrypted data from %s: %s" %
- (fname, err))
- return False
+ def get_destination_filename(self, original_filename):
+ return original_filename + ".crypt"
- def write_decrypted(self, fname, data=None):
- """ write decrypted data to disk """
- if data is None:
- data = self.decrypt(fname)
- new_fname = self.get_plaintext_filename(fname)
- try:
- open(new_fname, "wb").write(data)
- self.logger.info("Wrote decrypted data to %s" % new_fname)
- return True
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Error writing encrypted data from %s to %s: %s"
- % (fname, new_fname, err))
- return False
- def get_passphrase(self, chunk):
- """ get the passphrase for a chunk of a file """
- pname = self._get_passphrase(chunk)
- if not self.pname:
- if not pname:
- self.logger.info("No passphrase given on command line or "
- "found in file")
+class CfgDecryptor(Decryptor):
+ """ Decrypt Cfg files """
+
+ def decrypt(self):
+ """ decrypt the given file, returning the plaintext data """
+ if self.passphrase:
+ try:
+ return Bcfg2.Encryption.ssl_decrypt(
+ self.data, self.passphrase,
+ Bcfg2.Encryption.get_algorithm(self.setup))
+ except Bcfg2.Encryption.EVPError:
+ self.logger.info("Could not decrypt %s with the "
+ "specified passphrase" % self.filename)
return False
- elif self.setup.cfp.has_option(Bcfg2.Encryption.CFG_SECTION,
- pname):
- passphrase = self.setup.cfp.get(Bcfg2.Encryption.CFG_SECTION,
- pname)
- else:
- self.logger.error("Could not find passphrase %s in %s" %
- (pname, self.setup['configfile']))
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("Error decrypting %s: %s" %
+ (self.filename, err))
+ return False
+ else: # no passphrase given, brute force
+ try:
+ return Bcfg2.Encryption.bruteforce_decrypt(
+ self.data, passphrases=self.passphrases.values(),
+ algorithm=Bcfg2.Encryption.get_algorithm(self.setup))
+ except Bcfg2.Encryption.EVPError:
+ self.logger.info("Could not decrypt %s with any passphrase" %
+ self.filename)
return False
- else:
- pname = self.pname
- passphrase = self.passphrase
- if self.pname != pname:
- self.logger.warning("Passphrase given on command line (%s) "
- "differs from passphrase embedded in "
- "file (%s), using command-line option" %
- (self.pname, pname))
- return (passphrase, pname)
-
- def _get_passphrase(self, chunk): # pylint: disable=W0613
- """ get the passphrase for a chunk of a file """
- return None
-
-
-class CfgEncryptor(Encryptor):
- """ encryptor class for Cfg files """
-
- def get_encrypted_filename(self, plaintext_filename):
- return plaintext_filename + ".crypt"
- def get_plaintext_filename(self, encrypted_filename):
- if encrypted_filename.endswith(".crypt"):
- return encrypted_filename[:-6]
+ def get_destination_filename(self, original_filename):
+ if original_filename.endswith(".crypt"):
+ return original_filename[:-6]
else:
- return Encryptor.get_plaintext_filename(self, encrypted_filename)
+            return Decryptor.get_destination_filename(self,
+                                                      original_filename)
-class PropertiesEncryptor(Encryptor):
- """ encryptor class for Properties files """
-
- def _encrypt(self, plaintext, passphrase, name=None):
- # plaintext is an lxml.etree._Element
- if name is None:
- name = "true"
- if plaintext.text and plaintext.text.strip():
- plaintext.text = Bcfg2.Encryption.ssl_encrypt(
- plaintext.text,
- passphrase,
- Bcfg2.Encryption.get_algorithm(self.setup)).strip()
- plaintext.set("encrypted", name)
- return plaintext
+class PropertiesCryptoMixin(object):
+ """ Mixin to provide some common methods for Properties crypto """
+ default_xpath = '//*'
- def chunk(self, data):
- xdata = lxml.etree.XML(data, parser=XMLParser)
+ def _get_elements(self, xdata):
+ """ Get the list of elements to encrypt or decrypt """
if self.setup['xpath']:
elements = xdata.xpath(self.setup['xpath'])
if not elements:
- raise EncryptionChunkingError("XPath expression %s matched no "
- "elements" % self.setup['xpath'])
+ self.logger.warning("XPath expression %s matched no "
+ "elements" % self.setup['xpath'])
else:
- elements = xdata.xpath('//*[@encrypted]')
+ elements = xdata.xpath(self.default_xpath)
if not elements:
elements = list(xdata.getiterator(tag=lxml.etree.Element))
@@ -329,50 +206,85 @@ class PropertiesEncryptor(Encryptor):
ans = input("Encrypt this element? [y/N] ")
if not ans.lower().startswith("y"):
elements.remove(element)
+ return elements
+
+ def _get_element_passphrase(self, element):
+ """ Get the passphrase to use to encrypt or decrypt a given
+ element """
+ pname = element.get("encrypted")
+ if pname in self.passphrases:
+ passphrase = self.passphrases[pname]
+ elif self.passphrase:
+ if pname:
+ self.logger.warning("Passphrase %s not found in %s, "
+ "using passphrase given on command line"
+ % (pname, self.setup['configfile']))
+ passphrase = self.passphrase
+ pname = self.pname
+ else:
+ raise PassphraseError("Multiple passphrases found in %s, "
+ "specify one on the command line with -p" %
+ self.setup['configfile'])
+ return (pname, passphrase)
- # this is not a good use of a generator, but we need to
- # generate the full list of elements in order to ensure that
- # some exist before we know what to return
- for elt in elements:
- yield elt
-
- def unchunk(self, data, original):
- # Properties elements are modified in-place, so we don't
- # actually need to unchunk anything
- xdata = data[0]
- # find root element
- while xdata.getparent() is not None:
- xdata = xdata.getparent()
- return lxml.etree.tostring(xdata,
- xml_declaration=False,
- pretty_print=True).decode('UTF-8')
-
- def _get_passphrase(self, chunk):
- pname = chunk.get("encrypted")
- if pname and pname.lower() != "true":
- return pname
- return None
-
- def _decrypt(self, crypted, passphrase):
- # crypted is in lxml.etree._Element
- if not crypted.text or not crypted.text.strip():
- self.logger.warning("Skipping empty element %s" % crypted.tag)
- return crypted
- decrypted = Bcfg2.Encryption.ssl_decrypt(
- crypted.text,
- passphrase,
- Bcfg2.Encryption.get_algorithm(self.setup)).strip()
- try:
- crypted.text = decrypted.encode('ascii', 'xmlcharrefreplace')
- except UnicodeDecodeError:
- # we managed to decrypt the value, but it contains content
- # that can't even be encoded into xml entities. what
- # probably happened here is that we coincidentally could
- # decrypt a value encrypted with a different key, and
- # wound up with gibberish.
- self.logger.warning("Decrypted %s to gibberish, skipping" %
- crypted.tag)
- return crypted
+ def _write(self, filename, data):
+ """ Write the data """
+ data.getroottree().write(filename,
+ xml_declaration=False,
+ pretty_print=True)
+
+
+class PropertiesEncryptor(Encryptor, PropertiesCryptoMixin):
+ """ encryptor class for Properties files """
+
+ def encrypt(self):
+ xdata = lxml.etree.XML(self.data, parser=XMLParser)
+ for elt in self._get_elements(xdata):
+ try:
+ pname, passphrase = self._get_element_passphrase(elt)
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ return False
+ elt.text = Bcfg2.Encryption.ssl_encrypt(
+ elt.text, passphrase,
+ Bcfg2.Encryption.get_algorithm(self.setup)).strip()
+ elt.set("encrypted", pname)
+ return xdata
+
+ def _write(self, filename, data):
+ PropertiesCryptoMixin._write(self, filename, data)
+
+
+class PropertiesDecryptor(Decryptor, PropertiesCryptoMixin):
+ """ decryptor class for Properties files """
+ default_xpath = '//*[@encrypted]'
+
+ def decrypt(self):
+ xdata = lxml.etree.XML(self.data, parser=XMLParser)
+ for elt in self._get_elements(xdata):
+ try:
+ pname, passphrase = self._get_element_passphrase(elt)
+ except PassphraseError:
+ self.logger.error(str(sys.exc_info()[1]))
+ return False
+ decrypted = Bcfg2.Encryption.ssl_decrypt(
+ elt.text, passphrase,
+ Bcfg2.Encryption.get_algorithm(self.setup)).strip()
+ try:
+ elt.text = decrypted.encode('ascii', 'xmlcharrefreplace')
+ elt.set("encrypted", pname)
+ except UnicodeDecodeError:
+ # we managed to decrypt the value, but it contains
+ # content that can't even be encoded into xml
+ # entities. what probably happened here is that we
+ # coincidentally could decrypt a value encrypted with
+ # a different key, and wound up with gibberish.
+ self.logger.warning("Decrypted %s to gibberish, skipping" %
+ elt.tag)
+ return xdata
+
+ def _write(self, filename, data):
+ PropertiesCryptoMixin._write(self, filename, data)
def main(): # pylint: disable=R0912,R0915
@@ -422,9 +334,6 @@ def main(): # pylint: disable=R0912,R0915
logger.error("--remove cannot be used with --properties, ignoring")
setup['remove'] = Bcfg2.Options.CRYPT_REMOVE.default
- props_crypt = PropertiesEncryptor(setup)
- cfg_crypt = CfgEncryptor(setup)
-
for fname in setup['args']:
if not os.path.exists(fname):
logger.error("%s does not exist, skipping" % fname)
@@ -454,10 +363,10 @@ def main(): # pylint: disable=R0912,R0915
props = False
if props:
- encryptor = props_crypt
if setup['remove']:
logger.info("Cannot use --remove with Properties file %s, "
"ignoring for this file" % fname)
+ tools = (PropertiesEncryptor, PropertiesDecryptor)
else:
if setup['xpath']:
logger.info("Cannot use xpath with Cfg file %s, ignoring "
@@ -465,31 +374,52 @@ def main(): # pylint: disable=R0912,R0915
if setup['interactive']:
logger.info("Cannot use interactive mode with Cfg file %s, "
"ignoring -I for this file" % fname)
- encryptor = cfg_crypt
+ tools = (CfgEncryptor, CfgDecryptor)
data = None
+ mode = None
if setup['encrypt']:
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
+ try:
+ tool = tools[0](fname, setup)
+ except PassphraseError:
+ logger.error(str(sys.exc_info()[1]))
+ return 2
+ mode = "encrypt"
elif setup['decrypt']:
- xform = encryptor.decrypt
- write = encryptor.write_decrypted
+ try:
+ tool = tools[1](fname, setup)
+ except PassphraseError:
+ logger.error(str(sys.exc_info()[1]))
+ return 2
+ mode = "decrypt"
else:
logger.info("Neither --encrypt nor --decrypt specified, "
"determining mode")
- data = encryptor.decrypt(fname)
- if data:
- write = encryptor.write_decrypted
- else:
- logger.info("Failed to decrypt %s, trying encryption" % fname)
+ try:
+ tool = tools[1](fname, setup)
+ except PassphraseError:
+ logger.error(str(sys.exc_info()[1]))
+ return 2
+
+ try:
+ data = tool.decrypt()
+ mode = "decrypt"
+ except: # pylint: disable=W0702
+ pass
+        # covers both a False return and an exception from decrypt()
+        if not data:
data = None
- xform = encryptor.encrypt
- write = encryptor.write_encrypted
+ logger.info("Failed to decrypt %s, trying encryption" % fname)
+ try:
+ tool = tools[0](fname, setup)
+ except PassphraseError:
+ logger.error(str(sys.exc_info()[1]))
+ return 2
+ mode = "encrypt"
if data is None:
- data = xform(fname)
+ data = getattr(tool, mode)()
if not data:
- logger.error("Failed to %s %s, skipping" % (xform.__name__, fname))
+ logger.error("Failed to %s %s, skipping" % (mode, fname))
continue
if setup['crypt_stdout']:
if len(setup['args']) > 1:
@@ -498,10 +428,10 @@ def main(): # pylint: disable=R0912,R0915
if len(setup['args']) > 1:
print("")
else:
- write(fname, data=data)
+ tool.write(data)
if (setup['remove'] and
- encryptor.get_encrypted_filename(fname) != fname):
+ tool.get_destination_filename(fname) != fname):
try:
os.unlink(fname)
except IOError:
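
A sketch of driving the refactored tool classes directly, assuming a
populated setup object as built in main(); the path is illustrative:

.. code-block:: python

    tool = CfgDecryptor("/tmp/secret.crypt", setup)
    data = tool.decrypt()  # plaintext on success, False on failure
    if data:
        tool.write(data)   # writes to get_destination_filename()
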
diff --git a/src/sbin/bcfg2-info b/src/sbin/bcfg2-info
index 133e1ccb3..6008f8896 100755
--- a/src/sbin/bcfg2-info
+++ b/src/sbin/bcfg2-info
@@ -437,7 +437,7 @@ Bcfg2 client itself.""")
pname, client = alist
automatch = self.setup.cfp.getboolean("properties", "automatch",
default=False)
- pfile = self.plugins['Properties'].store.entries[pname]
+ pfile = self.plugins['Properties'].entries[pname]
if (not force and
not automatch and
pfile.xdata.get("automatch", "false").lower() != "true"):
@@ -482,6 +482,17 @@ Bcfg2 client itself.""")
('Logging', self.setup['logging'])]
print_tabular(output)
+ def do_expirecache(self, args):
+        """ expirecache [<hostname> [<hostname> ...]] - Expire the
+        metadata cache """
+ alist = args.split()
+ if len(alist):
+ for client in self._get_client_list(alist):
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
+ key=client)
+ else:
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
def do_probes(self, args):
""" probes [-p] <hostname> - Get probe list for the given
host, in XML (the default) or human-readable pretty (with -p)
@@ -717,7 +728,7 @@ Bcfg2 client itself.""")
def run(self, args): # pylint: disable=W0221
try:
self.load_plugins()
- self.fam.handle_events_in_interval(1)
+ self.block_for_fam_events(handle_events=True)
if args:
self.onecmd(" ".join(args))
else:
@@ -758,7 +769,8 @@ USAGE = build_usage()
def main():
optinfo = dict(profile=Bcfg2.Options.CORE_PROFILE,
interactive=Bcfg2.Options.INTERACTIVE,
- interpreter=Bcfg2.Options.INTERPRETER)
+ interpreter=Bcfg2.Options.INTERPRETER,
+ command_timeout=Bcfg2.Options.CLIENT_COMMAND_TIMEOUT)
optinfo.update(Bcfg2.Options.INFO_COMMON_OPTIONS)
setup = Bcfg2.Options.OptionParser(optinfo)
setup.hm = "\n".join([" bcfg2-info [options] [command <command args>]",
diff --git a/src/sbin/bcfg2-lint b/src/sbin/bcfg2-lint
index ab3b6450f..9ceb1dd04 100755
--- a/src/sbin/bcfg2-lint
+++ b/src/sbin/bcfg2-lint
@@ -73,7 +73,7 @@ def load_server(setup):
""" load server """
core = Bcfg2.Server.Core.BaseCore(setup)
core.load_plugins()
- core.fam.handle_events_in_interval(0.1)
+ core.block_for_fam_events(handle_events=True)
return core
diff --git a/src/sbin/bcfg2-test b/src/sbin/bcfg2-test
index d7a1894f0..7c38a65d8 100755
--- a/src/sbin/bcfg2-test
+++ b/src/sbin/bcfg2-test
@@ -157,7 +157,7 @@ def get_core(setup):
""" Get a server core, with events handled """
core = Bcfg2.Server.Core.BaseCore(setup)
core.load_plugins()
- core.fam.handle_events_in_interval(0.1)
+ core.block_for_fam_events(handle_events=True)
return core
@@ -298,8 +298,8 @@ def main():
for client in clients:
yield ClientTest(core, client, ignore)
- TestProgram(argv=sys.argv[:1] + core.setup['noseopts'],
- suite=LazySuite(generate_tests), exit=False)
+ result = TestProgram(argv=sys.argv[:1] + core.setup['noseopts'],
+ suite=LazySuite(generate_tests), exit=False)
# block until all children have completed -- should be
# immediate since we've already gotten all the results we
@@ -308,7 +308,10 @@ def main():
child.join()
core.shutdown()
- os._exit(0) # pylint: disable=W0212
+ if result.success:
+ os._exit(0) # pylint: disable=W0212
+ else:
+ os._exit(1) # pylint: disable=W0212
if __name__ == "__main__":
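
The change above threads nose's test result into the process exit
status. A minimal sketch of why exit=False matters, assuming nose is
installed:

.. code-block:: python

    import os
    import sys
    from nose.core import TestProgram

    # with exit=True (the default), TestProgram calls sys.exit()
    # itself and the caller never regains control to join child
    # processes or shut down the server core before exiting
    result = TestProgram(argv=sys.argv[:1], exit=False)
    os._exit(0 if result.success else 1)  # pylint: disable=W0212
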
diff --git a/src/sbin/bcfg2-yum-helper b/src/sbin/bcfg2-yum-helper
index 4ef531d39..49baeb9c3 100755
--- a/src/sbin/bcfg2-yum-helper
+++ b/src/sbin/bcfg2-yum-helper
@@ -10,6 +10,8 @@ import sys
import yum
import logging
import Bcfg2.Logger
+from Bcfg2.Compat import wraps
+from lockfile import FileLock, LockTimeout
from optparse import OptionParser
try:
import json
@@ -42,8 +44,8 @@ def pkgtup_to_string(package):
return ''.join(str(e) for e in rv)
-class DepSolver(object):
- """ Yum dependency solver """
+class YumHelper(object):
+ """ Yum helper base object """
def __init__(self, cfgfile, verbose=1):
self.cfgfile = cfgfile
@@ -57,6 +59,16 @@ class DepSolver(object):
self.yumbase._getConfig(cfgfile, debuglevel=verbose)
# pylint: enable=E1121,W0212
self.logger = logging.getLogger(self.__class__.__name__)
+
+
+class DepSolver(YumHelper):
+ """ Yum dependency solver. This is used for operations that only
+ read from the yum cache, and thus operates in cacheonly mode. """
+
+ def __init__(self, cfgfile, verbose=1):
+ YumHelper.__init__(self, cfgfile, verbose=verbose)
+ # internally, yum uses an integer, not a boolean, for conf.cache
+ self.yumbase.conf.cache = 1
self._groups = None
def get_groups(self):
@@ -181,6 +193,45 @@ class DepSolver(object):
packages.add(txmbr.pkgtup)
return list(packages), list(unknown)
+
+def acquire_lock(func):
+    """ decorator for CacheManager methods that acquires and releases
+    a lock while the method runs """
+ @wraps(func)
+ def inner(self, *args, **kwargs):
+ """ Get and release a lock while running the function this
+ wraps. """
+ self.logger.debug("Acquiring lock at %s" % self.lockfile)
+ while not self.lock.i_am_locking():
+ try:
+ self.lock.acquire(timeout=60) # wait up to 60 seconds
+ except LockTimeout:
+ self.lock.break_lock()
+ self.lock.acquire()
+ try:
+ func(self, *args, **kwargs)
+ finally:
+ self.lock.release()
+ self.logger.debug("Released lock at %s" % self.lockfile)
+
+ return inner
+
+
+class CacheManager(YumHelper):
+ """ Yum cache manager. Unlike :class:`DepSolver`, this can write
+ to the yum cache, and so is used for operations that muck with the
+ cache. (Technically, :func:`CacheManager.clean_cache` could be in
+ either DepSolver or CacheManager, but for consistency I've put it
+ here.) """
+
+ def __init__(self, cfgfile, verbose=1):
+ YumHelper.__init__(self, cfgfile, verbose=verbose)
+ self.lockfile = \
+ os.path.join(os.path.dirname(self.yumbase.conf.config_file_path),
+ "lock")
+ self.lock = FileLock(self.lockfile)
+
+ @acquire_lock
def clean_cache(self):
""" clean the yum cache """
for mdtype in ["Headers", "Packages", "Sqlite", "Metadata",
@@ -193,6 +244,27 @@ class DepSolver(object):
if not msg.startswith("0 "):
self.logger.info(msg)
+ @acquire_lock
+ def populate_cache(self):
+ """ populate the yum cache """
+ for repo in self.yumbase.repos.findRepos('*'):
+ repo.metadata_expire = 0
+ repo.mdpolicy = "group:all"
+ self.yumbase.doRepoSetup()
+ self.yumbase.repos.doSetup()
+ for repo in self.yumbase.repos.listEnabled():
+ # this populates the cache as a side effect
+ repo.repoXML # pylint: disable=W0104
+ try:
+ repo.getGroups()
+ except yum.Errors.RepoMDError:
+ pass # this repo has no groups
+ self.yumbase.repos.populateSack(mdtype='metadata', cacheonly=1)
+ self.yumbase.repos.populateSack(mdtype='filelists', cacheonly=1)
+ self.yumbase.repos.populateSack(mdtype='otherdata', cacheonly=1)
+ # this does something with the groups cache as a side effect
+ self.yumbase.comps # pylint: disable=W0104
+
def main():
parser = OptionParser()
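
The acquire_lock decorator above wraps the following pattern from the
lockfile package; the lock path here is illustrative:

.. code-block:: python

    from lockfile import FileLock, LockTimeout

    lock = FileLock("/var/lock/bcfg2-yum-helper.lock")
    while not lock.i_am_locking():
        try:
            lock.acquire(timeout=60)  # wait up to 60 seconds
        except LockTimeout:
            # assume the previous holder crashed and left a stale lock
            lock.break_lock()
            lock.acquire()
    try:
        pass  # ... work on the cache while holding the lock ...
    finally:
        lock.release()
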
@@ -223,17 +295,28 @@ def main():
# pylint: disable=W0702
rv = 0
- depsolver = DepSolver(options.config, options.verbose)
if cmd == "clean":
+ cachemgr = CacheManager(options.config, options.verbose)
try:
- depsolver.clean_cache()
+ cachemgr.clean_cache()
print(json.dumps(True))
except:
logger.error("Unexpected error cleaning cache: %s" %
sys.exc_info()[1], exc_info=1)
print(json.dumps(False))
rv = 2
+ elif cmd == "makecache":
+ cachemgr = CacheManager(options.config, options.verbose)
+ try:
+ # this code copied from yumcommands.py
+ cachemgr.populate_cache()
+ print(json.dumps(True))
+        except yum.Errors.YumBaseError:
+            logger.error("Unexpected error creating cache: %s" %
+                         sys.exc_info()[1], exc_info=1)
+            print(json.dumps(False))
+            rv = 2
elif cmd == "complete":
+ depsolver = DepSolver(options.config, options.verbose)
try:
data = json.loads(sys.stdin.read())
except:
@@ -252,6 +335,7 @@ def main():
print(json.dumps(dict(packages=[], unknown=data['packages'])))
rv = 2
elif cmd == "get_groups":
+ depsolver = DepSolver(options.config, options.verbose)
try:
data = json.loads(sys.stdin.read())
rv = dict()