summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting
diff options
context:
space:
mode:
authorAlexander Sulfrian <alexander@sulfrian.net>2014-10-27 00:26:42 +0100
committerAlexander Sulfrian <alexander@sulfrian.net>2014-10-27 15:31:45 +0100
commit2c87c4e4de08c0070ebc5c095c03fc3b233f92d1 (patch)
tree255d1130b8c3cda86c1dbd589e67fa68cba8e9e5 /src/lib/Bcfg2/Reporting
parent56d2b72b6f8b2e1a27a16dbfdab10f0b83816ceb (diff)
downloadbcfg2-2c87c4e4de08c0070ebc5c095c03fc3b233f92d1.tar.gz
bcfg2-2c87c4e4de08c0070ebc5c095c03fc3b233f92d1.tar.bz2
bcfg2-2c87c4e4de08c0070ebc5c095c03fc3b233f92d1.zip
Reporting/Collector: add max-children argument
Add option to limit the count of child threads to import the transactions. If the number is exceeded the next import will block until one thread is ready.
Diffstat (limited to 'src/lib/Bcfg2/Reporting')
-rw-r--r--src/lib/Bcfg2/Reporting/Collector.py22
1 files changed, 19 insertions, 3 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 12c9cdaa8..b6c35fdd0 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -30,7 +30,7 @@ class ReportingError(Exception):
class ReportingStoreThread(threading.Thread):
"""Thread for calling the storage backend"""
def __init__(self, interaction, storage, group=None, target=None,
- name=None, args=(), kwargs=None):
+ name=None, semaphore=None, args=(), kwargs=None):
"""Initialize the thread with a reference to the interaction
as well as the storage engine to use"""
threading.Thread.__init__(self, group, target, name, args,
@@ -38,6 +38,7 @@ class ReportingStoreThread(threading.Thread):
self.interaction = interaction
self.storage = storage
self.logger = logging.getLogger('bcfg2-report-collector')
+ self.semaphore = semaphore
def run(self):
"""Call the database storage procedure (aka import)"""
@@ -51,13 +52,21 @@ class ReportingStoreThread(threading.Thread):
#TODO requeue?
self.logger.error("Unhandled exception in import thread %s" %
sys.exc_info()[1])
+ finally:
+ if self.semaphore:
+ self.semaphore.release()
class ReportingCollector(object):
"""The collecting process for reports"""
options = [Bcfg2.Options.Common.reporting_storage,
Bcfg2.Options.Common.reporting_transport,
- Bcfg2.Options.Common.daemon]
+ Bcfg2.Options.Common.daemon,
+ Bcfg2.Options.Option(
+ '--max-children', dest="children",
+ cf=('reporting', 'max_children'), type=int,
+ default=0,
+ help='Maximum number of children for the reporting collector')]
def __init__(self):
"""Setup the collector. This may be called by the daemon or though
@@ -67,6 +76,10 @@ class ReportingCollector(object):
self.children = []
self.cleanup_threshold = 25
+ if Bcfg2.Options.setup.children > 0:
+ self.semaphore = threading.Semaphore(
+ value=Bcfg2.Options.setup.children)
+
if Bcfg2.Options.setup.debug:
level = logging.DEBUG
elif Bcfg2.Options.setup.verbose:
@@ -141,7 +154,10 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- store_thread = ReportingStoreThread(interaction, self.storage)
+ if Bcfg2.Options.setup.children > 0:
+ self.semaphore.acquire()
+ store_thread = ReportingStoreThread(interaction, self.storage,
+ semaphore=self.semaphore)
store_thread.start()
self.children.append(store_thread)