summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Transport
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-06-27 10:32:04 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-06-27 10:41:53 -0400
commit7e43d4af98a12e5685f250bf2161fc7afebe02a1 (patch)
treec01973d3dcf075b394c7dec66a1a45894d8c9f4d /src/lib/Bcfg2/Reporting/Transport
parent4261f7238e3b7eb169fcb0f672e7fdb86d722189 (diff)
downloadbcfg2-7e43d4af98a12e5685f250bf2161fc7afebe02a1.tar.gz
bcfg2-7e43d4af98a12e5685f250bf2161fc7afebe02a1.tar.bz2
bcfg2-7e43d4af98a12e5685f250bf2161fc7afebe02a1.zip
Options: migrated reporting to new parser
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Transport')
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/DirectStore.py17
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py31
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/RedisTransport.py55
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/__init__.py32
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/base.py15
5 files changed, 49 insertions, 101 deletions
diff --git a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py
index 79d1b5aba..b9d17212e 100644
--- a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py
+++ b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py
@@ -5,18 +5,20 @@ import os
import sys
import time
import threading
+import Bcfg2.Options
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
-from Bcfg2.Reporting.Storage import load_storage_from_config
from Bcfg2.Compat import Queue, Full, Empty, cPickle
class DirectStore(TransportBase, threading.Thread):
- def __init__(self, setup):
- TransportBase.__init__(self, setup)
+ options = TransportBase.options + [Bcfg2.Options.Common.reporting_storage]
+
+ def __init__(self):
+ TransportBase.__init__(self)
threading.Thread.__init__(self)
self.save_file = os.path.join(self.data, ".saved")
- self.storage = load_storage_from_config(setup)
+ self.storage = Bcfg2.Options.setup.reporting_storage()
self.storage.validate()
self.queue = Queue(100000)
@@ -30,10 +32,9 @@ class DirectStore(TransportBase, threading.Thread):
def store(self, hostname, metadata, stats):
try:
- self.queue.put_nowait(dict(
- hostname=hostname,
- metadata=metadata,
- stats=stats))
+ self.queue.put_nowait(dict(hostname=hostname,
+ metadata=metadata,
+ stats=stats))
except Full:
self.logger.warning("Reporting: Queue is full, "
"dropping statistics")
diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
index c7d5c512a..d901ded56 100644
--- a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
+++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
@@ -9,6 +9,8 @@ import os
import select
import time
import traceback
+import Bcfg2.Options
+import Bcfg2.CommonOptions
import Bcfg2.Server.FileMonitor
from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
@@ -16,8 +18,10 @@ from Bcfg2.Compat import cPickle
class LocalFilesystem(TransportBase):
- def __init__(self, setup):
- super(LocalFilesystem, self).__init__(setup)
+ options = TransportBase.options + [Bcfg2.Options.Common.filemonitor]
+
+ def __init__(self):
+ super(LocalFilesystem, self).__init__()
self.work_path = "%s/work" % self.data
self.debug_log("LocalFilesystem: work path %s" % self.work_path)
@@ -42,24 +46,16 @@ class LocalFilesystem(TransportBase):
def start_monitor(self, collector):
"""Start the file monitor. Most of this comes from BaseCore"""
- setup = self.setup
- try:
- fmon = Bcfg2.Server.FileMonitor.available[setup['filemonitor']]
- except KeyError:
- self.logger.error("File monitor driver %s not available; "
- "forcing to default" % setup['filemonitor'])
- fmon = Bcfg2.Server.FileMonitor.available['default']
- if self.debug_flag:
- self.fmon.set_debug(self.debug_flag)
try:
- self.fmon = fmon(debug=self.debug_flag)
- self.logger.info("Using the %s file monitor" %
- self.fmon.__class__.__name__)
+ self.fmon = Bcfg2.Server.FileMonitor.get_fam()
except IOError:
- msg = "Failed to instantiate file monitor %s" % \
- setup['filemonitor']
+ msg = "Failed to instantiate fam driver %s" % \
+ Bcfg2.Options.setup.filemonitor
self.logger.error(msg, exc_info=1)
raise TransportError(msg)
+
+ if self.debug_flag:
+ self.fmon.set_debug(self.debug_flag)
self.fmon.start()
self.fmon.AddMonitor(self.work_path, self)
@@ -154,7 +150,7 @@ class LocalFilesystem(TransportBase):
"""
try:
if not self._phony_collector:
- self._phony_collector = ReportingCollector(self.setup)
+ self._phony_collector = ReportingCollector()
except ReportingError:
raise TransportError
except:
@@ -176,4 +172,3 @@ class LocalFilesystem(TransportBase):
self.logger.error("RPC method %s failed: %s" %
(method, traceback.format_exc().splitlines()[-1]))
raise TransportError
-
diff --git a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py
index 22d9af57e..7427c2e1d 100644
--- a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py
+++ b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py
@@ -9,9 +9,9 @@ import signal
import platform
import traceback
import threading
+import Bcfg2.Options
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
from Bcfg2.Compat import cPickle
-from Bcfg2.Options import Option
try:
import redis
@@ -34,9 +34,19 @@ class RedisTransport(TransportBase):
STATS_KEY = 'bcfg2_statistics'
COMMAND_KEY = 'bcfg2_command'
- def __init__(self, setup):
- super(RedisTransport, self).__init__(setup)
- self._redis = None
+ options = TransportBase.options + [
+ Bcfg2.Options.Option(
+ cf=('reporting', 'redis_host'), dest="reporting_redis_host",
+ default='127.0.0.1', help='Reporting Redis host'),
+ Bcfg2.Options.Option(
+ cf=('reporting', 'redis_port'), dest="reporting_redis_port",
+ default=6379, type=int, help='Reporting Redis port'),
+ Bcfg2.Options.Option(
+ cf=('reporting', 'redis_db'), dest="reporting_redis_db",
+ default=0, type=int, help='Reporting Redis DB')]
+
+ def __init__(self):
+ super(RedisTransport, self).__init__()
self._commands = None
self.logger.error("Warning: RedisTransport is experimental")
@@ -45,36 +55,15 @@ class RedisTransport(TransportBase):
self.logger.error("redis python module is not available")
raise TransportError
- setup.update(dict(
- reporting_redis_host=Option(
- 'Redis Host',
- default='127.0.0.1',
- cf=('reporting', 'redis_host')),
- reporting_redis_port=Option(
- 'Redis Port',
- default=6379,
- cf=('reporting', 'redis_port')),
- reporting_redis_db=Option(
- 'Redis DB',
- default=0,
- cf=('reporting', 'redis_db')),
- ))
- setup.reparse()
-
- self._redis_host = setup.get('reporting_redis_host', '127.0.0.1')
- try:
- self._redis_port = int(setup.get('reporting_redis_port', 6379))
- except ValueError:
- self.logger.error("Redis port must be an integer")
- raise TransportError
- self._redis_db = setup.get('reporting_redis_db', 0)
- self._redis = redis.Redis(host=self._redis_host,
- port=self._redis_port, db=self._redis_db)
+ self._redis = redis.Redis(
+ host=Bcfg2.Options.setup.reporting_redis_host,
+ port=Bcfg2.Options.setup.reporting_redis_port,
+ db=Bcfg2.Options.setup.reporting_redis_db)
def start_monitor(self, collector):
"""Start the monitor. Eventaully start the command thread"""
- self._commands = threading.Thread(target=self.monitor_thread,
+ self._commands = threading.Thread(target=self.monitor_thread,
args=(self._redis, collector))
self._commands.start()
@@ -129,7 +118,7 @@ class RedisTransport(TransportBase):
channel = "%s%s" % (platform.node(), int(time.time()))
pubsub.subscribe(channel)
- self._redis.rpush(RedisTransport.COMMAND_KEY,
+ self._redis.rpush(RedisTransport.COMMAND_KEY,
cPickle.dumps(RedisMessage(channel, method, args, kwargs)))
resp = pubsub.listen()
@@ -160,7 +149,7 @@ class RedisTransport(TransportBase):
continue
message = cPickle.loads(payload[1])
if not isinstance(message, RedisMessage):
- self.logger.error("Message \"%s\" is not a RedisMessage" %
+ self.logger.error("Message \"%s\" is not a RedisMessage" %
message)
if not message.method in collector.storage.__class__.__rmi__ or\
@@ -192,5 +181,3 @@ class RedisTransport(TransportBase):
self.logger.error("Unhandled exception in command thread: %s" %
traceback.format_exc().splitlines()[-1])
self.logger.info("Command thread shutdown")
-
-
diff --git a/src/lib/Bcfg2/Reporting/Transport/__init__.py b/src/lib/Bcfg2/Reporting/Transport/__init__.py
index 73bdd0b3a..04b574ed7 100644
--- a/src/lib/Bcfg2/Reporting/Transport/__init__.py
+++ b/src/lib/Bcfg2/Reporting/Transport/__init__.py
@@ -1,35 +1,3 @@
"""
Public transport routines
"""
-
-import sys
-from Bcfg2.Reporting.Transport.base import TransportError, \
- TransportImportError
-
-
-def load_transport(transport_name, setup):
- """
- Try to load the transport. Raise TransportImportError on failure
- """
- try:
- mod_name = "%s.%s" % (__name__, transport_name)
- mod = getattr(__import__(mod_name).Reporting.Transport, transport_name)
- except ImportError:
- try:
- mod = __import__(transport_name)
- except:
- raise TransportImportError("Error importing transport %s: %s" %
- (transport_name, sys.exc_info()[1]))
- try:
- return getattr(mod, transport_name)(setup)
- except:
- raise TransportImportError("Error instantiating transport %s: %s" %
- (transport_name, sys.exc_info()[1]))
-
-
-def load_transport_from_config(setup):
- """Load the transport in the config... eventually"""
- try:
- return load_transport(setup['reporting_transport'], setup)
- except KeyError:
- raise TransportImportError('Transport missing in config')
diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py
index 530011e47..9fbf8c9d5 100644
--- a/src/lib/Bcfg2/Reporting/Transport/base.py
+++ b/src/lib/Bcfg2/Reporting/Transport/base.py
@@ -4,7 +4,7 @@ The base for all server -> collector Transports
import os
import sys
-from Bcfg2.Server.Plugin import Debuggable
+from Bcfg2.Logger import Debuggable
class TransportError(Exception):
@@ -12,20 +12,18 @@ class TransportError(Exception):
pass
-class TransportImportError(TransportError):
- """Raised when a transport fails to import"""
- pass
-
-
class TransportBase(Debuggable):
"""The base for all transports"""
- def __init__(self, setup):
+ options = Debuggable.options
+
+ def __init__(self):
"""Do something here"""
clsname = self.__class__.__name__
Debuggable.__init__(self, name=clsname)
self.debug_log("Loading %s transport" % clsname)
- self.data = os.path.join(setup['repo'], 'Reporting', clsname)
+ self.data = os.path.join(Bcfg2.Options.setup.repository, 'Reporting',
+ clsname)
if not os.path.exists(self.data):
self.logger.info("%s does not exist, creating" % self.data)
try:
@@ -34,7 +32,6 @@ class TransportBase(Debuggable):
self.logger.warning("Could not create %s: %s" %
(self.data, sys.exc_info()[1]))
self.logger.warning("The transport may not function properly")
- self.setup = setup
self.timeout = 2
def start_monitor(self, collector):