From daa9748311522e4b0ce74264a77d5cbaeb3e97ea Mon Sep 17 00:00:00 2001 From: Tim Laszlo Date: Wed, 16 Jun 2010 15:45:09 +0000 Subject: ThreadedStatistics: load/save data on startup/shutdown git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5941 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Server/Core.py | 3 ++ src/lib/Server/Plugin.py | 86 ++++++++++++++++++++++++++++++++++++-- src/lib/Server/Plugins/Metadata.py | 2 + 3 files changed, 87 insertions(+), 4 deletions(-) diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py index 1416f809c..d08782ff6 100644 --- a/src/lib/Server/Core.py +++ b/src/lib/Server/Core.py @@ -293,6 +293,9 @@ class Core(Component): return result def build_metadata(self, client_name): + if not hasattr(self, 'metadata'): + # some threads start before metadata is even loaded + raise Bcfg2.Server.Plugins.Metadata.MetadataRuntimeError imd = self.metadata.get_initial_metadata(client_name) for conn in self.connectors: grps = conn.get_additional_groups(imd) diff --git a/src/lib/Server/Plugin.py b/src/lib/Server/Plugin.py index ff87c15df..732d291b1 100644 --- a/src/lib/Server/Plugin.py +++ b/src/lib/Server/Plugin.py @@ -5,6 +5,7 @@ import copy import logging import lxml.etree import os +import pickle import posixpath import re import Queue @@ -151,22 +152,99 @@ class ThreadedStatistics(Statistics, threading.Thread.__init__(self) # Event from the core signaling an exit self.terminate = core.terminate - self.work_queue = Queue.Queue() + self.work_queue = Queue.Queue(100000) + self.pending_file = "%s/etc/%s.pending" % (datastore, self.__class__.__name__) + self.daemon = True self.start() + def save(self): + ''' Save any pending data to a file''' + pending_data = [] + try: + while not self.work_queue.empty(): + (metadata, data) = self.work_queue.get_nowait() + try: + pending_data.append( ( metadata.hostname, lxml.etree.tostring(data) ) ) + except: + self.logger.warning("Dropping interaction for %s" % metadata.hostname) + except Queue.Empty: + pass + + try: + savefile = open(self.pending_file, 'w') + pickle.dump(pending_data, savefile) + savefile.close() + self.logger.info("Saved pending %s data" % self.__class__.__name__) + except: + self.logger.warning("Failed to save pending data") + + def load(self): + ''' Load any pending data to a file''' + if not os.path.exists(self.pending_file): + return True + pending_data = [] + try: + savefile = open(self.pending_file, 'r') + pending_data = pickle.load(savefile) + savefile.close() + except Exception, e: + self.logger.warning("Failed to load pending data: %s" % e) + for (pmetadata, pdata) in pending_data: + # check that shutdown wasnt called early + if self.terminate.isSet(): + return False + + try: + while True: + try: + metadata = self.core.build_metadata(pmetadata) + break + except Bcfg2.Server.Plugins.Metadata.MetadataRuntimeError: + pass + + self.terminate.wait(5) + if self.terminate.isSet(): + return False + + self.work_queue.put_nowait( (metadata, lxml.etree.fromstring(pdata)) ) + except Queue.Full: + self.logger.warning("Queue.Full: Failed to load queue data") + break + except lxml.etree.LxmlError, lxml_error: + self.logger.error("Unable to load save interaction: %s" % lxml_error) + except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError: + self.logger.error("Unable to load metadata for save interaction: %s" % pmetadata) + try: + os.unlink(self.pending_file) + except: + self.logger.error("Failed to unlink save file: %s" % self.pending_file) + self.logger.info("Loaded pending %s data" % self.__class__.__name__) + return True + def run(self): - while not (self.terminate.isSet() and self.work_queue.empty()): + if not self.load(): + return + while not self.terminate.isSet(): try: (xdata, client) = self.work_queue.get(block=True, timeout=5) except Queue.Empty: continue except Exception, e: - logger.error("ThreadedStatistics: %s" % e) + self.logger.error("ThreadedStatistics: %s" % e) continue self.handle_statistic(xdata, client) + if not self.work_queue.empty(): + self.save() def process_statistics(self, metadata, data): - self.work_queue.put((metadata, copy.deepcopy(data))) + warned = False + try: + self.work_queue.put_nowait((metadata, copy.deepcopy(data))) + warned = False + except Queue.Full: + if not warned: + self.logger.warning("Queue is full. Dropping data") + warned = True def handle_statistics(self, metadata, data): '''Handle stats here''' diff --git a/src/lib/Server/Plugins/Metadata.py b/src/lib/Server/Plugins/Metadata.py index 0c5876d16..de6f2f612 100644 --- a/src/lib/Server/Plugins/Metadata.py +++ b/src/lib/Server/Plugins/Metadata.py @@ -545,6 +545,8 @@ class Metadata(Bcfg2.Server.Plugin.Plugin, def get_initial_metadata(self, client): """Return the metadata for a given client.""" + if False in self.states.values(): + raise MetadataRuntimeError client = client.lower() if client in self.aliases: client = self.aliases[client] -- cgit v1.2.3-1-g7c22