diff options
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Collector.py')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Collector.py | 81 |
1 files changed, 53 insertions, 28 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 12c9cdaa8..153809a35 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -1,3 +1,4 @@ +import os import sys import atexit import daemon @@ -5,13 +6,12 @@ import logging import time import threading -# pylint: disable=E0611 from lockfile import LockFailed, LockTimeout +# pylint: disable=E0611 try: - from lockfile.pidlockfile import PIDLockFile - from lockfile import Error as PIDFileError + from daemon.pidfile import TimeoutPIDLockFile except ImportError: - from daemon.pidlockfile import PIDLockFile, PIDFileError + from daemon.pidlockfile import TimeoutPIDLockFile # pylint: enable=E0611 import Bcfg2.Logger @@ -30,7 +30,7 @@ class ReportingError(Exception): class ReportingStoreThread(threading.Thread): """Thread for calling the storage backend""" def __init__(self, interaction, storage, group=None, target=None, - name=None, args=(), kwargs=None): + name=None, semaphore=None, args=(), kwargs=None): """Initialize the thread with a reference to the interaction as well as the storage engine to use""" threading.Thread.__init__(self, group, target, name, args, @@ -38,26 +38,37 @@ class ReportingStoreThread(threading.Thread): self.interaction = interaction self.storage = storage self.logger = logging.getLogger('bcfg2-report-collector') + self.semaphore = semaphore def run(self): """Call the database storage procedure (aka import)""" try: - start = time.time() - self.storage.import_interaction(self.interaction) - self.logger.info("Imported interaction for %s in %ss" % - (self.interaction.get('hostname', '<unknown>'), - time.time() - start)) - except: - #TODO requeue? - self.logger.error("Unhandled exception in import thread %s" % - sys.exc_info()[1]) + try: + start = time.time() + self.storage.import_interaction(self.interaction) + self.logger.info("Imported interaction for %s in %ss" % + (self.interaction.get('hostname', + '<unknown>'), + time.time() - start)) + except: + #TODO requeue? + self.logger.error("Unhandled exception in import thread %s" % + sys.exc_info()[1]) + finally: + if self.semaphore: + self.semaphore.release() class ReportingCollector(object): """The collecting process for reports""" options = [Bcfg2.Options.Common.reporting_storage, Bcfg2.Options.Common.reporting_transport, - Bcfg2.Options.Common.daemon] + Bcfg2.Options.Common.daemon, + Bcfg2.Options.Option( + '--max-children', dest="children", + cf=('reporting', 'max_children'), type=int, + default=0, + help='Maximum number of children for the reporting collector')] def __init__(self): """Setup the collector. This may be called by the daemon or though @@ -67,6 +78,11 @@ class ReportingCollector(object): self.children = [] self.cleanup_threshold = 25 + self.semaphore = None + if Bcfg2.Options.setup.children > 0: + self.semaphore = threading.Semaphore( + value=Bcfg2.Options.setup.children) + if Bcfg2.Options.setup.debug: level = logging.DEBUG elif Bcfg2.Options.setup.verbose: @@ -113,25 +129,31 @@ class ReportingCollector(object): if Bcfg2.Options.setup.daemon: self.logger.debug("Daemonizing") + self.context.pidfile = TimeoutPIDLockFile( + Bcfg2.Options.setup.daemon, acquire_timeout=5) + # Attempt to ensure lockfile is able to be created and not stale try: - self.context.pidfile = PIDLockFile(Bcfg2.Options.setup.daemon) - self.context.open() + self.context.pidfile.acquire() except LockFailed: self.logger.error("Failed to daemonize: %s" % sys.exc_info()[1]) self.shutdown() return except LockTimeout: - self.logger.error("Failed to daemonize: " - "Failed to acquire lock on %s" % - self.setup['daemon']) - self.shutdown() - return - except PIDFileError: - self.logger.error("Error writing pid file: %s" % - sys.exc_info()[1]) - self.shutdown() - return + try: # attempt to break the lock + os.kill(self.context.pidfile.read_pid(), 0) + except (OSError, TypeError): # No process with locked PID + self.context.pidfile.break_lock() + else: + self.logger.error("Failed to daemonize: " + "Failed to acquire lock on %s" % + Bcfg2.Options.setup.daemon) + self.shutdown() + return + else: + self.context.pidfile.release() + + self.context.open() self.logger.info("Starting daemon") self.transport.start_monitor(self) @@ -141,7 +163,10 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - store_thread = ReportingStoreThread(interaction, self.storage) + if self.semaphore: + self.semaphore.acquire() + store_thread = ReportingStoreThread(interaction, self.storage, + semaphore=self.semaphore) store_thread.start() self.children.append(store_thread) |