From b359ebe3f6d70e2d3beceb6210c2681092637c81 Mon Sep 17 00:00:00 2001 From: Tim Laszlo Date: Fri, 4 Jun 2010 16:42:54 +0000 Subject: Threaded Statistics.py and fixed xml corruption git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5888 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Server/Plugins/Statistics.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/lib/Server/Plugins/Statistics.py b/src/lib/Server/Plugins/Statistics.py index 492e06e9c..cbd602d8c 100644 --- a/src/lib/Server/Plugins/Statistics.py +++ b/src/lib/Server/Plugins/Statistics.py @@ -8,7 +8,9 @@ import logging from lxml.etree import XML, SubElement, Element, XMLSyntaxError import lxml.etree import os +import Queue from time import asctime, localtime, time, strptime, mktime +import threading import Bcfg2.Server.Plugin @@ -27,7 +29,6 @@ class StatisticsStore(object): def WriteBack(self, force=0): '''Write statistics changes back to persistent store''' - # FIXME switch to a thread writer if (self.dirty and (self.lastwrite + self.__min_write_delay__ <= time())) \ or force: try: @@ -97,7 +98,7 @@ class StatisticsStore(object): newstat.set('time', asctime(localtime())) # Add statistic - node.append(newstat) + node.append(copy.deepcopy(newstat)) # Set dirty self.dirty = 1 @@ -122,11 +123,24 @@ class Statistics(Bcfg2.Server.Plugin.Plugin, Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) Bcfg2.Server.Plugin.Statistics.__init__(self) Bcfg2.Server.Plugin.PullSource.__init__(self) - fpath = "%s/etc/statistics.xml" % datastore - self.data_file = StatisticsStore(fpath) + self.fpath = "%s/etc/statistics.xml" % datastore + # Event from the core signaling an exit + self.terminate = core.terminate + self.work_queue = Queue.Queue() + self.worker = threading.Thread(target=self.process_statistics_loop) + self.worker.start() + + def process_statistics_loop(self): + self.data_file = StatisticsStore(self.fpath) + while not (self.terminate.isSet() and self.work_queue.empty()): + try: + (xdata, hostname) = self.work_queue.get(block=True, timeout=5) + except: + continue + self.data_file.updateStats(xdata, hostname) def process_statistics(self, client, xdata): - self.data_file.updateStats(copy.deepcopy(xdata), client.hostname) + self.work_queue.put((copy.deepcopy(xdata), client.hostname)) def FindCurrent(self, client): rt = self.data_file.element.xpath('//Node[@name="%s"]' % client)[0] -- cgit v1.2.3-1-g7c22