From 92122665e534978a1bdb499e6b8cf48e54b041d3 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 27 Nov 2012 12:09:18 -0500 Subject: made Reporting plugin and transports debuggable, respond to set_debug/toggle_debug RMI --- src/lib/Bcfg2/Reporting/Transport/DirectStore.py | 18 +++++++++++++++--- .../Bcfg2/Reporting/Transport/LocalFilesystem.py | 22 ++++++++++++++-------- src/lib/Bcfg2/Reporting/Transport/base.py | 8 ++++---- src/lib/Bcfg2/Server/Plugins/Reporting.py | 22 +++++++++++++--------- 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py index d1ad7c2f2..79d1b5aba 100644 --- a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py +++ b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py @@ -21,6 +21,8 @@ class DirectStore(TransportBase, threading.Thread): self.queue = Queue(100000) self.terminate = threading.Event() + self.debug_log("Reporting: Starting %s thread" % + self.__class__.__name__) self.start() def shutdown(self): @@ -38,6 +40,8 @@ class DirectStore(TransportBase, threading.Thread): def run(self): if not self._load(): + self.logger.warning("Reporting: Failed to load saved data, " + "DirectStore thread exiting") return while not self.terminate.isSet() and self.queue is not None: try: @@ -45,16 +49,19 @@ class DirectStore(TransportBase, threading.Thread): timeout=self.timeout) start = time.time() self.storage.import_interaction(interaction) - self.logger.info("Imported data for %s in %s seconds" \ - % (interaction.get('hostname', ''), \ - time.time() - start)) + self.logger.info("Imported data for %s in %s seconds" % + (interaction.get('hostname', ''), + time.time() - start)) except Empty: + self.debug_log("Reporting: Queue is empty") continue except: err = sys.exc_info()[1] self.logger.error("Reporting: Could not import interaction: %s" % err) continue + self.debug_log("Reporting: Stopping %s thread" % + self.__class__.__name__) if self.queue is not None and not self.queue.empty(): self._save() @@ -77,6 +84,8 @@ class DirectStore(TransportBase, threading.Thread): def _save(self): """ Save any saved data to a file """ + self.debug_log("Reporting: Saving pending data to %s" % + self.save_file) saved_data = [] try: while not self.queue.empty(): @@ -96,6 +105,7 @@ class DirectStore(TransportBase, threading.Thread): def _load(self): """ Load any saved data from a file """ if not os.path.exists(self.save_file): + self.debug_log("Reporting: No saved data to load") return True saved_data = [] try: @@ -109,6 +119,8 @@ class DirectStore(TransportBase, threading.Thread): for interaction in saved_data: # check that shutdown wasnt called early if self.terminate.isSet(): + self.logger.warning("Reporting: Shutdown called while loading " + " saved data") return False try: diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py index 8ccb9ed56..30ea39263 100644 --- a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py +++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py @@ -20,7 +20,7 @@ class LocalFilesystem(TransportBase): super(LocalFilesystem, self).__init__(setup) self.work_path = "%s/work" % self.data - self.logger.debug("LocalFilesystem: work path %s" % self.work_path) + self.debug_log("LocalFilesystem: work path %s" % self.work_path) self.fmon = None self._phony_collector = None @@ -34,6 +34,11 @@ class LocalFilesystem(TransportBase): traceback.format_exc().splitlines()[-1])) raise TransportError + def set_debug(self, debug): + rv = TransportBase.set_debug(self, debug) + self.fmon.set_debug(debug) + return rv + def start_monitor(self, collector): """Start the file monitor. Most of this comes from BaseCore""" setup = self.setup @@ -44,12 +49,13 @@ class LocalFilesystem(TransportBase): "forcing to default" % setup['filemonitor']) fmon = Bcfg2.Server.FileMonitor.available['default'] - fmdebug = setup.get('debug', False) try: - self.fmon = fmon(debug=fmdebug) - self.logger.info("Using the %s file monitor" % self.fmon.__class__.__name__) + self.fmon = fmon(debug=self.debug_flag) + self.logger.info("Using the %s file monitor" % + self.fmon.__class__.__name__) except IOError: - msg = "Failed to instantiate file monitor %s" % setup['filemonitor'] + msg = "Failed to instantiate file monitor %s" % \ + setup['filemonitor'] self.logger.error(msg, exc_info=1) raise TransportError(msg) self.fmon.start() @@ -108,11 +114,11 @@ class LocalFilesystem(TransportBase): #deviate from the normal routines here we only want one event etype = event.code2str() - self.logger.debug("Recieved event %s for %s" % (etype, event.filename)) + self.debug_log("Recieved event %s for %s" % (etype, event.filename)) if os.path.basename(event.filename)[0] == '.': return None if etype in ('created', 'exists'): - self.logger.debug("Handling event %s" % event.filename) + self.debug_log("Handling event %s" % event.filename) payload = os.path.join(self.work_path, event.filename) try: payloadfd = open(payload, "r") @@ -150,7 +156,7 @@ class LocalFilesystem(TransportBase): except ReportingError: raise TransportError except: - self.logger.error("Failed to load collector: %s" % + self.logger.error("Failed to load collector: %s" % traceback.format_exc().splitlines()[-1]) raise TransportError diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py index ea2098b52..530011e47 100644 --- a/src/lib/Bcfg2/Reporting/Transport/base.py +++ b/src/lib/Bcfg2/Reporting/Transport/base.py @@ -4,7 +4,7 @@ The base for all server -> collector Transports import os import sys -import logging +from Bcfg2.Server.Plugin import Debuggable class TransportError(Exception): @@ -17,14 +17,14 @@ class TransportImportError(TransportError): pass -class TransportBase(object): +class TransportBase(Debuggable): """The base for all transports""" def __init__(self, setup): """Do something here""" clsname = self.__class__.__name__ - self.logger = logging.getLogger(clsname) - self.logger.debug("Loading %s transport" % clsname) + Debuggable.__init__(self, name=clsname) + self.debug_log("Loading %s transport" % clsname) self.data = os.path.join(setup['repo'], 'Reporting', clsname) if not os.path.exists(self.data): self.logger.info("%s does not exist, creating" % self.data) diff --git a/src/lib/Bcfg2/Server/Plugins/Reporting.py b/src/lib/Bcfg2/Server/Plugins/Reporting.py index b9d397590..60f5b1e09 100644 --- a/src/lib/Bcfg2/Server/Plugins/Reporting.py +++ b/src/lib/Bcfg2/Server/Plugins/Reporting.py @@ -8,7 +8,7 @@ from Bcfg2.Reporting.Transport import load_transport_from_config, \ TransportError from Bcfg2.Options import REPORTING_COMMON_OPTIONS from Bcfg2.Server.Plugin import Statistics, PullSource, Threaded, \ - PluginInitError, PluginExecutionError + Debuggable, PluginInitError, PluginExecutionError # required for reporting try: @@ -31,9 +31,10 @@ def _rpc_call(method): return _real_rpc_call -class Reporting(Statistics, Threaded, PullSource): # pylint: disable=W0223 +# pylint: disable=W0223 +class Reporting(Statistics, Threaded, PullSource, Debuggable): """ Unified statistics and reporting plugin """ - __rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry'] + __rmi__ = Debuggable.__rmi__ + ['Ping', 'GetExtra', 'GetCurrentEntry'] CLIENT_METADATA_FIELDS = ('profile', 'bundles', 'aliases', 'addresses', 'groups', 'categories', 'uuid', 'version') @@ -42,7 +43,7 @@ class Reporting(Statistics, Threaded, PullSource): # pylint: disable=W0223 Statistics.__init__(self, core, datastore) PullSource.__init__(self) Threaded.__init__(self) - self.core = core + Debuggable.__init__(self) self.whoami = platform.node() self.transport = None @@ -55,8 +56,6 @@ class Reporting(Statistics, Threaded, PullSource): # pylint: disable=W0223 self.logger.error(msg) raise PluginInitError(msg) - self.transport = None - def start_threads(self): try: self.transport = load_transport_from_config(self.core.setup) @@ -66,6 +65,11 @@ class Reporting(Statistics, Threaded, PullSource): # pylint: disable=W0223 self.logger.error(msg) raise PluginInitError(msg) + def set_debug(self, debug): + rv = Debuggable.set_debug(self, debug) + self.transport.set_debug(debug) + return rv + def process_statistics(self, client, xdata): stats = xdata.find("Statistics") stats.set('time', time.asctime(time.localtime())) @@ -88,8 +92,8 @@ class Reporting(Statistics, Threaded, PullSource): # pylint: disable=W0223 lxml.etree.tostring( stats, xml_declaration=False).decode('UTF-8')) - self.logger.debug("%s: Queued statistics data for %s" % - (self.__class__.__name__, client.hostname)) + self.debug_log("%s: Queued statistics data for %s" % + (self.__class__.__name__, client.hostname)) return except TransportError: continue @@ -98,7 +102,7 @@ class Reporting(Statistics, Threaded, PullSource): # pylint: disable=W0223 % (self.__class__.__name__, i, traceback.format_exc().splitlines()[-1])) self.logger.error("%s: Retry limit reached for %s" % - (self.__class__.__name__, client.hostname)) + (self.__class__.__name__, client.hostname)) def shutdown(self): super(Reporting, self).shutdown() -- cgit v1.2.3-1-g7c22