From cde00779a8d4d9c2624a3b82c6911ccf1cb2aaef Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Wed, 9 Oct 2013 15:48:51 -0400 Subject: Reporting: start a new thread for each import When dealing with a high-latency database connection (e.g. across a WAN), the bcfg2-report-collector process can fall behind in its import queue. The imports are very much bound by the response latency of the database server and not processing throughput. This patch fires off a new thread for each database interaction. The thread itself simply falls out of scope when the interaction is finished processing. The interaction object is still read from the disk serially in order to avoid having to create a locking mechanism for that part of the process. This change does potentially create greater load on the database server, but ultimately the load is limited by the rate at which the bcfg2 server can generate work. --- src/lib/Bcfg2/Reporting/Collector.py | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) (limited to 'src/lib/Bcfg2/Reporting') diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 4556cda82..68e1d6a6d 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -20,10 +20,36 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore from Bcfg2.Reporting.Storage import load_storage_from_config, \ StorageError, StorageImportError + class ReportingError(Exception): """Generic reporting exception""" pass + +class ReportingStoreThread(threading.Thread): + """Thread for calling the storage backend""" + def __init__(self, interaction, storage, group=None, target=None, + name=None, *args, **kwargs): + """Initialize the thread with a reference to the interaction + as well as the storage engine to use""" + threading.Thread.__init__(self, group, target, name, args, kwargs) + self.interaction = interaction + self.storage = storage + self.logger = 
logging.getLogger('bcfg2-report-collector') + + def run(self): + try: + start = time.time() + self.storage.import_interaction(self.interaction) + self.logger.info("Imported interaction for %s in %ss" % + (self.interaction.get('hostname', ''), + time.time() - start)) + except: + #TODO requeue? + self.logger.error("Unhandled exception in import thread %s" % + traceback.format_exc().splitlines()[-1]) + + class ReportingCollector(object): """The collecting process for reports""" @@ -77,7 +103,6 @@ class ReportingCollector(object): (self.storage.__class__.__name__, traceback.format_exc().splitlines()[-1])) - def run(self): """Startup the processing and go!""" self.terminate = threading.Event() @@ -103,15 +128,9 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - try: - start = time.time() - self.storage.import_interaction(interaction) - self.logger.info("Imported interaction for %s in %ss" % - (interaction.get('hostname', ''), - time.time() - start)) - except: - #TODO requeue? 
- raise + + t = ReportingStoreThread(interaction, self.storage) + t.start() except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") self.shutdown() -- cgit v1.2.3-1-g7c22 From f813f86f8ac2bc7b55f4eb6a0d936f1ce4f68ba7 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Wed, 9 Oct 2013 15:59:42 -0400 Subject: Reporting: Simple sanity check to avoid creating too many threads --- src/lib/Bcfg2/Reporting/Collector.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/lib/Bcfg2/Reporting') diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 68e1d6a6d..f348a60dc 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -130,6 +130,9 @@ class ReportingCollector(object): continue t = ReportingStoreThread(interaction, self.storage) + while len(threading.enumerate()) > 100: + self.logger.info("more than 100 threads running, sleeping") + time.sleep(1) t.start() except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") -- cgit v1.2.3-1-g7c22 From 9b8a2b6bfa0a0586f1c25358519c8d8db1260009 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Thu, 10 Oct 2013 14:21:07 -0400 Subject: Reporting: misc improvements to collector threading 1. Use a better convention for calling the threading.Thread constructor 2. Add docstring to ReportingStoreThread.run 3. 
Give the storage thread variable a better name --- src/lib/Bcfg2/Reporting/Collector.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'src/lib/Bcfg2/Reporting') diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index f348a60dc..febdfed13 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -29,15 +29,17 @@ class ReportingError(Exception): class ReportingStoreThread(threading.Thread): """Thread for calling the storage backend""" def __init__(self, interaction, storage, group=None, target=None, - name=None, *args, **kwargs): + name=None, args=(), kwargs=None): """Initialize the thread with a reference to the interaction as well as the storage engine to use""" - threading.Thread.__init__(self, group, target, name, args, kwargs) + threading.Thread.__init__(self, group, target, name, args, + kwargs or dict()) self.interaction = interaction self.storage = storage self.logger = logging.getLogger('bcfg2-report-collector') def run(self): + """Call the database storage procedure (aka import)""" try: start = time.time() self.storage.import_interaction(self.interaction) @@ -129,11 +131,11 @@ class ReportingCollector(object): if not interaction: continue - t = ReportingStoreThread(interaction, self.storage) + store_thread = ReportingStoreThread(interaction, self.storage) while len(threading.enumerate()) > 100: self.logger.info("more than 100 threads running, sleeping") time.sleep(1) - t.start() + store_thread.start() except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") self.shutdown() -- cgit v1.2.3-1-g7c22 From 171179777d0b6104440a87bae7f4aff0c82ba297 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Mon, 14 Oct 2013 14:33:44 -0400 Subject: Revert "Reporting: Simple sanity check to avoid creating too many threads" This reverts commit f813f86f8ac2bc7b55f4eb6a0d936f1ce4f68ba7. Premature optimization is the root of all evil, etc. 
Conflicts: src/lib/Bcfg2/Reporting/Collector.py --- src/lib/Bcfg2/Reporting/Collector.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'src/lib/Bcfg2/Reporting') diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index febdfed13..b42364d8d 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -132,9 +132,6 @@ class ReportingCollector(object): continue store_thread = ReportingStoreThread(interaction, self.storage) - while len(threading.enumerate()) > 100: - self.logger.info("more than 100 threads running, sleeping") - time.sleep(1) store_thread.start() except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") -- cgit v1.2.3-1-g7c22