From 4ef4e4f417d05ccdc3b3584d5e8f49f98126e912 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 9 Oct 2012 10:06:03 -0400 Subject: Reporting: move pickling from base transport to LocalFilesystem transport --- .../Bcfg2/Reporting/Transport/LocalFilesystem.py | 30 +++++++++++++-------- src/lib/Bcfg2/Reporting/Transport/base.py | 3 +-- src/lib/Bcfg2/Server/Plugins/Reporting.py | 31 ++++++++-------------- 3 files changed, 31 insertions(+), 33 deletions(-) (limited to 'src/lib/Bcfg2') diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py index 41741ea4b..8ccb9ed56 100644 --- a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py +++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py @@ -6,18 +6,14 @@ Leans on FileMonitor to detect changes """ import os -import os.path import select import time import traceback import Bcfg2.Server.FileMonitor from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError from Bcfg2.Reporting.Transport.base import TransportBase, TransportError +from Bcfg2.Compat import cPickle -try: - import cPickle as pickle -except: - import pickle class LocalFilesystem(TransportBase): def __init__(self, setup): @@ -59,11 +55,23 @@ class LocalFilesystem(TransportBase): self.fmon.start() self.fmon.AddMonitor(self.work_path, self) - def store(self, hostname, payload): + def store(self, hostname, metadata, stats): """Store the file to disk""" - save_file = "%s/%s-%s" % (self.work_path, hostname, time.time()) - tmp_file = "%s/.%s-%s" % (self.work_path, hostname, time.time()) + try: + payload = cPickle.dumps(dict(hostname=hostname, + metadata=metadata, + stats=stats)) + except: # pylint: disable=W0702 + msg = "%s: Failed to build interaction object: %s" % \ + (self.__class__.__name__, + traceback.format_exc().splitlines()[-1]) + self.logger.error(msg) + raise TransportError(msg) + + fname = "%s-%s" % (hostname, time.time()) + save_file = os.path.join(self.work_path, fname) + tmp_file = os.path.join(self.work_path, "." + fname) if os.path.exists(save_file): self.logger.error("%s: Oops.. duplicate statistic in directory." % self.__class__.__name__) @@ -108,14 +116,14 @@ class LocalFilesystem(TransportBase): payload = os.path.join(self.work_path, event.filename) try: payloadfd = open(payload, "r") - interaction = pickle.load(payloadfd) + interaction = cPickle.load(payloadfd) payloadfd.close() os.unlink(payload) return interaction except IOError: self.logger.error("Failed to read payload: %s" % traceback.format_exc().splitlines()[-1]) - except pickle.UnpicklingError: + except cPickle.UnpicklingError: self.logger.error("Failed to unpickle payload: %s" % traceback.format_exc().splitlines()[-1]) payloadfd.close() @@ -157,7 +165,7 @@ class LocalFilesystem(TransportBase): cls_method = getattr(self._phony_collector.storage, method) return cls_method(*args, **kwargs) except: - self.logger.error("RPC method %s failed: %s" % + self.logger.error("RPC method %s failed: %s" % (method, traceback.format_exc().splitlines()[-1])) raise TransportError diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py index 8488d0e46..b9bd40aec 100644 --- a/src/lib/Bcfg2/Reporting/Transport/base.py +++ b/src/lib/Bcfg2/Reporting/Transport/base.py @@ -29,7 +29,7 @@ class TransportBase(object): """Called to start monitoring""" raise NotImplementedError - def store(self, hostname, payload): + def store(self, hostname, metadata, stats): raise NotImplementedError def fetch(self): @@ -42,4 +42,3 @@ class TransportBase(object): def rpc(self, method, *args, **kwargs): """Send a request for data to the collector""" raise NotImplementedError - diff --git a/src/lib/Bcfg2/Server/Plugins/Reporting.py b/src/lib/Bcfg2/Server/Plugins/Reporting.py index 1ea6637a8..843f8768c 100644 --- a/src/lib/Bcfg2/Server/Plugins/Reporting.py +++ b/src/lib/Bcfg2/Server/Plugins/Reporting.py @@ -3,10 +3,9 @@ import time import platform import traceback -from lxml import etree +import lxml.etree from Bcfg2.Reporting.Transport import load_transport_from_config, \ TransportError -from Bcfg2.Compat import cPickle from Bcfg2.Options import REPORTING_COMMON_OPTIONS from Bcfg2.Server.Plugin import Statistics, PullSource, PluginInitError, \ PluginExecutionError @@ -36,7 +35,7 @@ class Reporting(Statistics, PullSource): # pylint: disable=W0223 """ Unified statistics and reporting plugin """ __rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry'] - CLIENT_METADATA_FILEDS = ('profile', 'bundles', 'aliases', 'addresses', + CLIENT_METADATA_FIELDS = ('profile', 'bundles', 'aliases', 'addresses', 'groups', 'categories', 'uuid', 'version') def __init__(self, core, datastore): @@ -59,16 +58,17 @@ class Reporting(Statistics, PullSource): # pylint: disable=W0223 try: self.transport = load_transport_from_config(core.setup) except TransportError: - self.logger.error("%s: Failed to load transport: %s" % - (self.name, traceback.format_exc().splitlines()[-1])) - raise PluginInitError + msg = "%s: Failed to load transport: %s" % \ + (self.name, traceback.format_exc().splitlines()[-1]) + self.logger.error(msg) + raise PluginInitError(msg) def process_statistics(self, client, xdata): stats = xdata.find("Statistics") stats.set('time', time.asctime(time.localtime())) cdata = {'server': self.whoami} - for field in self.CLIENT_METADATA_FILEDS: + for field in self.CLIENT_METADATA_FIELDS: try: value = getattr(client, field) except AttributeError: @@ -78,22 +78,13 @@ class Reporting(Statistics, PullSource): # pylint: disable=W0223 value = [v for v in value] cdata[field] = value - try: - interaction_data = cPickle.dumps(dict( - hostname=client.hostname, - metadata=cdata, - stats=etree.tostring( - stats, - xml_declaration=False).decode('UTF-8'))) - except: # pylint: disable=W0702 - self.logger.error("%s: Failed to build interaction object: %s" % - (self.__class__.__name__, - traceback.format_exc().splitlines()[-1])) - # try 3 times to store the data for i in [1, 2, 3]: try: - self.transport.store(client.hostname, interaction_data) + self.transport.store(client.hostname, cdata, + lxml.etree.tostring( + stats, + xml_declaration=False).decode('UTF-8')) self.logger.debug("%s: Queued statistics data for %s" % (self.__class__.__name__, client.hostname)) return -- cgit v1.2.3-1-g7c22