summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Transport
diff options
context:
space:
mode:
authorTim Laszlo <tim.laszlo@gmail.com>2012-10-08 10:38:02 -0500
committerTim Laszlo <tim.laszlo@gmail.com>2012-10-08 10:38:02 -0500
commit44638176067df5231bf0be30801e36863391cd1f (patch)
tree6aaba73d03f9a5532047518b9a3e8ef3e63d3f9f /src/lib/Bcfg2/Reporting/Transport
parent1a3ced3f45423d79e08ca7d861e8118e8618d3b2 (diff)
downloadbcfg2-44638176067df5231bf0be30801e36863391cd1f.tar.gz
bcfg2-44638176067df5231bf0be30801e36863391cd1f.tar.bz2
bcfg2-44638176067df5231bf0be30801e36863391cd1f.zip
Reporting: Merge new reporting data
Move reporting data to a new schema Use south for django migrations Add bcfg2-report-collector daemon Conflicts: doc/development/index.txt doc/server/plugins/connectors/properties.txt doc/server/plugins/generators/packages.txt setup.py src/lib/Bcfg2/Client/Tools/SELinux.py src/lib/Bcfg2/Compat.py src/lib/Bcfg2/Encryption.py src/lib/Bcfg2/Options.py src/lib/Bcfg2/Server/Admin/Init.py src/lib/Bcfg2/Server/Admin/Reports.py src/lib/Bcfg2/Server/BuiltinCore.py src/lib/Bcfg2/Server/Core.py src/lib/Bcfg2/Server/FileMonitor/Inotify.py src/lib/Bcfg2/Server/Plugin/base.py src/lib/Bcfg2/Server/Plugin/interfaces.py src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py src/lib/Bcfg2/Server/Plugins/FileProbes.py src/lib/Bcfg2/Server/Plugins/Ohai.py src/lib/Bcfg2/Server/Plugins/Packages/Collection.py src/lib/Bcfg2/Server/Plugins/Packages/Source.py src/lib/Bcfg2/Server/Plugins/Packages/Yum.py src/lib/Bcfg2/Server/Plugins/Packages/__init__.py src/lib/Bcfg2/Server/Plugins/Probes.py src/lib/Bcfg2/Server/Plugins/Properties.py src/lib/Bcfg2/Server/Reports/backends.py src/lib/Bcfg2/Server/Reports/manage.py src/lib/Bcfg2/Server/Reports/nisauth.py src/lib/Bcfg2/settings.py src/sbin/bcfg2-crypt src/sbin/bcfg2-yum-helper testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Transport')
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py163
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/__init__.py32
-rw-r--r--src/lib/Bcfg2/Reporting/Transport/base.py45
3 files changed, 240 insertions, 0 deletions
diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
new file mode 100644
index 000000000..41741ea4b
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
@@ -0,0 +1,163 @@
+"""
+The local transport. Stats are pickled and written to
+<repo>/store/<hostname>-timestamp
+
+Leans on FileMonitor to detect changes
+"""
+
+import os
+import os.path
+import select
+import time
+import traceback
+import Bcfg2.Server.FileMonitor
+from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError
+from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
+
+try:
+ import cPickle as pickle
+except:
+ import pickle
+
+class LocalFilesystem(TransportBase):
+ def __init__(self, setup):
+ super(LocalFilesystem, self).__init__(setup)
+
+ self.work_path = "%s/work" % self.data
+ self.logger.debug("LocalFilesystem: work path %s" % self.work_path)
+ self.fmon = None
+ self._phony_collector = None
+
+ #setup our local paths or die
+ if not os.path.exists(self.work_path):
+ try:
+ os.makedirs(self.work_path)
+ except:
+ self.logger.error("%s: Unable to create storage: %s" %
+ (self.__class__.__name__,
+ traceback.format_exc().splitlines()[-1]))
+ raise TransportError
+
+ def start_monitor(self, collector):
+ """Start the file monitor. Most of this comes from BaseCore"""
+ setup = self.setup
+ try:
+ fmon = Bcfg2.Server.FileMonitor.available[setup['filemonitor']]
+ except KeyError:
+ self.logger.error("File monitor driver %s not available; "
+ "forcing to default" % setup['filemonitor'])
+ fmon = Bcfg2.Server.FileMonitor.available['default']
+
+ fmdebug = setup.get('debug', False)
+ try:
+ self.fmon = fmon(debug=fmdebug)
+ self.logger.info("Using the %s file monitor" % self.fmon.__class__.__name__)
+ except IOError:
+ msg = "Failed to instantiate file monitor %s" % setup['filemonitor']
+ self.logger.error(msg, exc_info=1)
+ raise TransportError(msg)
+ self.fmon.start()
+ self.fmon.AddMonitor(self.work_path, self)
+
+ def store(self, hostname, payload):
+ """Store the file to disk"""
+
+ save_file = "%s/%s-%s" % (self.work_path, hostname, time.time())
+ tmp_file = "%s/.%s-%s" % (self.work_path, hostname, time.time())
+ if os.path.exists(save_file):
+ self.logger.error("%s: Oops.. duplicate statistic in directory." %
+ self.__class__.__name__)
+ raise TransportError
+
+ # using a tmpfile to hopefully avoid the file monitor from grabbing too
+ # soon
+ saved = open(tmp_file, 'w')
+ try:
+ saved.write(payload)
+ except IOError:
+ self.logger.error("Failed to store interaction for %s: %s" %
+ (hostname, traceback.format_exc().splitlines()[-1]))
+ os.unlink(tmp_file)
+ saved.close()
+ os.rename(tmp_file, save_file)
+
+ def fetch(self):
+ """Fetch the next object"""
+ event = None
+ fmonfd = self.fmon.fileno()
+ if self.fmon.pending():
+ event = self.fmon.get_event()
+ elif fmonfd:
+ select.select([fmonfd], [], [], self.timeout)
+ if self.fmon.pending():
+ event = self.fmon.get_event()
+ else:
+ # pseudo.. if nothings pending sleep and loop
+ time.sleep(self.timeout)
+
+ if not event or event.filename == self.work_path:
+ return None
+
+ #deviate from the normal routines here we only want one event
+ etype = event.code2str()
+ self.logger.debug("Recieved event %s for %s" % (etype, event.filename))
+ if os.path.basename(event.filename)[0] == '.':
+ return None
+ if etype in ('created', 'exists'):
+ self.logger.debug("Handling event %s" % event.filename)
+ payload = os.path.join(self.work_path, event.filename)
+ try:
+ payloadfd = open(payload, "r")
+ interaction = pickle.load(payloadfd)
+ payloadfd.close()
+ os.unlink(payload)
+ return interaction
+ except IOError:
+ self.logger.error("Failed to read payload: %s" %
+ traceback.format_exc().splitlines()[-1])
+ except pickle.UnpicklingError:
+ self.logger.error("Failed to unpickle payload: %s" %
+ traceback.format_exc().splitlines()[-1])
+ payloadfd.close()
+ raise TransportError
+ return None
+
+ def shutdown(self):
+ """Called at program exit"""
+ if self.fmon:
+ self.fmon.shutdown()
+ if self._phony_collector:
+ self._phony_collector.shutdown()
+
+ def rpc(self, method, *args, **kwargs):
+ """
+ Here this is more of a dummy. Rather then start a layer
+ which doesn't exist or muck with files, start the collector
+
+ This will all change when other layers are added
+ """
+ try:
+ if not self._phony_collector:
+ self._phony_collector = ReportingCollector(self.setup)
+ except ReportingError:
+ raise TransportError
+ except:
+ self.logger.error("Failed to load collector: %s" %
+ traceback.format_exc().splitlines()[-1])
+ raise TransportError
+
+ if not method in self._phony_collector.storage.__class__.__rmi__ or \
+ not hasattr(self._phony_collector.storage, method):
+ self.logger.error("Unknown method %s called on storage engine %s" %
+ (method, self._phony_collector.storage.__class__.__name__))
+ raise TransportError
+
+
+ try:
+ cls_method = getattr(self._phony_collector.storage, method)
+ return cls_method(*args, **kwargs)
+ except:
+ self.logger.error("RPC method %s failed: %s" %
+ (method, traceback.format_exc().splitlines()[-1]))
+ raise TransportError
+
diff --git a/src/lib/Bcfg2/Reporting/Transport/__init__.py b/src/lib/Bcfg2/Reporting/Transport/__init__.py
new file mode 100644
index 000000000..ec39a1628
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Transport/__init__.py
@@ -0,0 +1,32 @@
+"""
+Public transport routines
+"""
+
+import traceback
+
+from Bcfg2.Reporting.Transport.base import TransportError, \
+ TransportImportError
+
+def load_transport(transport_name, setup):
+ """
+ Try to load the transport. Raise TransportImportError on failure
+ """
+ try:
+ mod_name = "%s.%s" % (__name__, transport_name)
+ mod = getattr(__import__(mod_name).Reporting.Transport, transport_name)
+ except ImportError:
+ try:
+ mod = __import__(transport_name)
+ except:
+ raise TransportImportError("Unavailable")
+ try:
+ cls = getattr(mod, transport_name)
+ return cls(setup)
+ except:
+ raise TransportImportError("Transport unavailable: %s" %
+ traceback.format_exc().splitlines()[-1])
+
+def load_transport_from_config(setup):
+ """Load the transport in the config... eventually"""
+ return load_transport('LocalFilesystem', setup)
+
diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py
new file mode 100644
index 000000000..8488d0e46
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Transport/base.py
@@ -0,0 +1,45 @@
+"""
+The base for all server -> collector Transports
+"""
+
+import os.path
+import logging
+
+class TransportError(Exception):
+ """Generic TransportError"""
+ pass
+
+class TransportImportError(TransportError):
+ """Raised when a transport fails to import"""
+ pass
+
+class TransportBase(object):
+ """The base for all transports"""
+
+ def __init__(self, setup):
+ """Do something here"""
+ clsname = self.__class__.__name__
+ self.logger = logging.getLogger(clsname)
+ self.logger.debug("Loading %s transport" % clsname)
+ self.data = os.path.join(setup['repo'], clsname.split()[-1])
+ self.setup = setup
+ self.timeout = 2
+
+ def start_monitor(self, collector):
+ """Called to start monitoring"""
+ raise NotImplementedError
+
+ def store(self, hostname, payload):
+ raise NotImplementedError
+
+ def fetch(self):
+ raise NotImplementedError
+
+ def shutdown(self):
+ """Called at program exit"""
+ pass
+
+ def rpc(self, method, *args, **kwargs):
+ """Send a request for data to the collector"""
+ raise NotImplementedError
+