summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Collector.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Collector.py')
-rw-r--r--src/lib/Bcfg2/Reporting/Collector.py81
1 files changed, 53 insertions, 28 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 12c9cdaa8..153809a35 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -1,3 +1,4 @@
+import os
import sys
import atexit
import daemon
@@ -5,13 +6,12 @@ import logging
import time
import threading
-# pylint: disable=E0611
from lockfile import LockFailed, LockTimeout
+# pylint: disable=E0611
try:
- from lockfile.pidlockfile import PIDLockFile
- from lockfile import Error as PIDFileError
+ from daemon.pidfile import TimeoutPIDLockFile
except ImportError:
- from daemon.pidlockfile import PIDLockFile, PIDFileError
+ from daemon.pidlockfile import TimeoutPIDLockFile
# pylint: enable=E0611
import Bcfg2.Logger
@@ -30,7 +30,7 @@ class ReportingError(Exception):
class ReportingStoreThread(threading.Thread):
"""Thread for calling the storage backend"""
def __init__(self, interaction, storage, group=None, target=None,
- name=None, args=(), kwargs=None):
+ name=None, semaphore=None, args=(), kwargs=None):
"""Initialize the thread with a reference to the interaction
as well as the storage engine to use"""
threading.Thread.__init__(self, group, target, name, args,
@@ -38,26 +38,37 @@ class ReportingStoreThread(threading.Thread):
self.interaction = interaction
self.storage = storage
self.logger = logging.getLogger('bcfg2-report-collector')
+ self.semaphore = semaphore
def run(self):
"""Call the database storage procedure (aka import)"""
try:
- start = time.time()
- self.storage.import_interaction(self.interaction)
- self.logger.info("Imported interaction for %s in %ss" %
- (self.interaction.get('hostname', '<unknown>'),
- time.time() - start))
- except:
- #TODO requeue?
- self.logger.error("Unhandled exception in import thread %s" %
- sys.exc_info()[1])
+ try:
+ start = time.time()
+ self.storage.import_interaction(self.interaction)
+ self.logger.info("Imported interaction for %s in %ss" %
+ (self.interaction.get('hostname',
+ '<unknown>'),
+ time.time() - start))
+ except:
+ #TODO requeue?
+ self.logger.error("Unhandled exception in import thread %s" %
+ sys.exc_info()[1])
+ finally:
+ if self.semaphore:
+ self.semaphore.release()
class ReportingCollector(object):
"""The collecting process for reports"""
options = [Bcfg2.Options.Common.reporting_storage,
Bcfg2.Options.Common.reporting_transport,
- Bcfg2.Options.Common.daemon]
+ Bcfg2.Options.Common.daemon,
+ Bcfg2.Options.Option(
+ '--max-children', dest="children",
+ cf=('reporting', 'max_children'), type=int,
+ default=0,
+ help='Maximum number of children for the reporting collector')]
def __init__(self):
"""Setup the collector. This may be called by the daemon or though
@@ -67,6 +78,11 @@ class ReportingCollector(object):
self.children = []
self.cleanup_threshold = 25
+ self.semaphore = None
+ if Bcfg2.Options.setup.children > 0:
+ self.semaphore = threading.Semaphore(
+ value=Bcfg2.Options.setup.children)
+
if Bcfg2.Options.setup.debug:
level = logging.DEBUG
elif Bcfg2.Options.setup.verbose:
@@ -113,25 +129,31 @@ class ReportingCollector(object):
if Bcfg2.Options.setup.daemon:
self.logger.debug("Daemonizing")
+ self.context.pidfile = TimeoutPIDLockFile(
+ Bcfg2.Options.setup.daemon, acquire_timeout=5)
+ # Attempt to ensure lockfile is able to be created and not stale
try:
- self.context.pidfile = PIDLockFile(Bcfg2.Options.setup.daemon)
- self.context.open()
+ self.context.pidfile.acquire()
except LockFailed:
self.logger.error("Failed to daemonize: %s" %
sys.exc_info()[1])
self.shutdown()
return
except LockTimeout:
- self.logger.error("Failed to daemonize: "
- "Failed to acquire lock on %s" %
- self.setup['daemon'])
- self.shutdown()
- return
- except PIDFileError:
- self.logger.error("Error writing pid file: %s" %
- sys.exc_info()[1])
- self.shutdown()
- return
+ try: # attempt to break the lock
+ os.kill(self.context.pidfile.read_pid(), 0)
+ except (OSError, TypeError): # No process with locked PID
+ self.context.pidfile.break_lock()
+ else:
+ self.logger.error("Failed to daemonize: "
+ "Failed to acquire lock on %s" %
+ Bcfg2.Options.setup.daemon)
+ self.shutdown()
+ return
+ else:
+ self.context.pidfile.release()
+
+ self.context.open()
self.logger.info("Starting daemon")
self.transport.start_monitor(self)
@@ -141,7 +163,10 @@ class ReportingCollector(object):
interaction = self.transport.fetch()
if not interaction:
continue
- store_thread = ReportingStoreThread(interaction, self.storage)
+ if self.semaphore:
+ self.semaphore.acquire()
+ store_thread = ReportingStoreThread(interaction, self.storage,
+ semaphore=self.semaphore)
store_thread.start()
self.children.append(store_thread)