diff options
Diffstat (limited to 'src/lib/Bcfg2/Server/Core.py')
-rw-r--r-- | src/lib/Bcfg2/Server/Core.py | 176 |
1 files changed, 128 insertions, 48 deletions
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index c246860c1..c2cf6b7a4 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -2,14 +2,14 @@ implementations inherit from. """ import os -import sys -import time +import pwd import atexit -import select -import signal import logging -import inspect +import select +import sys import threading +import time +import inspect import lxml.etree import Bcfg2.settings import Bcfg2.Server @@ -200,6 +200,10 @@ class BaseCore(object): # load plugins Bcfg2.settings.read_config(repo=self.datastore) + # mapping of group name => plugin name to record where groups + # that are created by Connector plugins came from + self._dynamic_groups = dict() + #: Whether or not it's possible to use the Django database #: backend for plugins that have that capability self._database_available = False @@ -224,11 +228,11 @@ class BaseCore(object): verbosity=0) self._database_available = True except ImproperlyConfigured: - err = sys.exc_info()[1] - self.logger.error("Django configuration problem: %s" % err) + self.logger.error("Django configuration problem: %s" % + sys.exc_info()[1]) except: - err = sys.exc_info()[1] - self.logger.error("Database update failed: %s" % err) + self.logger.error("Database update failed: %s" % + sys.exc_info()[1]) if do_chown and self._database_available: try: @@ -243,14 +247,6 @@ class BaseCore(object): #: The CA that signed the server cert self.ca = setup['ca'] - def hdlr(sig, frame): # pylint: disable=W0613 - """ Handle SIGINT/Ctrl-C by shutting down the core and exiting - properly. """ - self.shutdown() - os._exit(1) # pylint: disable=W0212 - - signal.signal(signal.SIGINT, hdlr) - #: The FAM :class:`threading.Thread`, #: :func:`_file_monitor_thread` self.fam_thread = \ @@ -271,6 +267,20 @@ class BaseCore(object): #: metadata self.metadata_cache = Cache() + def expire_caches_by_type(self, base_cls, key=None): + """ Expire caches for all + :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that + are instances of ``base_cls``. + + :param base_cls: The base plugin interface class to match (see + :mod:`Bcfg2.Server.Plugin.interfaces`) + :type base_cls: type + :param key: The cache key to expire + """ + for plugin in self.plugins_by_type(base_cls): + if isinstance(plugin, Bcfg2.Server.Plugin.Caching): + plugin.expire_cache(key) + def plugins_by_type(self, base_cls): """ Return a list of loaded plugins that match the passed type. @@ -297,11 +307,12 @@ class BaseCore(object): self.logger.debug("Performance logging thread starting") while not self.terminate.isSet(): self.terminate.wait(self.setup['perflog_interval']) - for name, stats in self.get_statistics(None).items(): - self.logger.info("Performance statistics: " - "%s min=%.06f, max=%.06f, average=%.06f, " - "count=%d" % ((name, ) + stats)) - self.logger.debug("Performance logging thread terminated") + if not self.terminate.isSet(): + for name, stats in self.get_statistics(None).items(): + self.logger.info("Performance statistics: " + "%s min=%.06f, max=%.06f, average=%.06f, " + "count=%d" % ((name, ) + stats)) + self.logger.info("Performance logging thread terminated") def _file_monitor_thread(self): """ The thread that runs the @@ -318,11 +329,12 @@ class BaseCore(object): else: if not self.fam.pending(): terminate.wait(15) + if self.fam.pending(): + self._update_vcs_revision() self.fam.handle_event_set(self.lock) except: continue - self._update_vcs_revision() - self.logger.debug("File monitor thread terminated") + self.logger.info("File monitor thread terminated") @track_statistics() def _update_vcs_revision(self): @@ -438,14 +450,14 @@ class BaseCore(object): def shutdown(self): """ Perform plugin and FAM shutdown tasks. """ - self.logger.debug("Shutting down core...") + self.logger.info("Shutting down core...") if not self.terminate.isSet(): self.terminate.set() self.fam.shutdown() - self.logger.debug("FAM shut down") + self.logger.info("FAM shut down") for plugin in list(self.plugins.values()): plugin.shutdown() - self.logger.debug("All plugins shut down") + self.logger.info("All plugins shut down") @property def metadata_cache_mode(self): @@ -636,10 +648,10 @@ class BaseCore(object): del entry.attrib['realname'] return ret except: - entry.set('name', oldname) self.logger.error("Failed binding entry %s:%s with altsrc %s" % - (entry.tag, entry.get('name'), - entry.get('altsrc'))) + (entry.tag, entry.get('realname'), + entry.get('name'))) + entry.set('name', oldname) self.logger.error("Falling back to %s:%s" % (entry.tag, entry.get('name'))) @@ -734,7 +746,27 @@ class BaseCore(object): if event.code2str() == 'deleted': return self.setup.reparse() - self.metadata_cache.expire() + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) + + def block_for_fam_events(self, handle_events=False): + """ Block until all fam events have been handleed, optionally + handling events as well. (Setting ``handle_events=True`` is + useful for local server cores that don't spawn an event + handling thread.)""" + slept = 0 + log_interval = 3 + if handle_events: + self.fam.handle_events_in_interval(1) + slept += 1 + if self.setup['fam_blocking']: + time.sleep(1) + slept += 1 + while self.fam.pending() != 0: + time.sleep(1) + slept += 1 + if slept % log_interval == 0: + self.logger.debug("Sleeping to handle FAM events...") + self.logger.debug("Slept %s seconds while handling FAM events" % slept) def run(self): """ Run the server core. This calls :func:`_daemonize`, @@ -758,6 +790,11 @@ class BaseCore(object): os.chmod(piddir, 493) # 0775 if not self._daemonize(): return False + + # rewrite $HOME. pulp stores its auth creds in ~/.pulp, so + # this is necessary to make that work when privileges are + # dropped + os.environ['HOME'] = pwd.getpwuid(self.setup['daemon_uid'])[5] else: os.umask(int(self.setup['umask'], 8)) @@ -780,12 +817,9 @@ class BaseCore(object): self.shutdown() raise - if self.setup['fam_blocking']: - time.sleep(1) - while self.fam.pending() != 0: - time.sleep(1) - - self.set_debug(None, self.debug_flag) + if self.debug_flag: + self.set_debug(None, self.debug_flag) + self.block_for_fam_events() self._block() def _daemonize(self): @@ -844,17 +878,52 @@ class BaseCore(object): imd = self.metadata_cache.get(client_name, None) if not imd: self.logger.debug("Building metadata for %s" % client_name) - imd = self.metadata.get_initial_metadata(client_name) + try: + imd = self.metadata.get_initial_metadata(client_name) + except MetadataConsistencyError: + self.critical_error( + "Client metadata resolution error for %s: %s" % + (client_name, sys.exc_info()[1])) connectors = self.plugins_by_type(Connector) for conn in connectors: - grps = conn.get_additional_groups(imd) - self.metadata.merge_additional_groups(imd, grps) + groups = conn.get_additional_groups(imd) + groupnames = [] + for group in groups: + if hasattr(group, "name"): + groupname = group.name + if groupname in self._dynamic_groups: + if self._dynamic_groups[groupname] == conn.name: + self.metadata.groups[groupname] = group + else: + self.logger.warning( + "Refusing to clobber dynamic group %s " + "defined by %s" % + (self._dynamic_groups[groupname], + groupname)) + elif groupname in self.metadata.groups: + # not recorded as a dynamic group, but + # present in metadata.groups -- i.e., a + # static group + self.logger.warning( + "Refusing to clobber predefined group %s" % + groupname) + else: + self.metadata.groups[groupname] = group + self._dynamic_groups[groupname] = conn.name + groupnames.append(groupname) + else: + groupnames.append(group) + + self.metadata.merge_additional_groups(imd, groupnames) for conn in connectors: data = conn.get_additional_data(imd) self.metadata.merge_additional_data(imd, conn.name, data) imd.query.by_name = self.build_metadata if self.metadata_cache_mode in ['cautious', 'aggressive']: self.metadata_cache[client_name] = imd + else: + self.logger.debug("Using cached metadata object for %s" % + client_name) return imd def process_statistics(self, client_name, statistics): @@ -882,6 +951,7 @@ class BaseCore(object): state.get('state'))) self.client_run_hook("end_statistics", meta) + @track_statistics() def resolve_client(self, address, cleanup_cache=False, metadata=True): """ Given a client address, get the client hostname and optionally metadata. @@ -934,15 +1004,19 @@ class BaseCore(object): raise xmlrpclib.Fault(xmlrpclib.APPLICATION_ERROR, "Critical failure: %s" % message) + def _get_rmi_objects(self): + """ Get a dict (name: object) of all objects that may have RMI + calls. Currently, that includes all plugins and the FAM. """ + rv = {self.fam.__class__.__name__: self.fam} + rv.update(self.plugins) + return rv + def _get_rmi(self): """ Get a list of RMI calls exposed by plugins """ rmi = dict() - for pname, pinst in list(self.plugins.items()): + for pname, pinst in self._get_rmi_objects().items(): for mname in pinst.__rmi__: rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) - famname = self.fam.__class__.__name__ - for mname in self.fam.__rmi__: - rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname) return rmi def _resolve_exposed_method(self, method_name): @@ -1033,6 +1107,7 @@ class BaseCore(object): for plugin in self.plugins_by_type(Probing): for probe in plugin.GetProbes(metadata): resp.append(probe) + self.logger.debug("Sending probe list to %s" % client) return lxml.etree.tostring(resp, xml_declaration=False).decode('UTF-8') except: @@ -1058,7 +1133,7 @@ class BaseCore(object): # that's created for RecvProbeData doesn't get cached. # I.e., the next metadata object that's built, after probe # data is processed, is cached. - self.metadata_cache.expire(client) + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) try: xpdata = lxml.etree.XML(probedata.encode('utf-8'), parser=Bcfg2.Server.XMLParser) @@ -1255,9 +1330,14 @@ class BaseCore(object): self.logger.info("Core: debug = %s" % debug) levels = self._loglevels[self.debug_flag] for handler in logging.root.handlers: - level = levels.get(handler.name, levels['default']) - self.logger.debug("Setting %s log handler to %s" % - (handler.name, logging.getLevelName(level))) + try: + level = levels.get(handler.name, levels['default']) + self.logger.debug("Setting %s log handler to %s" % + (handler.name, logging.getLevelName(level))) + except AttributeError: + level = levels['default'] + self.logger.debug("Setting unknown log handler %s to %s" % + (handler, logging.getLevelName(level))) handler.setLevel(level) return self.debug_flag |