summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTim Laszlo <tim.laszlo@gmail.com>2010-06-16 15:45:09 +0000
committerSol Jerome <sol.jerome@gmail.com>2010-06-16 11:50:43 -0500
commitdaa9748311522e4b0ce74264a77d5cbaeb3e97ea (patch)
tree4b16642aec7e46a0c51affa3d6dc9218b90dd292
parentce8ed0ecbb9fa3cbb9404c23025a81ba375c959c (diff)
downloadbcfg2-daa9748311522e4b0ce74264a77d5cbaeb3e97ea.tar.gz
bcfg2-daa9748311522e4b0ce74264a77d5cbaeb3e97ea.tar.bz2
bcfg2-daa9748311522e4b0ce74264a77d5cbaeb3e97ea.zip
ThreadedStatistics: load/save data on startup/shutdown
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5941 ce84e21b-d406-0410-9b95-82705330c041
-rw-r--r--src/lib/Server/Core.py3
-rw-r--r--src/lib/Server/Plugin.py86
-rw-r--r--src/lib/Server/Plugins/Metadata.py2
3 files changed, 87 insertions, 4 deletions
diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py
index 1416f809c..d08782ff6 100644
--- a/src/lib/Server/Core.py
+++ b/src/lib/Server/Core.py
@@ -293,6 +293,9 @@ class Core(Component):
return result
def build_metadata(self, client_name):
+ if not hasattr(self, 'metadata'):
+ # some threads start before metadata is even loaded
+ raise Bcfg2.Server.Plugins.Metadata.MetadataRuntimeError
imd = self.metadata.get_initial_metadata(client_name)
for conn in self.connectors:
grps = conn.get_additional_groups(imd)
diff --git a/src/lib/Server/Plugin.py b/src/lib/Server/Plugin.py
index ff87c15df..732d291b1 100644
--- a/src/lib/Server/Plugin.py
+++ b/src/lib/Server/Plugin.py
@@ -5,6 +5,7 @@ import copy
import logging
import lxml.etree
import os
+import pickle
import posixpath
import re
import Queue
@@ -151,22 +152,99 @@ class ThreadedStatistics(Statistics,
threading.Thread.__init__(self)
# Event from the core signaling an exit
self.terminate = core.terminate
- self.work_queue = Queue.Queue()
+ self.work_queue = Queue.Queue(100000)
+ self.pending_file = "%s/etc/%s.pending" % (datastore, self.__class__.__name__)
+ self.daemon = True
self.start()
+ def save(self):
+ ''' Save any pending data to a file'''
+ pending_data = []
+ try:
+ while not self.work_queue.empty():
+ (metadata, data) = self.work_queue.get_nowait()
+ try:
+ pending_data.append( ( metadata.hostname, lxml.etree.tostring(data) ) )
+ except:
+ self.logger.warning("Dropping interaction for %s" % metadata.hostname)
+ except Queue.Empty:
+ pass
+
+ try:
+ savefile = open(self.pending_file, 'w')
+ pickle.dump(pending_data, savefile)
+ savefile.close()
+ self.logger.info("Saved pending %s data" % self.__class__.__name__)
+ except:
+ self.logger.warning("Failed to save pending data")
+
+ def load(self):
+ ''' Load any pending data to a file'''
+ if not os.path.exists(self.pending_file):
+ return True
+ pending_data = []
+ try:
+ savefile = open(self.pending_file, 'r')
+ pending_data = pickle.load(savefile)
+ savefile.close()
+ except Exception, e:
+ self.logger.warning("Failed to load pending data: %s" % e)
+ for (pmetadata, pdata) in pending_data:
+ # check that shutdown wasnt called early
+ if self.terminate.isSet():
+ return False
+
+ try:
+ while True:
+ try:
+ metadata = self.core.build_metadata(pmetadata)
+ break
+ except Bcfg2.Server.Plugins.Metadata.MetadataRuntimeError:
+ pass
+
+ self.terminate.wait(5)
+ if self.terminate.isSet():
+ return False
+
+ self.work_queue.put_nowait( (metadata, lxml.etree.fromstring(pdata)) )
+ except Queue.Full:
+ self.logger.warning("Queue.Full: Failed to load queue data")
+ break
+ except lxml.etree.LxmlError, lxml_error:
+ self.logger.error("Unable to load save interaction: %s" % lxml_error)
+ except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError:
+ self.logger.error("Unable to load metadata for save interaction: %s" % pmetadata)
+ try:
+ os.unlink(self.pending_file)
+ except:
+ self.logger.error("Failed to unlink save file: %s" % self.pending_file)
+ self.logger.info("Loaded pending %s data" % self.__class__.__name__)
+ return True
+
def run(self):
- while not (self.terminate.isSet() and self.work_queue.empty()):
+ if not self.load():
+ return
+ while not self.terminate.isSet():
try:
(xdata, client) = self.work_queue.get(block=True, timeout=5)
except Queue.Empty:
continue
except Exception, e:
- logger.error("ThreadedStatistics: %s" % e)
+ self.logger.error("ThreadedStatistics: %s" % e)
continue
self.handle_statistic(xdata, client)
+ if not self.work_queue.empty():
+ self.save()
def process_statistics(self, metadata, data):
- self.work_queue.put((metadata, copy.deepcopy(data)))
+ warned = False
+ try:
+ self.work_queue.put_nowait((metadata, copy.deepcopy(data)))
+ warned = False
+ except Queue.Full:
+ if not warned:
+ self.logger.warning("Queue is full. Dropping data")
+ warned = True
def handle_statistics(self, metadata, data):
'''Handle stats here'''
diff --git a/src/lib/Server/Plugins/Metadata.py b/src/lib/Server/Plugins/Metadata.py
index 0c5876d16..de6f2f612 100644
--- a/src/lib/Server/Plugins/Metadata.py
+++ b/src/lib/Server/Plugins/Metadata.py
@@ -545,6 +545,8 @@ class Metadata(Bcfg2.Server.Plugin.Plugin,
def get_initial_metadata(self, client):
"""Return the metadata for a given client."""
+ if False in self.states.values():
+ raise MetadataRuntimeError
client = client.lower()
if client in self.aliases:
client = self.aliases[client]