From cde00779a8d4d9c2624a3b82c6911ccf1cb2aaef Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Wed, 9 Oct 2013 15:48:51 -0400 Subject: Reporting: start a new thread for each import When dealing with a high-latency database connection (e.g. across a WAN), the bcfg2-report-collector process can fall behind in its import queue. The imports are very much bound by the response latency of the database server and not processing throughput. This patch fires off a new thread for each database interaction. The thread itself simply falls out of scope when the interaction is finished processing. The interaction object is still read from the disk serially in order to avoid having to create a locking mechanism for that part of the process. This change does potentially create greater load on the database server, but ultimately the load is limited by the rate at which the bcfg2 server can generate work. --- src/lib/Bcfg2/Reporting/Collector.py | 39 +++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 4556cda82..68e1d6a6d 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -20,10 +20,36 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore from Bcfg2.Reporting.Storage import load_storage_from_config, \ StorageError, StorageImportError + class ReportingError(Exception): """Generic reporting exception""" pass + +class ReportingStoreThread(threading.Thread): + """Thread for calling the storage backend""" + def __init__(self, interaction, storage, group=None, target=None, + name=None, *args, **kwargs): + """Initialize the thread with a reference to the interaction + as well as the storage engine to use""" + threading.Thread.__init__(self, group, target, name, args, kwargs) + self.interaction = interaction + self.storage = storage + self.logger = logging.getLogger('bcfg2-report-collector') + 
def run(self): + try: + start = time.time() + self.storage.import_interaction(self.interaction) + self.logger.info("Imported interaction for %s in %ss" % + (self.interaction.get('hostname', ''), + time.time() - start)) + except: + #TODO requeue? + self.logger.error("Unhandled exception in import thread %s" % + traceback.format_exc().splitlines()[-1]) + + class ReportingCollector(object): """The collecting process for reports""" @@ -77,7 +103,6 @@ class ReportingCollector(object): (self.storage.__class__.__name__, traceback.format_exc().splitlines()[-1])) - def run(self): """Startup the processing and go!""" self.terminate = threading.Event() @@ -103,15 +128,9 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - try: - start = time.time() - self.storage.import_interaction(interaction) - self.logger.info("Imported interaction for %s in %ss" % - (interaction.get('hostname', ''), - time.time() - start)) - except: - #TODO requeue? - raise + + t = ReportingStoreThread(interaction, self.storage) + t.start() except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") self.shutdown() -- cgit v1.2.3-1-g7c22