diff options
author | Alexander Sulfrian <alexander@sulfrian.net> | 2014-10-27 00:26:42 +0100 |
---|---|---|
committer | Alexander Sulfrian <alexander@sulfrian.net> | 2014-10-27 15:31:45 +0100 |
commit | 2c87c4e4de08c0070ebc5c095c03fc3b233f92d1 (patch) | |
tree | 255d1130b8c3cda86c1dbd589e67fa68cba8e9e5 /src/lib/Bcfg2 | |
parent | 56d2b72b6f8b2e1a27a16dbfdab10f0b83816ceb (diff) | |
download | bcfg2-2c87c4e4de08c0070ebc5c095c03fc3b233f92d1.tar.gz bcfg2-2c87c4e4de08c0070ebc5c095c03fc3b233f92d1.tar.bz2 bcfg2-2c87c4e4de08c0070ebc5c095c03fc3b233f92d1.zip |
Reporting/Collector: add max-children argument
Add option to limit the count of child threads to import the transactions.
If the number is exceeded the next import will block until one thread is
ready.
Diffstat (limited to 'src/lib/Bcfg2')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Collector.py | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 12c9cdaa8..b6c35fdd0 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -30,7 +30,7 @@ class ReportingError(Exception): class ReportingStoreThread(threading.Thread): """Thread for calling the storage backend""" def __init__(self, interaction, storage, group=None, target=None, - name=None, args=(), kwargs=None): + name=None, semaphore=None, args=(), kwargs=None): """Initialize the thread with a reference to the interaction as well as the storage engine to use""" threading.Thread.__init__(self, group, target, name, args, @@ -38,6 +38,7 @@ class ReportingStoreThread(threading.Thread): self.interaction = interaction self.storage = storage self.logger = logging.getLogger('bcfg2-report-collector') + self.semaphore = semaphore def run(self): """Call the database storage procedure (aka import)""" @@ -51,13 +52,21 @@ class ReportingStoreThread(threading.Thread): #TODO requeue? self.logger.error("Unhandled exception in import thread %s" % sys.exc_info()[1]) + finally: + if self.semaphore: + self.semaphore.release() class ReportingCollector(object): """The collecting process for reports""" options = [Bcfg2.Options.Common.reporting_storage, Bcfg2.Options.Common.reporting_transport, - Bcfg2.Options.Common.daemon] + Bcfg2.Options.Common.daemon, + Bcfg2.Options.Option( + '--max-children', dest="children", + cf=('reporting', 'max_children'), type=int, + default=0, + help='Maximum number of children for the reporting collector')] def __init__(self): """Setup the collector. This may be called by the daemon or though @@ -67,6 +76,10 @@ class ReportingCollector(object): self.children = [] self.cleanup_threshold = 25 + if Bcfg2.Options.setup.children > 0: + self.semaphore = threading.Semaphore( + value=Bcfg2.Options.setup.children) + if Bcfg2.Options.setup.debug: level = logging.DEBUG elif Bcfg2.Options.setup.verbose: @@ -141,7 +154,10 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - store_thread = ReportingStoreThread(interaction, self.storage) + if Bcfg2.Options.setup.children > 0: + self.semaphore.acquire() + store_thread = ReportingStoreThread(interaction, self.storage, + semaphore=self.semaphore) store_thread.start() self.children.append(store_thread) |