summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/Core.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Server/Core.py')
-rw-r--r--src/lib/Bcfg2/Server/Core.py102
1 files changed, 74 insertions, 28 deletions
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index 58044447b..360b7868d 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -227,6 +227,20 @@ class Core(object):
self.logger.error("Updating database %s failed: %s" %
(Bcfg2.Options.setup.db_name, err))
+ def expire_caches_by_type(self, base_cls, key=None):
+ """ Expire caches for all
+ :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that
+ are instances of ``base_cls``.
+
+ :param base_cls: The base plugin interface class to match (see
+ :mod:`Bcfg2.Server.Plugin.interfaces`)
+ :type base_cls: type
+ :param key: The cache key to expire
+ """
+ for plugin in self.plugins_by_type(base_cls):
+ if isinstance(plugin, Bcfg2.Server.Plugin.Caching):
+ plugin.expire_cache(key)
+
def plugins_by_type(self, base_cls):
""" Return a list of loaded plugins that match the passed type.
@@ -253,11 +267,12 @@ class Core(object):
self.logger.debug("Performance logging thread starting")
while not self.terminate.isSet():
self.terminate.wait(Bcfg2.Options.setup.performance_interval)
- for name, stats in self.get_statistics(None).items():
- self.logger.info("Performance statistics: "
- "%s min=%.06f, max=%.06f, average=%.06f, "
- "count=%d" % ((name, ) + stats))
- self.logger.debug("Performance logging thread terminated")
+ if not self.terminate.isSet():
+ for name, stats in self.get_statistics(None).items():
+ self.logger.info("Performance statistics: "
+ "%s min=%.06f, max=%.06f, average=%.06f, "
+ "count=%d" % ((name, ) + stats))
+ self.logger.info("Performance logging thread terminated")
def _file_monitor_thread(self):
""" The thread that runs the
@@ -274,11 +289,12 @@ class Core(object):
else:
if not self.fam.pending():
terminate.wait(15)
+ if self.fam.pending():
+ self._update_vcs_revision()
self.fam.handle_event_set(self.lock)
except:
continue
- self._update_vcs_revision()
- self.logger.debug("File monitor thread terminated")
+ self.logger.info("File monitor thread terminated")
@Bcfg2.Server.Statistics.track_statistics()
def _update_vcs_revision(self):
@@ -372,14 +388,14 @@ class Core(object):
def shutdown(self):
""" Perform plugin and FAM shutdown tasks. """
- self.logger.debug("Shutting down core...")
+ self.logger.info("Shutting down core...")
if not self.terminate.isSet():
self.terminate.set()
self.fam.shutdown()
- self.logger.debug("FAM shut down")
+ self.logger.info("FAM shut down")
for plugin in list(self.plugins.values()):
plugin.shutdown()
- self.logger.debug("All plugins shut down")
+ self.logger.info("All plugins shut down")
@property
def metadata_cache_mode(self):
@@ -667,7 +683,27 @@ class Core(object):
if event.code2str() == 'deleted':
return
Bcfg2.Options.get_parser().reparse()
- self.metadata_cache.expire()
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+ def block_for_fam_events(self, handle_events=False):
+ """ Block until all fam events have been handleed, optionally
+ handling events as well. (Setting ``handle_events=True`` is
+ useful for local server cores that don't spawn an event
+ handling thread.)"""
+ slept = 0
+ log_interval = 3
+ if handle_events:
+ self.fam.handle_events_in_interval(1)
+ slept += 1
+ if Bcfg2.Options.setup.fam_blocking:
+ time.sleep(1)
+ slept += 1
+ while self.fam.pending() != 0:
+ time.sleep(1)
+ slept += 1
+ if slept % log_interval == 0:
+ self.logger.debug("Sleeping to handle FAM events...")
+ self.logger.debug("Slept %s seconds while handling FAM events" % slept)
def run(self):
""" Run the server core. This calls :func:`_run`, starts the
@@ -695,11 +731,7 @@ class Core(object):
self.shutdown()
raise
- if Bcfg2.Options.setup.fam_blocking:
- time.sleep(1)
- while self.fam.pending() != 0:
- time.sleep(1)
-
+ self.block_for_fam_events()
self._block()
def _run(self):
@@ -765,7 +797,7 @@ class Core(object):
elif False in ip_checks:
# if any ACL plugin returned False (deny), then deny
self.logger.warning("Client %s failed IP-based ACL checks for %s" %
- (address[0], rmi))
+ (address[0], rmi))
return False
# else, no plugins returned False, but not all plugins
# returned True, so some plugin returned None (defer), so
@@ -808,7 +840,12 @@ class Core(object):
imd = self.metadata_cache.get(client_name, None)
if not imd:
self.logger.debug("Building metadata for %s" % client_name)
- imd = self.metadata.get_initial_metadata(client_name)
+ try:
+ imd = self.metadata.get_initial_metadata(client_name)
+ except MetadataConsistencyError:
+ self.critical_error(
+ "Client metadata resolution error for %s: %s" %
+ (client_name, sys.exc_info()[1]))
connectors = self.plugins_by_type(Connector)
for conn in connectors:
grps = conn.get_additional_groups(imd)
@@ -819,6 +856,9 @@ class Core(object):
imd.query.by_name = self.build_metadata
if self.metadata_cache_mode in ['cautious', 'aggressive']:
self.metadata_cache[client_name] = imd
+ else:
+ self.logger.debug("Using cached metadata object for %s" %
+ client_name)
return imd
def process_statistics(self, client_name, statistics):
@@ -846,6 +886,7 @@ class Core(object):
state.get('state')))
self.client_run_hook("end_statistics", meta)
+ @track_statistics()
def resolve_client(self, address, cleanup_cache=False, metadata=True):
""" Given a client address, get the client hostname and
optionally metadata.
@@ -901,12 +942,10 @@ class Core(object):
def _get_rmi(self):
""" Get a list of RMI calls exposed by plugins """
rmi = dict()
- for pname, pinst in list(self.plugins.items()):
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
for mname in pinst.__rmi__:
rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
- famname = self.fam.__class__.__name__
- for mname in self.fam.__rmi__:
- rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname)
return rmi
def _resolve_exposed_method(self, method_name):
@@ -999,6 +1038,7 @@ class Core(object):
for plugin in self.plugins_by_type(Probing):
for probe in plugin.GetProbes(metadata):
resp.append(probe)
+ self.logger.debug("Sending probe list to %s" % client)
return lxml.etree.tostring(resp,
xml_declaration=False).decode('UTF-8')
except:
@@ -1024,7 +1064,7 @@ class Core(object):
# that's created for RecvProbeData doesn't get cached.
# I.e., the next metadata object that's built, after probe
# data is processed, is cached.
- self.metadata_cache.expire(client)
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
try:
xpdata = lxml.etree.XML(probedata.encode('utf-8'),
parser=Bcfg2.Server.XMLParser)
@@ -1199,9 +1239,14 @@ class Core(object):
self.logger.info("Core: debug = %s" % debug)
levels = self._loglevels[self.debug_flag]
for handler in logging.root.handlers:
- level = levels.get(handler.name, levels['default'])
- self.logger.debug("Setting %s log handler to %s" %
- (handler.name, logging.getLevelName(level)))
+ try:
+ level = levels.get(handler.name, levels['default'])
+ self.logger.debug("Setting %s log handler to %s" %
+ (handler.name, logging.getLevelName(level)))
+ except AttributeError:
+ level = levels['default']
+ self.logger.debug("Setting unknown log handler %s to %s" %
+ (handler, logging.getLevelName(level)))
handler.setLevel(level)
return self.debug_flag
@@ -1271,7 +1316,7 @@ class NetworkCore(Core):
self.logger.error("Failed to set ownership of database "
"at %s: %s" % (db_settings['NAME'], err))
__init__.__doc__ = Core.__init__.__doc__.split(".. -----")[0] + \
-"\n.. automethod:: _daemonize\n"
+ "\n.. automethod:: _daemonize\n"
def run(self):
""" Run the server core. This calls :func:`_daemonize` before
@@ -1296,7 +1341,8 @@ class NetworkCore(Core):
# rewrite $HOME. pulp stores its auth creds in ~/.pulp, so
# this is necessary to make that work when privileges are
# dropped
- os.environ['HOME'] = pwd.getpwuid(self.setup['daemon_uid'])[5]
+ os.environ['HOME'] = \
+ pwd.getpwuid(Bcfg2.Options.setup.daemon_uid)[5]
else:
os.umask(int(Bcfg2.Options.setup.umask, 8))