diff options
Diffstat (limited to 'src/lib/Bcfg2/Server/Core.py')
-rw-r--r-- | src/lib/Bcfg2/Server/Core.py | 102 |
1 files changed, 74 insertions, 28 deletions
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index 58044447b..360b7868d 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -227,6 +227,20 @@ class Core(object): self.logger.error("Updating database %s failed: %s" % (Bcfg2.Options.setup.db_name, err)) + def expire_caches_by_type(self, base_cls, key=None): + """ Expire caches for all + :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that + are instances of ``base_cls``. + + :param base_cls: The base plugin interface class to match (see + :mod:`Bcfg2.Server.Plugin.interfaces`) + :type base_cls: type + :param key: The cache key to expire + """ + for plugin in self.plugins_by_type(base_cls): + if isinstance(plugin, Bcfg2.Server.Plugin.Caching): + plugin.expire_cache(key) + def plugins_by_type(self, base_cls): """ Return a list of loaded plugins that match the passed type. @@ -253,11 +267,12 @@ class Core(object): self.logger.debug("Performance logging thread starting") while not self.terminate.isSet(): self.terminate.wait(Bcfg2.Options.setup.performance_interval) - for name, stats in self.get_statistics(None).items(): - self.logger.info("Performance statistics: " - "%s min=%.06f, max=%.06f, average=%.06f, " - "count=%d" % ((name, ) + stats)) - self.logger.debug("Performance logging thread terminated") + if not self.terminate.isSet(): + for name, stats in self.get_statistics(None).items(): + self.logger.info("Performance statistics: " + "%s min=%.06f, max=%.06f, average=%.06f, " + "count=%d" % ((name, ) + stats)) + self.logger.info("Performance logging thread terminated") def _file_monitor_thread(self): """ The thread that runs the @@ -274,11 +289,12 @@ class Core(object): else: if not self.fam.pending(): terminate.wait(15) + if self.fam.pending(): + self._update_vcs_revision() self.fam.handle_event_set(self.lock) except: continue - self._update_vcs_revision() - self.logger.debug("File monitor thread terminated") + self.logger.info("File monitor thread terminated") @Bcfg2.Server.Statistics.track_statistics() def _update_vcs_revision(self): @@ -372,14 +388,14 @@ class Core(object): def shutdown(self): """ Perform plugin and FAM shutdown tasks. """ - self.logger.debug("Shutting down core...") + self.logger.info("Shutting down core...") if not self.terminate.isSet(): self.terminate.set() self.fam.shutdown() - self.logger.debug("FAM shut down") + self.logger.info("FAM shut down") for plugin in list(self.plugins.values()): plugin.shutdown() - self.logger.debug("All plugins shut down") + self.logger.info("All plugins shut down") @property def metadata_cache_mode(self): @@ -667,7 +683,27 @@ class Core(object): if event.code2str() == 'deleted': return Bcfg2.Options.get_parser().reparse() - self.metadata_cache.expire() + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) + + def block_for_fam_events(self, handle_events=False): + """ Block until all fam events have been handleed, optionally + handling events as well. (Setting ``handle_events=True`` is + useful for local server cores that don't spawn an event + handling thread.)""" + slept = 0 + log_interval = 3 + if handle_events: + self.fam.handle_events_in_interval(1) + slept += 1 + if Bcfg2.Options.setup.fam_blocking: + time.sleep(1) + slept += 1 + while self.fam.pending() != 0: + time.sleep(1) + slept += 1 + if slept % log_interval == 0: + self.logger.debug("Sleeping to handle FAM events...") + self.logger.debug("Slept %s seconds while handling FAM events" % slept) def run(self): """ Run the server core. This calls :func:`_run`, starts the @@ -695,11 +731,7 @@ class Core(object): self.shutdown() raise - if Bcfg2.Options.setup.fam_blocking: - time.sleep(1) - while self.fam.pending() != 0: - time.sleep(1) - + self.block_for_fam_events() self._block() def _run(self): @@ -765,7 +797,7 @@ class Core(object): elif False in ip_checks: # if any ACL plugin returned False (deny), then deny self.logger.warning("Client %s failed IP-based ACL checks for %s" % - (address[0], rmi)) + (address[0], rmi)) return False # else, no plugins returned False, but not all plugins # returned True, so some plugin returned None (defer), so @@ -808,7 +840,12 @@ class Core(object): imd = self.metadata_cache.get(client_name, None) if not imd: self.logger.debug("Building metadata for %s" % client_name) - imd = self.metadata.get_initial_metadata(client_name) + try: + imd = self.metadata.get_initial_metadata(client_name) + except MetadataConsistencyError: + self.critical_error( + "Client metadata resolution error for %s: %s" % + (client_name, sys.exc_info()[1])) connectors = self.plugins_by_type(Connector) for conn in connectors: grps = conn.get_additional_groups(imd) @@ -819,6 +856,9 @@ class Core(object): imd.query.by_name = self.build_metadata if self.metadata_cache_mode in ['cautious', 'aggressive']: self.metadata_cache[client_name] = imd + else: + self.logger.debug("Using cached metadata object for %s" % + client_name) return imd def process_statistics(self, client_name, statistics): @@ -846,6 +886,7 @@ class Core(object): state.get('state'))) self.client_run_hook("end_statistics", meta) + @track_statistics() def resolve_client(self, address, cleanup_cache=False, metadata=True): """ Given a client address, get the client hostname and optionally metadata. @@ -901,12 +942,10 @@ class Core(object): def _get_rmi(self): """ Get a list of RMI calls exposed by plugins """ rmi = dict() - for pname, pinst in list(self.plugins.items()): + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: for mname in pinst.__rmi__: rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) - famname = self.fam.__class__.__name__ - for mname in self.fam.__rmi__: - rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname) return rmi def _resolve_exposed_method(self, method_name): @@ -999,6 +1038,7 @@ class Core(object): for plugin in self.plugins_by_type(Probing): for probe in plugin.GetProbes(metadata): resp.append(probe) + self.logger.debug("Sending probe list to %s" % client) return lxml.etree.tostring(resp, xml_declaration=False).decode('UTF-8') except: @@ -1024,7 +1064,7 @@ class Core(object): # that's created for RecvProbeData doesn't get cached. # I.e., the next metadata object that's built, after probe # data is processed, is cached. - self.metadata_cache.expire(client) + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) try: xpdata = lxml.etree.XML(probedata.encode('utf-8'), parser=Bcfg2.Server.XMLParser) @@ -1199,9 +1239,14 @@ class Core(object): self.logger.info("Core: debug = %s" % debug) levels = self._loglevels[self.debug_flag] for handler in logging.root.handlers: - level = levels.get(handler.name, levels['default']) - self.logger.debug("Setting %s log handler to %s" % - (handler.name, logging.getLevelName(level))) + try: + level = levels.get(handler.name, levels['default']) + self.logger.debug("Setting %s log handler to %s" % + (handler.name, logging.getLevelName(level))) + except AttributeError: + level = levels['default'] + self.logger.debug("Setting unknown log handler %s to %s" % + (handler, logging.getLevelName(level))) handler.setLevel(level) return self.debug_flag @@ -1271,7 +1316,7 @@ class NetworkCore(Core): self.logger.error("Failed to set ownership of database " "at %s: %s" % (db_settings['NAME'], err)) __init__.__doc__ = Core.__init__.__doc__.split(".. -----")[0] + \ -"\n.. automethod:: _daemonize\n" + "\n.. automethod:: _daemonize\n" def run(self): """ Run the server core. This calls :func:`_daemonize` before @@ -1296,7 +1341,8 @@ class NetworkCore(Core): # rewrite $HOME. pulp stores its auth creds in ~/.pulp, so # this is necessary to make that work when privileges are # dropped - os.environ['HOME'] = pwd.getpwuid(self.setup['daemon_uid'])[5] + os.environ['HOME'] = \ + pwd.getpwuid(Bcfg2.Options.setup.daemon_uid)[5] else: os.umask(int(Bcfg2.Options.setup.umask, 8)) |