diff options
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Collector.py')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Collector.py | 43 |
1 files changed, 32 insertions, 11 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 3d224432e..b42364d8d 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -20,10 +20,38 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore from Bcfg2.Reporting.Storage import load_storage_from_config, \ StorageError, StorageImportError + class ReportingError(Exception): """Generic reporting exception""" pass + +class ReportingStoreThread(threading.Thread): + """Thread for calling the storage backend""" + def __init__(self, interaction, storage, group=None, target=None, + name=None, args=(), kwargs=None): + """Initialize the thread with a reference to the interaction + as well as the storage engine to use""" + threading.Thread.__init__(self, group, target, name, args, + kwargs or dict()) + self.interaction = interaction + self.storage = storage + self.logger = logging.getLogger('bcfg2-report-collector') + + def run(self): + """Call the database storage procedure (aka import)""" + try: + start = time.time() + self.storage.import_interaction(self.interaction) + self.logger.info("Imported interaction for %s in %ss" % + (self.interaction.get('hostname', '<unknown>'), + time.time() - start)) + except: + #TODO requeue? + self.logger.error("Unhandled exception in import thread %s" % + traceback.format_exc().splitlines()[-1]) + + class ReportingCollector(object): """The collecting process for reports""" @@ -77,12 +105,11 @@ class ReportingCollector(object): (self.storage.__class__.__name__, traceback.format_exc().splitlines()[-1])) - def run(self): """Startup the processing and go!""" self.terminate = threading.Event() atexit.register(self.shutdown) - self.context = daemon.DaemonContext() + self.context = daemon.DaemonContext(detach_process=True) if self.setup['daemon']: self.logger.debug("Daemonizing") @@ -103,15 +130,9 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - try: - start = time.time() - self.storage.import_interaction(interaction) - self.logger.info("Imported interaction for %s in %ss" % - (interaction.get('hostname', '<unknown>'), - time.time() - start)) - except: - #TODO requeue? - raise + + store_thread = ReportingStoreThread(interaction, self.storage) + store_thread.start() except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") self.shutdown() |