summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-10-28 04:41:17 -0700
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-10-28 04:41:17 -0700
commite4d4e60fcd671b421087f62ba442c84873ab8bda (patch)
tree075c823a87662df5ab56d0d4d050c64cc17a73f6
parent3a5eec174af0b9907b29fdfd3eb1e4fd7677beeb (diff)
parent171179777d0b6104440a87bae7f4aff0c82ba297 (diff)
downloadbcfg2-e4d4e60fcd671b421087f62ba442c84873ab8bda.tar.gz
bcfg2-e4d4e60fcd671b421087f62ba442c84873ab8bda.tar.bz2
bcfg2-e4d4e60fcd671b421087f62ba442c84873ab8bda.zip
Merge pull request #142 from fennm/reporting-thread-each-data-import
Reporting: start a new thread for each import
-rw-r--r--src/lib/Bcfg2/Reporting/Collector.py41
1 files changed, 31 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 4556cda82..b42364d8d 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -20,10 +20,38 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
StorageError, StorageImportError
+
class ReportingError(Exception):
"""Generic reporting exception"""
pass
+
+class ReportingStoreThread(threading.Thread):
+ """Thread for calling the storage backend"""
+ def __init__(self, interaction, storage, group=None, target=None,
+ name=None, args=(), kwargs=None):
+ """Initialize the thread with a reference to the interaction
+ as well as the storage engine to use"""
+ threading.Thread.__init__(self, group, target, name, args,
+ kwargs or dict())
+ self.interaction = interaction
+ self.storage = storage
+ self.logger = logging.getLogger('bcfg2-report-collector')
+
+ def run(self):
+ """Call the database storage procedure (aka import)"""
+ try:
+ start = time.time()
+ self.storage.import_interaction(self.interaction)
+ self.logger.info("Imported interaction for %s in %ss" %
+ (self.interaction.get('hostname', '<unknown>'),
+ time.time() - start))
+ except:
+ #TODO requeue?
+ self.logger.error("Unhandled exception in import thread %s" %
+ traceback.format_exc().splitlines()[-1])
+
+
class ReportingCollector(object):
"""The collecting process for reports"""
@@ -77,7 +105,6 @@ class ReportingCollector(object):
(self.storage.__class__.__name__,
traceback.format_exc().splitlines()[-1]))
-
def run(self):
"""Startup the processing and go!"""
self.terminate = threading.Event()
@@ -103,15 +130,9 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- try:
- start = time.time()
- self.storage.import_interaction(interaction)
- self.logger.info("Imported interaction for %s in %ss" %
- (interaction.get('hostname', '<unknown>'),
- time.time() - start))
- except:
- #TODO requeue?
- raise
+
+ store_thread = ReportingStoreThread(interaction, self.storage)
+ store_thread.start()
except (SystemExit, KeyboardInterrupt):
self.logger.info("Shutting down")
self.shutdown()