summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/man/bcfg2.conf.txt4
-rw-r--r--doc/reports/dynamic.txt3
-rw-r--r--man/bcfg2.conf.54
-rw-r--r--src/lib/Bcfg2/Reporting/Collector.py22
4 files changed, 30 insertions, 3 deletions
diff --git a/doc/man/bcfg2.conf.txt b/doc/man/bcfg2.conf.txt
index 825ab2121..40766b88a 100644
--- a/doc/man/bcfg2.conf.txt
+++ b/doc/man/bcfg2.conf.txt
@@ -716,6 +716,10 @@ Reporting options
web_debug
Turn on Django debugging.
+ max_children
+ Maximum number of children for the reporting collector. Use 0 to
+ disable the limit. (default is 0)
+
See Also
--------
diff --git a/doc/reports/dynamic.txt b/doc/reports/dynamic.txt
index 53bdef24e..38d4c7e3a 100644
--- a/doc/reports/dynamic.txt
+++ b/doc/reports/dynamic.txt
@@ -270,6 +270,9 @@ reporting
* web_prefix: Prefix to be added to Django's MEDIA_URL
* file_limit: The maximum size of a diff or binary data to
store in the database.
+* max_children: Maximum number of children for the reporting
+ collector. Use 0 to disable the limit and spawn a thread
+ as soon as a working file is available.
.. _dynamic_transports:
diff --git a/man/bcfg2.conf.5 b/man/bcfg2.conf.5
index a8366721a..69ec4c71a 100644
--- a/man/bcfg2.conf.5
+++ b/man/bcfg2.conf.5
@@ -772,6 +772,10 @@ time zone as well).
.TP
.B web_debug
Turn on Django debugging.
+.TP
+.B max_children
+Maximum number of children for the reporting collector. Use 0 to
+disable the limit. (default is 0)
.UNINDENT
.UNINDENT
.UNINDENT
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 12c9cdaa8..b6c35fdd0 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -30,7 +30,7 @@ class ReportingError(Exception):
class ReportingStoreThread(threading.Thread):
"""Thread for calling the storage backend"""
def __init__(self, interaction, storage, group=None, target=None,
- name=None, args=(), kwargs=None):
+ name=None, semaphore=None, args=(), kwargs=None):
"""Initialize the thread with a reference to the interaction
as well as the storage engine to use"""
threading.Thread.__init__(self, group, target, name, args,
@@ -38,6 +38,7 @@ class ReportingStoreThread(threading.Thread):
self.interaction = interaction
self.storage = storage
self.logger = logging.getLogger('bcfg2-report-collector')
+ self.semaphore = semaphore
def run(self):
"""Call the database storage procedure (aka import)"""
@@ -51,13 +52,21 @@ class ReportingStoreThread(threading.Thread):
#TODO requeue?
self.logger.error("Unhandled exception in import thread %s" %
sys.exc_info()[1])
+ finally:
+ if self.semaphore:
+ self.semaphore.release()
class ReportingCollector(object):
"""The collecting process for reports"""
options = [Bcfg2.Options.Common.reporting_storage,
Bcfg2.Options.Common.reporting_transport,
- Bcfg2.Options.Common.daemon]
+ Bcfg2.Options.Common.daemon,
+ Bcfg2.Options.Option(
+ '--max-children', dest="children",
+ cf=('reporting', 'max_children'), type=int,
+ default=0,
+ help='Maximum number of children for the reporting collector')]
def __init__(self):
"""Setup the collector. This may be called by the daemon or though
@@ -67,6 +76,10 @@ class ReportingCollector(object):
self.children = []
self.cleanup_threshold = 25
+ if Bcfg2.Options.setup.children > 0:
+ self.semaphore = threading.Semaphore(
+ value=Bcfg2.Options.setup.children)
+
if Bcfg2.Options.setup.debug:
level = logging.DEBUG
elif Bcfg2.Options.setup.verbose:
@@ -141,7 +154,10 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- store_thread = ReportingStoreThread(interaction, self.storage)
+ if Bcfg2.Options.setup.children > 0:
+ self.semaphore.acquire()
+ store_thread = ReportingStoreThread(interaction, self.storage,
+ semaphore=self.semaphore)
store_thread.start()
self.children.append(store_thread)