From 7e43d4af98a12e5685f250bf2161fc7afebe02a1 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 27 Jun 2013 10:32:04 -0400 Subject: Options: migrated reporting to new parser --- src/lib/Bcfg2/Reporting/Collector.py | 58 +++++++++++----------- src/lib/Bcfg2/Reporting/Storage/DjangoORM.py | 30 ++++++----- src/lib/Bcfg2/Reporting/Storage/__init__.py | 29 ----------- src/lib/Bcfg2/Reporting/Storage/base.py | 14 ++---- src/lib/Bcfg2/Reporting/Transport/DirectStore.py | 17 ++++--- .../Bcfg2/Reporting/Transport/LocalFilesystem.py | 31 +++++------- .../Bcfg2/Reporting/Transport/RedisTransport.py | 55 ++++++++------------ src/lib/Bcfg2/Reporting/Transport/__init__.py | 32 ------------ src/lib/Bcfg2/Reporting/Transport/base.py | 15 +++--- src/sbin/bcfg2-report-collector | 14 ++---- 10 files changed, 104 insertions(+), 191 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 3d224432e..a1e6025e3 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -1,8 +1,8 @@ +import sys import atexit import daemon import logging import time -import traceback import threading # pylint: disable=E0611 @@ -14,52 +14,53 @@ except ImportError: # pylint: enable=E0611 import Bcfg2.Logger -from Bcfg2.Reporting.Transport import load_transport_from_config, \ - TransportError, TransportImportError +import Bcfg2.Options +from Bcfg2.Reporting.Transport.base import TransportError from Bcfg2.Reporting.Transport.DirectStore import DirectStore -from Bcfg2.Reporting.Storage import load_storage_from_config, \ - StorageError, StorageImportError +from Bcfg2.Reporting.Storage.base import StorageError + class ReportingError(Exception): """Generic reporting exception""" pass + class ReportingCollector(object): """The collecting process for reports""" + options = [Bcfg2.Options.Common.reporting_storage, + Bcfg2.Options.Common.reporting_transport, + Bcfg2.Options.Common.daemon] - def __init__(self, setup): - """Setup the collector. This may be called by the daemon or though + def __init__(self): + """Setup the collector. This may be called by the daemon or though bcfg2-admin""" - self.setup = setup - self.datastore = setup['repo'] - self.encoding = setup['encoding'] self.terminate = None self.context = None - if setup['debug']: + if Bcfg2.Options.setup.debug: level = logging.DEBUG - elif setup['verbose']: + elif Bcfg2.Options.setup.verbose: level = logging.INFO else: level = logging.WARNING Bcfg2.Logger.setup_logging('bcfg2-report-collector', to_console=logging.INFO, - to_syslog=setup['syslog'], - to_file=setup['logging'], + to_syslog=Bcfg2.Options.setup.syslog, + to_file=Bcfg2.Options.setup.logging, level=level) self.logger = logging.getLogger('bcfg2-report-collector') try: - self.transport = load_transport_from_config(setup) - self.storage = load_storage_from_config(setup) + self.transport = Bcfg2.Options.setup.transport() + self.storage = Bcfg2.Options.setup.reporting_storage() except TransportError: self.logger.error("Failed to load transport: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) raise ReportingError except StorageError: self.logger.error("Failed to load storage: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) raise ReportingError if isinstance(self.transport, DirectStore): @@ -69,14 +70,13 @@ class ReportingCollector(object): raise ReportingError try: - self.logger.debug("Validating storage %s" % - self.storage.__class__.__name__) + self.logger.debug("Validating storage %s" % + self.storage.__class__.__name__) self.storage.validate() except: self.logger.error("Storage backed %s failed to validate: %s" % - (self.storage.__class__.__name__, - traceback.format_exc().splitlines()[-1])) - + (self.storage.__class__.__name__, + sys.exc_info()[1])) def run(self): """Startup the processing and go!""" @@ -84,14 +84,14 @@ class ReportingCollector(object): atexit.register(self.shutdown) self.context = daemon.DaemonContext() - if self.setup['daemon']: + if Bcfg2.Options.setup.daemon: self.logger.debug("Daemonizing") try: - self.context.pidfile = PIDLockFile(self.setup['daemon']) + self.context.pidfile = PIDLockFile(Bcfg2.Options.setup.daemon) self.context.open() except PIDFileError: self.logger.error("Error writing pid file: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) self.shutdown() return self.logger.info("Starting daemon") @@ -107,8 +107,8 @@ class ReportingCollector(object): start = time.time() self.storage.import_interaction(interaction) self.logger.info("Imported interaction for %s in %ss" % - (interaction.get('hostname', ''), - time.time() - start)) + (interaction.get('hostname', ''), + time.time() - start)) except: #TODO requeue? raise @@ -117,7 +117,7 @@ class ReportingCollector(object): self.shutdown() except: self.logger.error("Unhandled exception in main loop %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) def shutdown(self): """Cleanup and go""" diff --git a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py index aea5e9d4b..9505682a7 100644 --- a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py +++ b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py @@ -11,6 +11,7 @@ from time import strptime os.environ['DJANGO_SETTINGS_MODULE'] = 'Bcfg2.settings' from Bcfg2 import settings +import Bcfg2.Options from Bcfg2.Compat import md5 from Bcfg2.Reporting.Storage.base import StorageBase, StorageError from Bcfg2.Server.Plugin.exceptions import PluginExecutionError @@ -27,9 +28,13 @@ from Bcfg2.Reporting.models import * class DjangoORM(StorageBase): - def __init__(self, setup): - super(DjangoORM, self).__init__(setup) - self.size_limit = setup.get('reporting_file_limit') + options = StorageBase.options + [ + Bcfg2.Options.Common.repository, + Bcfg2.Options.Option( + cf=('reporting', 'file_limit'), + type=Bcfg2.Options.Types.size, + help='Reporting file size limit', + default=1024 * 1024)] def _import_default(self, entry, state, entrytype=None, defaults=None, mapping=None, boolean=None, xforms=None): @@ -184,7 +189,7 @@ class DjangoORM(StorageBase): act_dict['detail_type'] = PathEntry.DETAIL_DIFF cdata = entry.get('current_bdiff') if cdata: - if len(cdata) > self.size_limit: + if len(cdata) > Bcfg2.Options.setup.file_limit: act_dict['detail_type'] = PathEntry.DETAIL_SIZE_LIMIT act_dict['details'] = md5(cdata).hexdigest() else: @@ -364,31 +369,31 @@ class DjangoORM(StorageBase): def import_interaction(self, interaction): """Import the data into the backend""" - try: self._import_interaction(interaction) except: self.logger.error("Failed to import interaction: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) def validate(self): """Validate backend storage. Should be called once when loaded""" - settings.read_config(repo=self.setup['repo']) + settings.read_config(repo=Bcfg2.Options.setup.repository) # verify our database schema try: - if self.setup['debug']: + if Bcfg2.Options.setup.debug: vrb = 2 - elif self.setup['verbose']: + elif Bcfg2.Options.setup.verbose: vrb = 1 else: vrb = 0 management.call_command("syncdb", verbosity=vrb, interactive=False) - management.call_command("migrate", verbosity=vrb, interactive=False) + management.call_command("migrate", verbosity=vrb, + interactive=False) except: - self.logger.error("Failed to update database schema: %s" % \ - traceback.format_exc().splitlines()[-1]) + self.logger.error("Failed to update database schema: %s" % + sys.exc_info()[1]) raise StorageError def GetExtra(self, client): @@ -451,4 +456,3 @@ class DjangoORM(StorageBase): else: ret.append(None) return ret - diff --git a/src/lib/Bcfg2/Reporting/Storage/__init__.py b/src/lib/Bcfg2/Reporting/Storage/__init__.py index 85356fcfe..953104d4b 100644 --- a/src/lib/Bcfg2/Reporting/Storage/__init__.py +++ b/src/lib/Bcfg2/Reporting/Storage/__init__.py @@ -1,32 +1,3 @@ """ Public storage routines """ - -import traceback - -from Bcfg2.Reporting.Storage.base import StorageError, \ - StorageImportError - -def load_storage(storage_name, setup): - """ - Try to load the storage. Raise StorageImportError on failure - """ - try: - mod_name = "%s.%s" % (__name__, storage_name) - mod = getattr(__import__(mod_name).Reporting.Storage, storage_name) - except ImportError: - try: - mod = __import__(storage_name) - except: - raise StorageImportError("Unavailable") - try: - cls = getattr(mod, storage_name) - return cls(setup) - except: - raise StorageImportError("Storage unavailable: %s" % - traceback.format_exc().splitlines()[-1]) - -def load_storage_from_config(setup): - """Load the storage in the config... eventually""" - return load_storage('DjangoORM', setup) - diff --git a/src/lib/Bcfg2/Reporting/Storage/base.py b/src/lib/Bcfg2/Reporting/Storage/base.py index 92cc3a68b..771f755a1 100644 --- a/src/lib/Bcfg2/Reporting/Storage/base.py +++ b/src/lib/Bcfg2/Reporting/Storage/base.py @@ -2,28 +2,25 @@ The base for all Storage backends """ -import logging +import logging + class StorageError(Exception): """Generic StorageError""" pass -class StorageImportError(StorageError): - """Raised when a storage module fails to import""" - pass - class StorageBase(object): """The base for all storages""" + options = [] + __rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry'] - def __init__(self, setup): + def __init__(self): """Do something here""" clsname = self.__class__.__name__ self.logger = logging.getLogger(clsname) self.logger.debug("Loading %s storage" % clsname) - self.setup = setup - self.encoding = setup['encoding'] def import_interaction(self, interaction): """Import the data into the backend""" @@ -48,4 +45,3 @@ class StorageBase(object): def GetCurrentEntry(self, client, e_type, e_name): """Get the current status of an entry on the client""" raise NotImplementedError - diff --git a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py index 79d1b5aba..b9d17212e 100644 --- a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py +++ b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py @@ -5,18 +5,20 @@ import os import sys import time import threading +import Bcfg2.Options from Bcfg2.Reporting.Transport.base import TransportBase, TransportError -from Bcfg2.Reporting.Storage import load_storage_from_config from Bcfg2.Compat import Queue, Full, Empty, cPickle class DirectStore(TransportBase, threading.Thread): - def __init__(self, setup): - TransportBase.__init__(self, setup) + options = TransportBase.options + [Bcfg2.Options.Common.reporting_storage] + + def __init__(self): + TransportBase.__init__(self) threading.Thread.__init__(self) self.save_file = os.path.join(self.data, ".saved") - self.storage = load_storage_from_config(setup) + self.storage = Bcfg2.Options.setup.reporting_storage() self.storage.validate() self.queue = Queue(100000) @@ -30,10 +32,9 @@ class DirectStore(TransportBase, threading.Thread): def store(self, hostname, metadata, stats): try: - self.queue.put_nowait(dict( - hostname=hostname, - metadata=metadata, - stats=stats)) + self.queue.put_nowait(dict(hostname=hostname, + metadata=metadata, + stats=stats)) except Full: self.logger.warning("Reporting: Queue is full, " "dropping statistics") diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py index c7d5c512a..d901ded56 100644 --- a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py +++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py @@ -9,6 +9,8 @@ import os import select import time import traceback +import Bcfg2.Options +import Bcfg2.CommonOptions import Bcfg2.Server.FileMonitor from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError from Bcfg2.Reporting.Transport.base import TransportBase, TransportError @@ -16,8 +18,10 @@ from Bcfg2.Compat import cPickle class LocalFilesystem(TransportBase): - def __init__(self, setup): - super(LocalFilesystem, self).__init__(setup) + options = TransportBase.options + [Bcfg2.Options.Common.filemonitor] + + def __init__(self): + super(LocalFilesystem, self).__init__() self.work_path = "%s/work" % self.data self.debug_log("LocalFilesystem: work path %s" % self.work_path) @@ -42,24 +46,16 @@ class LocalFilesystem(TransportBase): def start_monitor(self, collector): """Start the file monitor. Most of this comes from BaseCore""" - setup = self.setup - try: - fmon = Bcfg2.Server.FileMonitor.available[setup['filemonitor']] - except KeyError: - self.logger.error("File monitor driver %s not available; " - "forcing to default" % setup['filemonitor']) - fmon = Bcfg2.Server.FileMonitor.available['default'] - if self.debug_flag: - self.fmon.set_debug(self.debug_flag) try: - self.fmon = fmon(debug=self.debug_flag) - self.logger.info("Using the %s file monitor" % - self.fmon.__class__.__name__) + self.fmon = Bcfg2.Server.FileMonitor.get_fam() except IOError: - msg = "Failed to instantiate file monitor %s" % \ - setup['filemonitor'] + msg = "Failed to instantiate fam driver %s" % \ + Bcfg2.Options.setup.filemonitor self.logger.error(msg, exc_info=1) raise TransportError(msg) + + if self.debug_flag: + self.fmon.set_debug(self.debug_flag) self.fmon.start() self.fmon.AddMonitor(self.work_path, self) @@ -154,7 +150,7 @@ class LocalFilesystem(TransportBase): """ try: if not self._phony_collector: - self._phony_collector = ReportingCollector(self.setup) + self._phony_collector = ReportingCollector() except ReportingError: raise TransportError except: @@ -176,4 +172,3 @@ class LocalFilesystem(TransportBase): self.logger.error("RPC method %s failed: %s" % (method, traceback.format_exc().splitlines()[-1])) raise TransportError - diff --git a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py index 22d9af57e..7427c2e1d 100644 --- a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py +++ b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py @@ -9,9 +9,9 @@ import signal import platform import traceback import threading +import Bcfg2.Options from Bcfg2.Reporting.Transport.base import TransportBase, TransportError from Bcfg2.Compat import cPickle -from Bcfg2.Options import Option try: import redis @@ -34,9 +34,19 @@ class RedisTransport(TransportBase): STATS_KEY = 'bcfg2_statistics' COMMAND_KEY = 'bcfg2_command' - def __init__(self, setup): - super(RedisTransport, self).__init__(setup) - self._redis = None + options = TransportBase.options + [ + Bcfg2.Options.Option( + cf=('reporting', 'redis_host'), dest="reporting_redis_host", + default='127.0.0.1', help='Reporting Redis host'), + Bcfg2.Options.Option( + cf=('reporting', 'redis_port'), dest="reporting_redis_port", + default=6379, type=int, help='Reporting Redis port'), + Bcfg2.Options.Option( + cf=('reporting', 'redis_db'), dest="reporting_redis_db", + default=0, type=int, help='Reporting Redis DB')] + + def __init__(self): + super(RedisTransport, self).__init__() self._commands = None self.logger.error("Warning: RedisTransport is experimental") @@ -45,36 +55,15 @@ class RedisTransport(TransportBase): self.logger.error("redis python module is not available") raise TransportError - setup.update(dict( - reporting_redis_host=Option( - 'Redis Host', - default='127.0.0.1', - cf=('reporting', 'redis_host')), - reporting_redis_port=Option( - 'Redis Port', - default=6379, - cf=('reporting', 'redis_port')), - reporting_redis_db=Option( - 'Redis DB', - default=0, - cf=('reporting', 'redis_db')), - )) - setup.reparse() - - self._redis_host = setup.get('reporting_redis_host', '127.0.0.1') - try: - self._redis_port = int(setup.get('reporting_redis_port', 6379)) - except ValueError: - self.logger.error("Redis port must be an integer") - raise TransportError - self._redis_db = setup.get('reporting_redis_db', 0) - self._redis = redis.Redis(host=self._redis_host, - port=self._redis_port, db=self._redis_db) + self._redis = redis.Redis( + host=Bcfg2.Options.setup.reporting_redis_host, + port=Bcfg2.Options.setup.reporting_redis_port, + db=Bcfg2.Options.setup.reporting_redis_db) def start_monitor(self, collector): """Start the monitor. Eventaully start the command thread""" - self._commands = threading.Thread(target=self.monitor_thread, + self._commands = threading.Thread(target=self.monitor_thread, args=(self._redis, collector)) self._commands.start() @@ -129,7 +118,7 @@ class RedisTransport(TransportBase): channel = "%s%s" % (platform.node(), int(time.time())) pubsub.subscribe(channel) - self._redis.rpush(RedisTransport.COMMAND_KEY, + self._redis.rpush(RedisTransport.COMMAND_KEY, cPickle.dumps(RedisMessage(channel, method, args, kwargs))) resp = pubsub.listen() @@ -160,7 +149,7 @@ class RedisTransport(TransportBase): continue message = cPickle.loads(payload[1]) if not isinstance(message, RedisMessage): - self.logger.error("Message \"%s\" is not a RedisMessage" % + self.logger.error("Message \"%s\" is not a RedisMessage" % message) if not message.method in collector.storage.__class__.__rmi__ or\ @@ -192,5 +181,3 @@ class RedisTransport(TransportBase): self.logger.error("Unhandled exception in command thread: %s" % traceback.format_exc().splitlines()[-1]) self.logger.info("Command thread shutdown") - - diff --git a/src/lib/Bcfg2/Reporting/Transport/__init__.py b/src/lib/Bcfg2/Reporting/Transport/__init__.py index 73bdd0b3a..04b574ed7 100644 --- a/src/lib/Bcfg2/Reporting/Transport/__init__.py +++ b/src/lib/Bcfg2/Reporting/Transport/__init__.py @@ -1,35 +1,3 @@ """ Public transport routines """ - -import sys -from Bcfg2.Reporting.Transport.base import TransportError, \ - TransportImportError - - -def load_transport(transport_name, setup): - """ - Try to load the transport. Raise TransportImportError on failure - """ - try: - mod_name = "%s.%s" % (__name__, transport_name) - mod = getattr(__import__(mod_name).Reporting.Transport, transport_name) - except ImportError: - try: - mod = __import__(transport_name) - except: - raise TransportImportError("Error importing transport %s: %s" % - (transport_name, sys.exc_info()[1])) - try: - return getattr(mod, transport_name)(setup) - except: - raise TransportImportError("Error instantiating transport %s: %s" % - (transport_name, sys.exc_info()[1])) - - -def load_transport_from_config(setup): - """Load the transport in the config... eventually""" - try: - return load_transport(setup['reporting_transport'], setup) - except KeyError: - raise TransportImportError('Transport missing in config') diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py index 530011e47..9fbf8c9d5 100644 --- a/src/lib/Bcfg2/Reporting/Transport/base.py +++ b/src/lib/Bcfg2/Reporting/Transport/base.py @@ -4,7 +4,7 @@ The base for all server -> collector Transports import os import sys -from Bcfg2.Server.Plugin import Debuggable +from Bcfg2.Logger import Debuggable class TransportError(Exception): @@ -12,20 +12,18 @@ class TransportError(Exception): pass -class TransportImportError(TransportError): - """Raised when a transport fails to import""" - pass - - class TransportBase(Debuggable): """The base for all transports""" - def __init__(self, setup): + options = Debuggable.options + + def __init__(self): """Do something here""" clsname = self.__class__.__name__ Debuggable.__init__(self, name=clsname) self.debug_log("Loading %s transport" % clsname) - self.data = os.path.join(setup['repo'], 'Reporting', clsname) + self.data = os.path.join(Bcfg2.Options.setup.repository, 'Reporting', + clsname) if not os.path.exists(self.data): self.logger.info("%s does not exist, creating" % self.data) try: @@ -34,7 +32,6 @@ class TransportBase(Debuggable): self.logger.warning("Could not create %s: %s" % (self.data, sys.exc_info()[1])) self.logger.warning("The transport may not function properly") - self.setup = setup self.timeout = 2 def start_monitor(self, collector): diff --git a/src/sbin/bcfg2-report-collector b/src/sbin/bcfg2-report-collector index ae6d3b167..00e015100 100755 --- a/src/sbin/bcfg2-report-collector +++ b/src/sbin/bcfg2-report-collector @@ -11,20 +11,14 @@ from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError def main(): + parser = Bcfg2.Options.get_parser(description="Collect Bcfg2 report data", + components=[ReportingCollector]) + parser.parse() logger = logging.getLogger('bcfg2-report-collector') - optinfo = dict(daemon=Bcfg2.Options.DAEMON, - repo=Bcfg2.Options.SERVER_REPOSITORY, - filemonitor=Bcfg2.Options.SERVER_FILEMONITOR, - web_configfile=Bcfg2.Options.WEB_CFILE) - optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS) - optinfo.update(Bcfg2.Options.REPORTING_COMMON_OPTIONS) - setup = Bcfg2.Options.load_option_parser(optinfo) - setup.parse() # run collector try: - collector = ReportingCollector(setup) - collector.run() + ReportingCollector().run() except ReportingError: msg = sys.exc_info()[1] logger.error(msg) -- cgit v1.2.3-1-g7c22