Diffstat (limited to 'src/lib/Bcfg2/Server/Core.py')
-rw-r--r--  src/lib/Bcfg2/Server/Core.py | 176
1 file changed, 128 insertions(+), 48 deletions(-)
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index c246860c1..c2cf6b7a4 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -2,14 +2,14 @@
implementations inherit from. """
import os
-import sys
-import time
+import pwd
import atexit
-import select
-import signal
import logging
-import inspect
+import select
+import sys
import threading
+import time
+import inspect
import lxml.etree
import Bcfg2.settings
import Bcfg2.Server
@@ -200,6 +200,10 @@ class BaseCore(object):
# load plugins
Bcfg2.settings.read_config(repo=self.datastore)
+ # mapping of group name => plugin name, used to record which
+ # Connector plugin each dynamic group came from
+ self._dynamic_groups = dict()
+
#: Whether or not it's possible to use the Django database
#: backend for plugins that have that capability
self._database_available = False
@@ -224,11 +228,11 @@ class BaseCore(object):
verbosity=0)
self._database_available = True
except ImproperlyConfigured:
- err = sys.exc_info()[1]
- self.logger.error("Django configuration problem: %s" % err)
+ self.logger.error("Django configuration problem: %s" %
+ sys.exc_info()[1])
except:
- err = sys.exc_info()[1]
- self.logger.error("Database update failed: %s" % err)
+ self.logger.error("Database update failed: %s" %
+ sys.exc_info()[1])
if do_chown and self._database_available:
try:
@@ -243,14 +247,6 @@ class BaseCore(object):
#: The CA that signed the server cert
self.ca = setup['ca']
- def hdlr(sig, frame): # pylint: disable=W0613
- """ Handle SIGINT/Ctrl-C by shutting down the core and exiting
- properly. """
- self.shutdown()
- os._exit(1) # pylint: disable=W0212
-
- signal.signal(signal.SIGINT, hdlr)
-
#: The FAM :class:`threading.Thread`,
#: :func:`_file_monitor_thread`
self.fam_thread = \
@@ -271,6 +267,20 @@ class BaseCore(object):
#: metadata
self.metadata_cache = Cache()
+ def expire_caches_by_type(self, base_cls, key=None):
+ """ Expire caches for all
+ :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that
+ are instances of ``base_cls``.
+
+ :param base_cls: The base plugin interface class to match (see
+ :mod:`Bcfg2.Server.Plugin.interfaces`)
+ :type base_cls: type
+ :param key: The cache key to expire
+ """
+ for plugin in self.plugins_by_type(base_cls):
+ if isinstance(plugin, Bcfg2.Server.Plugin.Caching):
+ plugin.expire_cache(key)
+
def plugins_by_type(self, base_cls):
""" Return a list of loaded plugins that match the passed type.
@@ -297,11 +307,12 @@ class BaseCore(object):
self.logger.debug("Performance logging thread starting")
while not self.terminate.isSet():
self.terminate.wait(self.setup['perflog_interval'])
- for name, stats in self.get_statistics(None).items():
- self.logger.info("Performance statistics: "
- "%s min=%.06f, max=%.06f, average=%.06f, "
- "count=%d" % ((name, ) + stats))
- self.logger.debug("Performance logging thread terminated")
+ if not self.terminate.isSet():
+ for name, stats in self.get_statistics(None).items():
+ self.logger.info("Performance statistics: "
+ "%s min=%.06f, max=%.06f, average=%.06f, "
+ "count=%d" % ((name, ) + stats))
+ self.logger.info("Performance logging thread terminated")
def _file_monitor_thread(self):
""" The thread that runs the
@@ -318,11 +329,12 @@ class BaseCore(object):
else:
if not self.fam.pending():
terminate.wait(15)
+ if self.fam.pending():
+ self._update_vcs_revision()
self.fam.handle_event_set(self.lock)
except:
continue
- self._update_vcs_revision()
- self.logger.debug("File monitor thread terminated")
+ self.logger.info("File monitor thread terminated")
@track_statistics()
def _update_vcs_revision(self):
@@ -438,14 +450,14 @@ class BaseCore(object):
def shutdown(self):
""" Perform plugin and FAM shutdown tasks. """
- self.logger.debug("Shutting down core...")
+ self.logger.info("Shutting down core...")
if not self.terminate.isSet():
self.terminate.set()
self.fam.shutdown()
- self.logger.debug("FAM shut down")
+ self.logger.info("FAM shut down")
for plugin in list(self.plugins.values()):
plugin.shutdown()
- self.logger.debug("All plugins shut down")
+ self.logger.info("All plugins shut down")
@property
def metadata_cache_mode(self):
@@ -636,10 +648,10 @@ class BaseCore(object):
del entry.attrib['realname']
return ret
except:
- entry.set('name', oldname)
self.logger.error("Failed binding entry %s:%s with altsrc %s" %
- (entry.tag, entry.get('name'),
- entry.get('altsrc')))
+ (entry.tag, entry.get('realname'),
+ entry.get('name')))
+ entry.set('name', oldname)
self.logger.error("Falling back to %s:%s" %
(entry.tag, entry.get('name')))
@@ -734,7 +746,27 @@ class BaseCore(object):
if event.code2str() == 'deleted':
return
self.setup.reparse()
- self.metadata_cache.expire()
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+ def block_for_fam_events(self, handle_events=False):
+ """ Block until all fam events have been handleed, optionally
+ handling events as well. (Setting ``handle_events=True`` is
+ useful for local server cores that don't spawn an event
+ handling thread.)"""
+ slept = 0
+ log_interval = 3
+ if handle_events:
+ self.fam.handle_events_in_interval(1)
+ slept += 1
+ if self.setup['fam_blocking']:
+ time.sleep(1)
+ slept += 1
+ while self.fam.pending() != 0:
+ time.sleep(1)
+ slept += 1
+ if slept % log_interval == 0:
+ self.logger.debug("Sleeping to handle FAM events...")
+ self.logger.debug("Slept %s seconds while handling FAM events" % slept)
def run(self):
""" Run the server core. This calls :func:`_daemonize`,
@@ -758,6 +790,11 @@ class BaseCore(object):
os.chmod(piddir, 493) # 0755
if not self._daemonize():
return False
+
+ # rewrite $HOME. Pulp stores its auth credentials in ~/.pulp, so
+ # this is necessary to make that work when privileges are
+ # dropped
+ os.environ['HOME'] = pwd.getpwuid(self.setup['daemon_uid'])[5]
else:
os.umask(int(self.setup['umask'], 8))
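Incidentally, field 5 of the pwd entry used above is the home directory; the named-attribute form is equivalent and a little more self-documenting (a sketch, with daemon_uid standing in for self.setup['daemon_uid']):

    import os
    import pwd

    def home_of(daemon_uid):
        # same value as pwd.getpwuid(daemon_uid)[5] in the hunk above
        return pwd.getpwuid(daemon_uid).pw_dir

    # e.g.: os.environ['HOME'] = home_of(daemon_uid)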
@@ -780,12 +817,9 @@ class BaseCore(object):
self.shutdown()
raise
- if self.setup['fam_blocking']:
- time.sleep(1)
- while self.fam.pending() != 0:
- time.sleep(1)
-
- self.set_debug(None, self.debug_flag)
+ if self.debug_flag:
+ self.set_debug(None, self.debug_flag)
+ self.block_for_fam_events()
self._block()
def _daemonize(self):
@@ -844,17 +878,52 @@ class BaseCore(object):
imd = self.metadata_cache.get(client_name, None)
if not imd:
self.logger.debug("Building metadata for %s" % client_name)
- imd = self.metadata.get_initial_metadata(client_name)
+ try:
+ imd = self.metadata.get_initial_metadata(client_name)
+ except MetadataConsistencyError:
+ self.critical_error(
+ "Client metadata resolution error for %s: %s" %
+ (client_name, sys.exc_info()[1]))
connectors = self.plugins_by_type(Connector)
for conn in connectors:
- grps = conn.get_additional_groups(imd)
- self.metadata.merge_additional_groups(imd, grps)
+ groups = conn.get_additional_groups(imd)
+ groupnames = []
+ for group in groups:
+ if hasattr(group, "name"):
+ groupname = group.name
+ if groupname in self._dynamic_groups:
+ if self._dynamic_groups[groupname] == conn.name:
+ self.metadata.groups[groupname] = group
+ else:
+ self.logger.warning(
+ "Refusing to clobber dynamic group %s "
+ "defined by %s" %
+ (groupname,
+ self._dynamic_groups[groupname]))
+ elif groupname in self.metadata.groups:
+ # not recorded as a dynamic group, but
+ # present in metadata.groups -- i.e., a
+ # static group
+ self.logger.warning(
+ "Refusing to clobber predefined group %s" %
+ groupname)
+ else:
+ self.metadata.groups[groupname] = group
+ self._dynamic_groups[groupname] = conn.name
+ groupnames.append(groupname)
+ else:
+ groupnames.append(group)
+
+ self.metadata.merge_additional_groups(imd, groupnames)
for conn in connectors:
data = conn.get_additional_data(imd)
self.metadata.merge_additional_data(imd, conn.name, data)
imd.query.by_name = self.build_metadata
if self.metadata_cache_mode in ['cautious', 'aggressive']:
self.metadata_cache[client_name] = imd
+ else:
+ self.logger.debug("Using cached metadata object for %s" %
+ client_name)
return imd
def process_statistics(self, client_name, statistics):
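To make the new bookkeeping concrete, here is a hedged sketch of a Connector that returns both styles of group (get_additional_groups() and get_additional_data() are the real Connector interface; the classes and group names are made up). Plain strings are merged into the client metadata as before, while objects carrying a .name attribute are also installed in self.metadata.groups and recorded in self._dynamic_groups so a later Connector cannot silently redefine them.

    import Bcfg2.Server.Plugin


    class ExampleGroup(object):
        """ Stand-in for a group object; build_metadata() above only
        requires that it have a .name attribute. """

        def __init__(self, name, category=None):
            self.name = name
            self.category = category


    class ExampleConnector(Bcfg2.Server.Plugin.Plugin,
                           Bcfg2.Server.Plugin.Connector):
        """ Hypothetical Connector returning both kinds of groups. """

        def get_additional_groups(self, metadata):
            return ["plain-group",                  # merged by name only
                    ExampleGroup("dynamic-group")]  # tracked as dynamic

        def get_additional_data(self, metadata):
            return dict()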
@@ -882,6 +951,7 @@ class BaseCore(object):
state.get('state')))
self.client_run_hook("end_statistics", meta)
+ @track_statistics()
def resolve_client(self, address, cleanup_cache=False, metadata=True):
""" Given a client address, get the client hostname and
optionally metadata.
@@ -934,15 +1004,19 @@ class BaseCore(object):
raise xmlrpclib.Fault(xmlrpclib.APPLICATION_ERROR,
"Critical failure: %s" % message)
+ def _get_rmi_objects(self):
+ """ Get a dict (name: object) of all objects that may have RMI
+ calls. Currently, that includes all plugins and the FAM. """
+ rv = {self.fam.__class__.__name__: self.fam}
+ rv.update(self.plugins)
+ return rv
+
def _get_rmi(self):
""" Get a list of RMI calls exposed by plugins """
rmi = dict()
- for pname, pinst in list(self.plugins.items()):
+ for pname, pinst in self._get_rmi_objects().items():
for mname in pinst.__rmi__:
rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
- famname = self.fam.__class__.__name__
- for mname in self.fam.__rmi__:
- rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname)
return rmi
def _resolve_exposed_method(self, method_name):
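For context on where those RMI names come from (the __rmi__ attribute is existing Bcfg2 plugin machinery; this particular plugin and method are hypothetical), anything a plugin or the FAM lists in __rmi__ is now collected by _get_rmi_objects() and exposed as "<ObjectName>.<method>":

    import Bcfg2.Server.Plugin


    class ExampleRMIPlugin(Bcfg2.Server.Plugin.Plugin):
        """ Hypothetical plugin whose refresh() call would show up in
        _get_rmi() as "ExampleRMIPlugin.refresh". """

        __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ["refresh"]

        def refresh(self):
            # illustrative admin call; a real plugin would reload state here
            return True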
@@ -1033,6 +1107,7 @@ class BaseCore(object):
for plugin in self.plugins_by_type(Probing):
for probe in plugin.GetProbes(metadata):
resp.append(probe)
+ self.logger.debug("Sending probe list to %s" % client)
return lxml.etree.tostring(resp,
xml_declaration=False).decode('UTF-8')
except:
@@ -1058,7 +1133,7 @@ class BaseCore(object):
# that's created for RecvProbeData doesn't get cached.
# I.e., the next metadata object that's built, after probe
# data is processed, is cached.
- self.metadata_cache.expire(client)
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
try:
xpdata = lxml.etree.XML(probedata.encode('utf-8'),
parser=Bcfg2.Server.XMLParser)
@@ -1255,9 +1330,14 @@ class BaseCore(object):
self.logger.info("Core: debug = %s" % debug)
levels = self._loglevels[self.debug_flag]
for handler in logging.root.handlers:
- level = levels.get(handler.name, levels['default'])
- self.logger.debug("Setting %s log handler to %s" %
- (handler.name, logging.getLevelName(level)))
+ try:
+ level = levels.get(handler.name, levels['default'])
+ self.logger.debug("Setting %s log handler to %s" %
+ (handler.name, logging.getLevelName(level)))
+ except AttributeError:
+ level = levels['default']
+ self.logger.debug("Setting unknown log handler %s to %s" %
+ (handler, logging.getLevelName(level)))
handler.setLevel(level)
return self.debug_flag
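Finally, a self-contained illustration of why the AttributeError guard above is useful (handler names and levels here are invented): named handlers get their configured level, while a handler that exposes no name attribute falls back to the default.

    import logging

    levels = {'default': logging.WARNING, 'console': logging.DEBUG}

    console = logging.StreamHandler()
    console.name = "console"      # named handler: receives DEBUG
    logging.root.addHandler(console)

    for handler in logging.root.handlers:
        try:
            level = levels.get(handler.name, levels['default'])
        except AttributeError:    # handler with no name attribute at all
            level = levels['default']
        handler.setLevel(level)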