From cecdedb73bf0dfed66964c91b08256b100997289 Mon Sep 17 00:00:00 2001 From: Narayan Desai Date: Wed, 6 May 2009 19:17:53 +0000 Subject: Clean up File Monitoring code/adapt to new server infrastructure git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5202 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Server/FileMonitor.py | 254 ++++++++++++++++++------------------------ src/lib/Server/XMLRPC.py | 32 +++--- src/sbin/bcfg2-info | 5 +- 3 files changed, 125 insertions(+), 166 deletions(-) diff --git a/src/lib/Server/FileMonitor.py b/src/lib/Server/FileMonitor.py index da6eb00a4..ccf2f2f8a 100644 --- a/src/lib/Server/FileMonitor.py +++ b/src/lib/Server/FileMonitor.py @@ -1,4 +1,4 @@ -from time import time +from time import sleep, time import logging, os, stat logger = logging.getLogger('Bcfg2.Server.FileMonitor') @@ -14,70 +14,6 @@ def ShouldIgnore(event): return True return False -class FamFam(object): - '''The fam object is a set of callbacks for file alteration events (FAM support)''' - - def __init__(self): - object.__init__(self) - self.fm = _fam.open() - self.users = {} - self.handles = {} - self.debug = False - - def fileno(self): - '''return fam file handle number''' - return self.fm.fileno() - - def AddMonitor(self, path, obj): - '''add a monitor to path, installing a callback to obj.HandleEvent''' - mode = os.stat(path)[stat.ST_MODE] - if stat.S_ISDIR(mode): - handle = self.fm.monitorDirectory(path, None) - else: - handle = self.fm.monitorFile(path, None) - self.handles[handle.requestID()] = handle - if obj != None: - self.users[handle.requestID()] = obj - return handle.requestID() - - def Service(self): - '''Handle all fam work''' - count = 0 - collapsed = 0 - rawevents = [] - start = time() - now = time() - while (time() - now) < 0.10: - if self.fm.pending(): - while self.fm.pending(): - count += 1 - rawevents.append(self.fm.nextEvent()) - now = time() - unique = [] - bookkeeping = [] - for event in rawevents: - if ShouldIgnore(event): - continue - if event.code2str() != 'changed': - # process all non-change events - unique.append(event) - else: - if (event.filename, event.requestID) not in bookkeeping: - bookkeeping.append((event.filename, event.requestID)) - unique.append(event) - else: - collapsed += 1 - for event in unique: - if event.requestID in self.users: - try: - self.users[event.requestID].HandleEvent(event) - except: - logger.error("handling event for file %s" % (event.filename), exc_info=1) - end = time() - logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" % - (count, (end - start), collapsed)) - return count - class Event(object): def __init__(self, request_id, filename, code): self.requestID = request_id @@ -98,18 +34,96 @@ class GaminEvent(Event): if code in action_map: self.action = action_map[code] -class GaminFam(object): - '''The fam object is a set of callbacks for file alteration events (Gamin support)''' - def __init__(self): +class FileMonitor(object): + '''File Monitor baseclass''' + def __init__(self, debug=False): object.__init__(self) + self.debug = debug + self.handles = dict() + + def fileno(self): + return 0 + + def handle_one_event(self, event): + if ShouldIgnore(event): + return + if event.requestID not in self.handles: + logger.info("Got event for unexpected id %s, file %s" % + (event.requestID, event.filename)) + return + if self.debug: + logger.info("Dispatching event %s %s to obj %s" \ + % (event.code2str(), event.filename, + self.handles[event.requestID])) + try: + self.handles[event.requestID].HandleEvent(event) + except: + logger.error("error in handling of gamin event for %s" % \ + (event.filename), exc_info=1) + + def handle_event_set(self, lock=None): + count = 1 + event = self.get_event() + start = time() + if lock: + lock.acquire() + try: + self.handle_one_event(event) + while self.pending(): + self.handle_one_event(self.get_event()) + count += 1 + except: + pass + if lock: + lock.release() + end = time() + logger.info("Handled %d events in %.02fs" % (count, (end-start))) + + def handle_events_in_interval(self, interval): + start = time() + end = start + interval + while time() < end: + if self.pending(): + self.handle_event_set() + else: + sleep(0.5) + +class Fam(FileMonitor): + '''The fam object is a set of callbacks for file alteration events (FAM support)''' + + def __init__(self, debug=False): + FileMonitor.__init__(self, debug) + self.fm = _fam.open() + + def fileno(self): + return self.fm.fileno() + + def AddMonitor(self, path, obj): + '''add a monitor to path, installing a callback to obj.HandleEvent''' + mode = os.stat(path)[stat.ST_MODE] + if stat.S_ISDIR(mode): + handle = self.fm.monitorDirectory(path, None) + else: + handle = self.fm.monitorFile(path, None) + if obj != None: + self.handles[handle.requestID()] = obj + return handle.requestID() + + def pending(self): + return self.fm.pending() + + def get_event(self): + return self.fm.nextEvent() + +class Gamin(FileMonitor): + '''The fam object is a set of callbacks for file alteration events (Gamin support)''' + def __init__(self, debug=False): + FileMonitor.__init__(self, debug) self.mon = WatchMonitor() - self.handles = {} self.counter = 0 self.events = [] - self.debug = False def fileno(self): - '''return fam file handle number''' return self.mon.get_fd() def queue(self, path, action, request_id): @@ -128,60 +142,25 @@ class GaminFam(object): self.handles[handle] = obj return handle - def Service(self): - '''Handle all gamin work''' - count = 0 - collapsed = 0 - start = time() - if self.mon.event_pending(): - while self.mon.event_pending(): - count += 1 - self.mon.handle_one_event() - else: - return 0 - unique = [] - bookkeeping = [] - for event in self.events: - if ShouldIgnore(event): - continue - if event.code2str() != 'changed': - # process all non-change events - unique.append(event) - else: - if (event.filename, event.requestID) not in bookkeeping: - bookkeeping.append((event.filename, event.requestID)) - unique.append(event) - else: - collapsed += 1 - self.events = [] - for event in unique: - if event.requestID not in self.handles: - logger.info("Got event for unexpected id %s, file %s" % - (event.requestID, event.filename)) - continue - if self.debug: - logger.info("Dispatching event %s %s to obj %s" \ - % (event.code2str(), event.filename, - self.handles[event.requestID])) - try: - self.handles[event.requestID].HandleEvent(event) - except: - logger.error("error in handling of gamin event for %s" % \ - (event.filename), exc_info=1) - end = time() - logger.info("Processed %s gamin events in %03.03f seconds. %s collapsed" % - (count, (end - start), collapsed)) - return count + def pending(self): + return self.mon.event_pending() + + def get_event(self): + self.mon.handle_one_event() + return self.events.pop() -class PseudoFam(object): +class Pseudo(FileMonitor): '''The fam object is a set of callbacks for file alteration events (FAM support)''' - def __init__(self): - object.__init__(self) - self.users = {} - self.handles = {} - self.debug = False - self.pending = [] + def __init__(self, debug=False): + FileMonitor.__init__(self, debug=False) + self.pending_events = [] + + def pending(self): + return len(self.pending_events) != 0 + + def get_event(self): + return self.pending_events.pop() def AddMonitor(self, path, obj): '''add a monitor to path, installing a callback to obj.HandleEvent''' @@ -190,43 +169,30 @@ class PseudoFam(object): handle = GaminEvent(handleID, path, 'exists') if stat.S_ISDIR(mode): dirList = os.listdir(path) - self.pending.append(handle) + self.pending_events.append(handle) for includedFile in dirList: - self.pending.append(GaminEvent(handleID, includedFile, 'exists')) - self.pending.append(GaminEvent(handleID, path, 'endExist')) + self.pending_events.append(GaminEvent(handleID, includedFile, 'exists')) + self.pending_events.append(GaminEvent(handleID, path, 'endExist')) else: - self.pending.append(GaminEvent(handleID, path, 'exists')) - self.handles[handleID] = handle + self.pending_events.append(GaminEvent(handleID, path, 'exists')) if obj != None: - self.users[handleID] = obj + self.handles[handleID] = obj return handleID - def Service(self): - '''Handle all fam work''' - count = 0 - rawevents = [] - for event in self.pending: - count += 1 - rawevents.append(event) - self.pending = [] - for event in rawevents: - if event.requestID in self.users: - self.users[event.requestID].HandleEvent(event) - return count - + available = {} try: from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted, GAMMoved - available['gamin'] = GaminFam + available['gamin'] = Gamin except ImportError: # fall back to _fam pass try: import _fam - available['fam'] = FamFam + available['fam'] = Fam except ImportError: pass -available['pseudo'] = PseudoFam +available['pseudo'] = Pseudo for fdrv in ['gamin', 'fam', 'pseudo']: if fdrv in available: diff --git a/src/lib/Server/XMLRPC.py b/src/lib/Server/XMLRPC.py index 4e97271bc..36d7c3b79 100644 --- a/src/lib/Server/XMLRPC.py +++ b/src/lib/Server/XMLRPC.py @@ -4,10 +4,11 @@ import logging import lxml.etree import select import socket +import threading import time import xmlrpclib -from Bcfg2.Component import Component, exposed +from Bcfg2.Component import Component, automatic, exposed, locking import Bcfg2.Server.Core logger = logging.getLogger('server') @@ -25,6 +26,7 @@ class bcfg2_server(Component, Bcfg2.Server.Core.Core): '''XML RPC interfaces for the server core''' name = 'bcfg2-server' + implementation = 'bcfg2-server' def __init__(self, setup): Component.__init__(self) @@ -32,26 +34,20 @@ class bcfg2_server(Component, setup['password'], setup['encoding'], setup['filemonitor']) self.ca = setup['ca'] - self.process_initial_fam_events() + self.fam_thread = threading.Thread(target=self._file_monitor_thread) + self.fam_thread.start() - def process_initial_fam_events(self): - events = False + def _file_monitor_thread(self): + famfd = self.fam.fileno() while True: try: - rsockinfo = select.select([self.fam.fileno()], [], [], 15)[0] - if not rsockinfo: - if events: - break - else: - logger.error("Hit event timeout without getting " - "any events; GAMIN/FAM problem?") - continue - events = True - i = 0 - while self.fam.Service() or i < 10: - i += 1 - time.sleep(0.1) - except socket.error: + if famfd: + rsockinfo = select.select([famfd], [], []) + else: + while not self.fam.pending(): + time.sleep(15) + self.fam.handle_event_set(self.lock) + except: continue @exposed diff --git a/src/sbin/bcfg2-info b/src/sbin/bcfg2-info index 0d166bfdc..2b079a4e1 100755 --- a/src/sbin/bcfg2-info +++ b/src/sbin/bcfg2-info @@ -37,10 +37,7 @@ class infoCore(cmd.Cmd, Bcfg2.Server.Core.Core): raise SystemExit(1) self.prompt = '> ' self.cont = True - i = 0 - while self.fam.Service() or i < 10: - i += 1 - time.sleep(0.1) + self.fam.handle_events_in_interval(10) def do_loop(self): self.cont = True -- cgit v1.2.3-1-g7c22