diff options
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Collector.py')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Collector.py | 70 |
1 files changed, 58 insertions, 12 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index df82248d0..52700f917 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -20,10 +20,38 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore from Bcfg2.Reporting.Storage import load_storage_from_config, \ StorageError, StorageImportError + class ReportingError(Exception): """Generic reporting exception""" pass + +class ReportingStoreThread(threading.Thread): + """Thread for calling the storage backend""" + def __init__(self, interaction, storage, group=None, target=None, + name=None, args=(), kwargs=None): + """Initialize the thread with a reference to the interaction + as well as the storage engine to use""" + threading.Thread.__init__(self, group, target, name, args, + kwargs or dict()) + self.interaction = interaction + self.storage = storage + self.logger = logging.getLogger('bcfg2-report-collector') + + def run(self): + """Call the database storage procedure (aka import)""" + try: + start = time.time() + self.storage.import_interaction(self.interaction) + self.logger.info("Imported interaction for %s in %ss" % + (self.interaction.get('hostname', '<unknown>'), + time.time() - start)) + except: + #TODO requeue? + self.logger.error("Unhandled exception in import thread %s" % + traceback.format_exc().splitlines()[-1]) + + class ReportingCollector(object): """The collecting process for reports""" @@ -35,6 +63,8 @@ class ReportingCollector(object): self.encoding = setup['encoding'] self.terminate = None self.context = None + self.children = [] + self.cleanup_threshold = 25 if setup['debug']: level = logging.DEBUG @@ -77,12 +107,12 @@ class ReportingCollector(object): (self.storage.__class__.__name__, traceback.format_exc().splitlines()[-1])) - def run(self): """Startup the processing and go!""" self.terminate = threading.Event() atexit.register(self.shutdown) - self.context = daemon.DaemonContext() + self.context = daemon.DaemonContext(detach_process=True) + iter = 0 if self.setup['daemon']: self.logger.debug("Daemonizing") @@ -103,15 +133,16 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - try: - start = time.time() - self.storage.import_interaction(interaction) - self.logger.info("Imported interaction for %s in %ss" % - (interaction.get('hostname', '<unknown>'), - time.time() - start)) - except: - #TODO requeue? - raise + + store_thread = ReportingStoreThread(interaction, self.storage) + store_thread.start() + self.children.append(store_thread) + + iter += 1 + if iter >= self.cleanup_threshold: + self.reap_children() + iter = 0 + except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") self.shutdown() @@ -125,7 +156,22 @@ class ReportingCollector(object): # this wil be missing if called from bcfg2-admin self.terminate.set() if self.transport: - self.transport.shutdown() + try: + self.transport.shutdown() + except OSError: + pass if self.storage: self.storage.shutdown() + def reap_children(self): + """Join any non-live threads""" + newlist = [] + + self.logger.debug("Starting reap_children") + for child in self.children: + if child.isAlive(): + newlist.append(child) + else: + child.join() + self.logger.debug("Joined child thread %s" % child.getName()) + self.children = newlist |