diff options
author | Jason Kincl <kincljc@ornl.gov> | 2012-11-27 14:28:01 -0500 |
---|---|---|
committer | Jason Kincl <kincljc@ornl.gov> | 2012-11-27 14:28:01 -0500 |
commit | 5e0265f837f0eb72123be0b5150451aebdf8b031 (patch) | |
tree | dbd5fdbe4ec93c48cbba6fec3f608ffefb26eac5 /src/lib/Bcfg2/Reporting/Transport | |
parent | 894299b01b6138c54a99fd41f166554d175d6106 (diff) | |
parent | 4c70626094248495bf2c11c09bf2f2f60917187d (diff) | |
download | bcfg2-5e0265f837f0eb72123be0b5150451aebdf8b031.tar.gz bcfg2-5e0265f837f0eb72123be0b5150451aebdf8b031.tar.bz2 bcfg2-5e0265f837f0eb72123be0b5150451aebdf8b031.zip |
Merge remote branch 'upstream/master' into jasons-hacking
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Transport')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/DirectStore.py | 21 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py | 22 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/base.py | 22 |
3 files changed, 49 insertions, 16 deletions
diff --git a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py index 8677efb5f..79d1b5aba 100644 --- a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py +++ b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py @@ -15,9 +15,14 @@ class DirectStore(TransportBase, threading.Thread): TransportBase.__init__(self, setup) threading.Thread.__init__(self) self.save_file = os.path.join(self.data, ".saved") + self.storage = load_storage_from_config(setup) + self.storage.validate() + self.queue = Queue(100000) self.terminate = threading.Event() + self.debug_log("Reporting: Starting %s thread" % + self.__class__.__name__) self.start() def shutdown(self): @@ -35,6 +40,8 @@ class DirectStore(TransportBase, threading.Thread): def run(self): if not self._load(): + self.logger.warning("Reporting: Failed to load saved data, " + "DirectStore thread exiting") return while not self.terminate.isSet() and self.queue is not None: try: @@ -42,16 +49,19 @@ class DirectStore(TransportBase, threading.Thread): timeout=self.timeout) start = time.time() self.storage.import_interaction(interaction) - self.logger.info("Imported data for %s in %s seconds" \ - % (interaction.get('hostname', '<unknown>'), \ - time.time() - start)) + self.logger.info("Imported data for %s in %s seconds" % + (interaction.get('hostname', '<unknown>'), + time.time() - start)) except Empty: + self.debug_log("Reporting: Queue is empty") continue except: err = sys.exc_info()[1] self.logger.error("Reporting: Could not import interaction: %s" % err) continue + self.debug_log("Reporting: Stopping %s thread" % + self.__class__.__name__) if self.queue is not None and not self.queue.empty(): self._save() @@ -74,6 +84,8 @@ class DirectStore(TransportBase, threading.Thread): def _save(self): """ Save any saved data to a file """ + self.debug_log("Reporting: Saving pending data to %s" % + self.save_file) saved_data = [] try: while not self.queue.empty(): @@ -93,6 +105,7 @@ class DirectStore(TransportBase, threading.Thread): def _load(self): """ Load any saved data from a file """ if not os.path.exists(self.save_file): + self.debug_log("Reporting: No saved data to load") return True saved_data = [] try: @@ -106,6 +119,8 @@ class DirectStore(TransportBase, threading.Thread): for interaction in saved_data: # check that shutdown wasnt called early if self.terminate.isSet(): + self.logger.warning("Reporting: Shutdown called while loading " + " saved data") return False try: diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py index 8ccb9ed56..30ea39263 100644 --- a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py +++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py @@ -20,7 +20,7 @@ class LocalFilesystem(TransportBase): super(LocalFilesystem, self).__init__(setup) self.work_path = "%s/work" % self.data - self.logger.debug("LocalFilesystem: work path %s" % self.work_path) + self.debug_log("LocalFilesystem: work path %s" % self.work_path) self.fmon = None self._phony_collector = None @@ -34,6 +34,11 @@ class LocalFilesystem(TransportBase): traceback.format_exc().splitlines()[-1])) raise TransportError + def set_debug(self, debug): + rv = TransportBase.set_debug(self, debug) + self.fmon.set_debug(debug) + return rv + def start_monitor(self, collector): """Start the file monitor. Most of this comes from BaseCore""" setup = self.setup @@ -44,12 +49,13 @@ class LocalFilesystem(TransportBase): "forcing to default" % setup['filemonitor']) fmon = Bcfg2.Server.FileMonitor.available['default'] - fmdebug = setup.get('debug', False) try: - self.fmon = fmon(debug=fmdebug) - self.logger.info("Using the %s file monitor" % self.fmon.__class__.__name__) + self.fmon = fmon(debug=self.debug_flag) + self.logger.info("Using the %s file monitor" % + self.fmon.__class__.__name__) except IOError: - msg = "Failed to instantiate file monitor %s" % setup['filemonitor'] + msg = "Failed to instantiate file monitor %s" % \ + setup['filemonitor'] self.logger.error(msg, exc_info=1) raise TransportError(msg) self.fmon.start() @@ -108,11 +114,11 @@ class LocalFilesystem(TransportBase): #deviate from the normal routines here we only want one event etype = event.code2str() - self.logger.debug("Recieved event %s for %s" % (etype, event.filename)) + self.debug_log("Recieved event %s for %s" % (etype, event.filename)) if os.path.basename(event.filename)[0] == '.': return None if etype in ('created', 'exists'): - self.logger.debug("Handling event %s" % event.filename) + self.debug_log("Handling event %s" % event.filename) payload = os.path.join(self.work_path, event.filename) try: payloadfd = open(payload, "r") @@ -150,7 +156,7 @@ class LocalFilesystem(TransportBase): except ReportingError: raise TransportError except: - self.logger.error("Failed to load collector: %s" % + self.logger.error("Failed to load collector: %s" % traceback.format_exc().splitlines()[-1]) raise TransportError diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py index cca7beda0..530011e47 100644 --- a/src/lib/Bcfg2/Reporting/Transport/base.py +++ b/src/lib/Bcfg2/Reporting/Transport/base.py @@ -2,26 +2,38 @@ The base for all server -> collector Transports """ -import os.path -import logging +import os +import sys +from Bcfg2.Server.Plugin import Debuggable + class TransportError(Exception): """Generic TransportError""" pass + class TransportImportError(TransportError): """Raised when a transport fails to import""" pass -class TransportBase(object): + +class TransportBase(Debuggable): """The base for all transports""" def __init__(self, setup): """Do something here""" clsname = self.__class__.__name__ - self.logger = logging.getLogger(clsname) - self.logger.debug("Loading %s transport" % clsname) + Debuggable.__init__(self, name=clsname) + self.debug_log("Loading %s transport" % clsname) self.data = os.path.join(setup['repo'], 'Reporting', clsname) + if not os.path.exists(self.data): + self.logger.info("%s does not exist, creating" % self.data) + try: + os.makedirs(self.data) + except OSError: + self.logger.warning("Could not create %s: %s" % + (self.data, sys.exc_info()[1])) + self.logger.warning("The transport may not function properly") self.setup = setup self.timeout = 2 |