summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Collector.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Collector.py')
-rw-r--r--src/lib/Bcfg2/Reporting/Collector.py70
1 files changed, 58 insertions, 12 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index df82248d0..52700f917 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -20,10 +20,38 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
StorageError, StorageImportError
+
class ReportingError(Exception):
"""Generic reporting exception"""
pass
+
+class ReportingStoreThread(threading.Thread):
+ """Thread for calling the storage backend"""
+ def __init__(self, interaction, storage, group=None, target=None,
+ name=None, args=(), kwargs=None):
+ """Initialize the thread with a reference to the interaction
+ as well as the storage engine to use"""
+ threading.Thread.__init__(self, group, target, name, args,
+ kwargs or dict())
+ self.interaction = interaction
+ self.storage = storage
+ self.logger = logging.getLogger('bcfg2-report-collector')
+
+ def run(self):
+ """Call the database storage procedure (aka import)"""
+ try:
+ start = time.time()
+ self.storage.import_interaction(self.interaction)
+ self.logger.info("Imported interaction for %s in %ss" %
+ (self.interaction.get('hostname', '<unknown>'),
+ time.time() - start))
+ except:
+ #TODO requeue?
+ self.logger.error("Unhandled exception in import thread %s" %
+ traceback.format_exc().splitlines()[-1])
+
+
class ReportingCollector(object):
"""The collecting process for reports"""
@@ -35,6 +63,8 @@ class ReportingCollector(object):
self.encoding = setup['encoding']
self.terminate = None
self.context = None
+ self.children = []
+ self.cleanup_threshold = 25
if setup['debug']:
level = logging.DEBUG
@@ -77,12 +107,12 @@ class ReportingCollector(object):
(self.storage.__class__.__name__,
traceback.format_exc().splitlines()[-1]))
-
def run(self):
"""Startup the processing and go!"""
self.terminate = threading.Event()
atexit.register(self.shutdown)
- self.context = daemon.DaemonContext()
+ self.context = daemon.DaemonContext(detach_process=True)
+ iter = 0
if self.setup['daemon']:
self.logger.debug("Daemonizing")
@@ -103,15 +133,16 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- try:
- start = time.time()
- self.storage.import_interaction(interaction)
- self.logger.info("Imported interaction for %s in %ss" %
- (interaction.get('hostname', '<unknown>'),
- time.time() - start))
- except:
- #TODO requeue?
- raise
+
+ store_thread = ReportingStoreThread(interaction, self.storage)
+ store_thread.start()
+ self.children.append(store_thread)
+
+ iter += 1
+ if iter >= self.cleanup_threshold:
+ self.reap_children()
+ iter = 0
+
except (SystemExit, KeyboardInterrupt):
self.logger.info("Shutting down")
self.shutdown()
@@ -125,7 +156,22 @@ class ReportingCollector(object):
# this wil be missing if called from bcfg2-admin
self.terminate.set()
if self.transport:
- self.transport.shutdown()
+ try:
+ self.transport.shutdown()
+ except OSError:
+ pass
if self.storage:
self.storage.shutdown()
+ def reap_children(self):
+ """Join any non-live threads"""
+ newlist = []
+
+ self.logger.debug("Starting reap_children")
+ for child in self.children:
+ if child.isAlive():
+ newlist.append(child)
+ else:
+ child.join()
+ self.logger.debug("Joined child thread %s" % child.getName())
+ self.children = newlist