diff options
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Collector.py')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Collector.py | 95 |
1 files changed, 54 insertions, 41 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 8e2fe1cb1..153809a35 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -4,7 +4,6 @@ import atexit import daemon import logging import time -import traceback import threading from lockfile import LockFailed, LockTimeout @@ -16,11 +15,11 @@ except ImportError: # pylint: enable=E0611 import Bcfg2.Logger -from Bcfg2.Reporting.Transport import load_transport_from_config, \ - TransportError, TransportImportError +import Bcfg2.Options +from Bcfg2.Reporting.Transport.base import TransportError from Bcfg2.Reporting.Transport.DirectStore import DirectStore -from Bcfg2.Reporting.Storage import load_storage_from_config, \ - StorageError, StorageImportError +from Bcfg2.Reporting.Storage.base import StorageError + class ReportingError(Exception): @@ -31,7 +30,7 @@ class ReportingError(Exception): class ReportingStoreThread(threading.Thread): """Thread for calling the storage backend""" def __init__(self, interaction, storage, group=None, target=None, - name=None, args=(), kwargs=None): + name=None, semaphore=None, args=(), kwargs=None): """Initialize the thread with a reference to the interaction as well as the storage engine to use""" threading.Thread.__init__(self, group, target, name, args, @@ -39,59 +38,71 @@ class ReportingStoreThread(threading.Thread): self.interaction = interaction self.storage = storage self.logger = logging.getLogger('bcfg2-report-collector') + self.semaphore = semaphore def run(self): """Call the database storage procedure (aka import)""" try: - start = time.time() - self.storage.import_interaction(self.interaction) - self.logger.info("Imported interaction for %s in %ss" % - (self.interaction.get('hostname', '<unknown>'), - time.time() - start)) - except: - #TODO requeue? - self.logger.error("Unhandled exception in import thread %s" % - traceback.format_exc().splitlines()[-1]) + try: + start = time.time() + self.storage.import_interaction(self.interaction) + self.logger.info("Imported interaction for %s in %ss" % + (self.interaction.get('hostname', + '<unknown>'), + time.time() - start)) + except: + #TODO requeue? + self.logger.error("Unhandled exception in import thread %s" % + sys.exc_info()[1]) + finally: + if self.semaphore: + self.semaphore.release() class ReportingCollector(object): """The collecting process for reports""" - - def __init__(self, setup): + options = [Bcfg2.Options.Common.reporting_storage, + Bcfg2.Options.Common.reporting_transport, + Bcfg2.Options.Common.daemon, + Bcfg2.Options.Option( + '--max-children', dest="children", + cf=('reporting', 'max_children'), type=int, + default=0, + help='Maximum number of children for the reporting collector')] + + def __init__(self): """Setup the collector. This may be called by the daemon or though bcfg2-admin""" - self.setup = setup - self.datastore = setup['repo'] - self.encoding = setup['encoding'] self.terminate = None self.context = None self.children = [] self.cleanup_threshold = 25 - if setup['debug']: + self.semaphore = None + if Bcfg2.Options.setup.children > 0: + self.semaphore = threading.Semaphore( + value=Bcfg2.Options.setup.children) + + if Bcfg2.Options.setup.debug: level = logging.DEBUG - elif setup['verbose']: + elif Bcfg2.Options.setup.verbose: level = logging.INFO else: level = logging.WARNING - Bcfg2.Logger.setup_logging('bcfg2-report-collector', - to_console=logging.INFO, - to_syslog=setup['syslog'], - to_file=setup['logging'], - level=level) + Bcfg2.Logger.setup_logging() self.logger = logging.getLogger('bcfg2-report-collector') try: - self.transport = load_transport_from_config(setup) - self.storage = load_storage_from_config(setup) + self.transport = Bcfg2.Options.setup.reporting_transport() + self.storage = Bcfg2.Options.setup.reporting_storage() except TransportError: self.logger.error("Failed to load transport: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) raise ReportingError except StorageError: self.logger.error("Failed to load storage: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) raise ReportingError if isinstance(self.transport, DirectStore): @@ -102,12 +113,12 @@ class ReportingCollector(object): try: self.logger.debug("Validating storage %s" % - self.storage.__class__.__name__) + self.storage.__class__.__name__) self.storage.validate() except: self.logger.error("Storage backed %s failed to validate: %s" % - (self.storage.__class__.__name__, - traceback.format_exc().splitlines()[-1])) + (self.storage.__class__.__name__, + sys.exc_info()[1])) def run(self): """Startup the processing and go!""" @@ -116,10 +127,10 @@ class ReportingCollector(object): self.context = daemon.DaemonContext(detach_process=True) iter = 0 - if self.setup['daemon']: + if Bcfg2.Options.setup.daemon: self.logger.debug("Daemonizing") - self.context.pidfile = TimeoutPIDLockFile(self.setup['daemon'], - acquire_timeout=5) + self.context.pidfile = TimeoutPIDLockFile( + Bcfg2.Options.setup.daemon, acquire_timeout=5) # Attempt to ensure lockfile is able to be created and not stale try: self.context.pidfile.acquire() @@ -136,7 +147,7 @@ class ReportingCollector(object): else: self.logger.error("Failed to daemonize: " "Failed to acquire lock on %s" % - self.setup['daemon']) + Bcfg2.Options.setup.daemon) self.shutdown() return else: @@ -152,8 +163,10 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - - store_thread = ReportingStoreThread(interaction, self.storage) + if self.semaphore: + self.semaphore.acquire() + store_thread = ReportingStoreThread(interaction, self.storage, + semaphore=self.semaphore) store_thread.start() self.children.append(store_thread) @@ -167,7 +180,7 @@ class ReportingCollector(object): self.shutdown() except: self.logger.error("Unhandled exception in main loop %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) def shutdown(self): """Cleanup and go""" |