From 2c87c4e4de08c0070ebc5c095c03fc3b233f92d1 Mon Sep 17 00:00:00 2001 From: Alexander Sulfrian Date: Mon, 27 Oct 2014 00:26:42 +0100 Subject: Reporting/Collector: add max-children argument Add option to limit the count of child threads to import the transactions. If the number is exceeded the next import will block until one thread is ready. --- src/lib/Bcfg2/Reporting/Collector.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 12c9cdaa8..b6c35fdd0 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -30,7 +30,7 @@ class ReportingError(Exception): class ReportingStoreThread(threading.Thread): """Thread for calling the storage backend""" def __init__(self, interaction, storage, group=None, target=None, - name=None, args=(), kwargs=None): + name=None, semaphore=None, args=(), kwargs=None): """Initialize the thread with a reference to the interaction as well as the storage engine to use""" threading.Thread.__init__(self, group, target, name, args, @@ -38,6 +38,7 @@ class ReportingStoreThread(threading.Thread): self.interaction = interaction self.storage = storage self.logger = logging.getLogger('bcfg2-report-collector') + self.semaphore = semaphore def run(self): """Call the database storage procedure (aka import)""" @@ -51,13 +52,21 @@ class ReportingStoreThread(threading.Thread): #TODO requeue? self.logger.error("Unhandled exception in import thread %s" % sys.exc_info()[1]) + finally: + if self.semaphore: + self.semaphore.release() class ReportingCollector(object): """The collecting process for reports""" options = [Bcfg2.Options.Common.reporting_storage, Bcfg2.Options.Common.reporting_transport, - Bcfg2.Options.Common.daemon] + Bcfg2.Options.Common.daemon, + Bcfg2.Options.Option( + '--max-children', dest="children", + cf=('reporting', 'max_children'), type=int, + default=0, + help='Maximum number of children for the reporting collector')] def __init__(self): """Setup the collector. This may be called by the daemon or though @@ -67,6 +76,10 @@ class ReportingCollector(object): self.children = [] self.cleanup_threshold = 25 + if Bcfg2.Options.setup.children > 0: + self.semaphore = threading.Semaphore( + value=Bcfg2.Options.setup.children) + if Bcfg2.Options.setup.debug: level = logging.DEBUG elif Bcfg2.Options.setup.verbose: @@ -141,7 +154,10 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - store_thread = ReportingStoreThread(interaction, self.storage) + if Bcfg2.Options.setup.children > 0: + self.semaphore.acquire() + store_thread = ReportingStoreThread(interaction, self.storage, + semaphore=self.semaphore) store_thread.start() self.children.append(store_thread) -- cgit v1.2.3-1-g7c22