summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib/Bcfg2/Reporting/Collector.py42
1 files changed, 32 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 4556cda82..f348a60dc 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -20,10 +20,36 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
StorageError, StorageImportError
+
class ReportingError(Exception):
"""Generic reporting exception"""
pass
+
+class ReportingStoreThread(threading.Thread):
+ """Thread for calling the storage backend"""
+ def __init__(self, interaction, storage, group=None, target=None,
+ name=None, *args, **kwargs):
+ """Initialize the thread with a reference to the interaction
+ as well as the storage engine to use"""
+ threading.Thread.__init__(self, group, target, name, args, kwargs)
+ self.interaction = interaction
+ self.storage = storage
+ self.logger = logging.getLogger('bcfg2-report-collector')
+
+ def run(self):
+ try:
+ start = time.time()
+ self.storage.import_interaction(self.interaction)
+ self.logger.info("Imported interaction for %s in %ss" %
+ (self.interaction.get('hostname', '<unknown>'),
+ time.time() - start))
+ except:
+ #TODO requeue?
+ self.logger.error("Unhandled exception in import thread %s" %
+ traceback.format_exc().splitlines()[-1])
+
+
class ReportingCollector(object):
"""The collecting process for reports"""
@@ -77,7 +103,6 @@ class ReportingCollector(object):
(self.storage.__class__.__name__,
traceback.format_exc().splitlines()[-1]))
-
def run(self):
"""Startup the processing and go!"""
self.terminate = threading.Event()
@@ -103,15 +128,12 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- try:
- start = time.time()
- self.storage.import_interaction(interaction)
- self.logger.info("Imported interaction for %s in %ss" %
- (interaction.get('hostname', '<unknown>'),
- time.time() - start))
- except:
- #TODO requeue?
- raise
+
+ t = ReportingStoreThread(interaction, self.storage)
+ while len(threading.enumerate()) > 100:
+ self.logger.info("more than 100 threads running, sleeping")
+ time.sleep(1)
+ t.start()
except (SystemExit, KeyboardInterrupt):
self.logger.info("Shutting down")
self.shutdown()