summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Collector.py
diff options
context:
space:
mode:
authorMichael Fenn <fennm@deshawresearch.com>2013-10-09 15:48:51 -0400
committerMichael Fenn <fennm@deshawresearch.com>2013-10-09 15:48:51 -0400
commitcde00779a8d4d9c2624a3b82c6911ccf1cb2aaef (patch)
tree5ca1032934cfa03e606a23b17803edf6f4b9dce5 /src/lib/Bcfg2/Reporting/Collector.py
parentaf6fb8f8a16d6141f5a34b66fa81d74d4693ec67 (diff)
downloadbcfg2-cde00779a8d4d9c2624a3b82c6911ccf1cb2aaef.tar.gz
bcfg2-cde00779a8d4d9c2624a3b82c6911ccf1cb2aaef.tar.bz2
bcfg2-cde00779a8d4d9c2624a3b82c6911ccf1cb2aaef.zip
Reporting: start a new thread for each import
When dealing with a high-latency database connection (eg. across a WAN), the bcfg2-report-collector process can fall behind in its import queue. The imports are very much bound by the response latency of the database server and not processing throughput. This patch fires off a new thread for each database interaction. The thread itself simply falls out of scope when the interaction is finished processing. The interaction object is still read from the disk serially in order avoid having to create a locking mechanism for that part of the process. This change does potentially create greater load on the database server, but ultimately the load is limited by rate at which the bcfg2 server can generate work.
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Collector.py')
-rw-r--r--src/lib/Bcfg2/Reporting/Collector.py39
1 files changed, 29 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 4556cda82..68e1d6a6d 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -20,10 +20,36 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
StorageError, StorageImportError
+
class ReportingError(Exception):
"""Generic reporting exception"""
pass
+
+class ReportingStoreThread(threading.Thread):
+ """Thread for calling the storage backend"""
+ def __init__(self, interaction, storage, group=None, target=None,
+ name=None, *args, **kwargs):
+ """Initialize the thread with a reference to the interaction
+ as well as the storage engine to use"""
+ threading.Thread.__init__(self, group, target, name, args, kwargs)
+ self.interaction = interaction
+ self.storage = storage
+ self.logger = logging.getLogger('bcfg2-report-collector')
+
+ def run(self):
+ try:
+ start = time.time()
+ self.storage.import_interaction(self.interaction)
+ self.logger.info("Imported interaction for %s in %ss" %
+ (self.interaction.get('hostname', '<unknown>'),
+ time.time() - start))
+ except:
+ #TODO requeue?
+ self.logger.error("Unhandled exception in import thread %s" %
+ traceback.format_exc().splitlines()[-1])
+
+
class ReportingCollector(object):
"""The collecting process for reports"""
@@ -77,7 +103,6 @@ class ReportingCollector(object):
(self.storage.__class__.__name__,
traceback.format_exc().splitlines()[-1]))
-
def run(self):
"""Startup the processing and go!"""
self.terminate = threading.Event()
@@ -103,15 +128,9 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- try:
- start = time.time()
- self.storage.import_interaction(interaction)
- self.logger.info("Imported interaction for %s in %ss" %
- (interaction.get('hostname', '<unknown>'),
- time.time() - start))
- except:
- #TODO requeue?
- raise
+
+ t = ReportingStoreThread(interaction, self.storage)
+ t.start()
except (SystemExit, KeyboardInterrupt):
self.logger.info("Shutting down")
self.shutdown()