From cecdedb73bf0dfed66964c91b08256b100997289 Mon Sep 17 00:00:00 2001 From: Narayan Desai Date: Wed, 6 May 2009 19:17:53 +0000 Subject: Clean up File Monitoring code/adapt to new server infrastructure git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5202 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Server/FileMonitor.py | 254 ++++++++++++++++++------------------------ 1 file changed, 110 insertions(+), 144 deletions(-) (limited to 'src/lib/Server/FileMonitor.py') diff --git a/src/lib/Server/FileMonitor.py b/src/lib/Server/FileMonitor.py index da6eb00a4..ccf2f2f8a 100644 --- a/src/lib/Server/FileMonitor.py +++ b/src/lib/Server/FileMonitor.py @@ -1,4 +1,4 @@ -from time import time +from time import sleep, time import logging, os, stat logger = logging.getLogger('Bcfg2.Server.FileMonitor') @@ -14,70 +14,6 @@ def ShouldIgnore(event): return True return False -class FamFam(object): - '''The fam object is a set of callbacks for file alteration events (FAM support)''' - - def __init__(self): - object.__init__(self) - self.fm = _fam.open() - self.users = {} - self.handles = {} - self.debug = False - - def fileno(self): - '''return fam file handle number''' - return self.fm.fileno() - - def AddMonitor(self, path, obj): - '''add a monitor to path, installing a callback to obj.HandleEvent''' - mode = os.stat(path)[stat.ST_MODE] - if stat.S_ISDIR(mode): - handle = self.fm.monitorDirectory(path, None) - else: - handle = self.fm.monitorFile(path, None) - self.handles[handle.requestID()] = handle - if obj != None: - self.users[handle.requestID()] = obj - return handle.requestID() - - def Service(self): - '''Handle all fam work''' - count = 0 - collapsed = 0 - rawevents = [] - start = time() - now = time() - while (time() - now) < 0.10: - if self.fm.pending(): - while self.fm.pending(): - count += 1 - rawevents.append(self.fm.nextEvent()) - now = time() - unique = [] - bookkeeping = [] - for event in rawevents: - if ShouldIgnore(event): - continue - if event.code2str() != 'changed': - # process all non-change events - unique.append(event) - else: - if (event.filename, event.requestID) not in bookkeeping: - bookkeeping.append((event.filename, event.requestID)) - unique.append(event) - else: - collapsed += 1 - for event in unique: - if event.requestID in self.users: - try: - self.users[event.requestID].HandleEvent(event) - except: - logger.error("handling event for file %s" % (event.filename), exc_info=1) - end = time() - logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" % - (count, (end - start), collapsed)) - return count - class Event(object): def __init__(self, request_id, filename, code): self.requestID = request_id @@ -98,18 +34,96 @@ class GaminEvent(Event): if code in action_map: self.action = action_map[code] -class GaminFam(object): - '''The fam object is a set of callbacks for file alteration events (Gamin support)''' - def __init__(self): +class FileMonitor(object): + '''File Monitor baseclass''' + def __init__(self, debug=False): object.__init__(self) + self.debug = debug + self.handles = dict() + + def fileno(self): + return 0 + + def handle_one_event(self, event): + if ShouldIgnore(event): + return + if event.requestID not in self.handles: + logger.info("Got event for unexpected id %s, file %s" % + (event.requestID, event.filename)) + return + if self.debug: + logger.info("Dispatching event %s %s to obj %s" \ + % (event.code2str(), event.filename, + self.handles[event.requestID])) + try: + self.handles[event.requestID].HandleEvent(event) + except: + logger.error("error in handling of gamin event for %s" % \ + (event.filename), exc_info=1) + + def handle_event_set(self, lock=None): + count = 1 + event = self.get_event() + start = time() + if lock: + lock.acquire() + try: + self.handle_one_event(event) + while self.pending(): + self.handle_one_event(self.get_event()) + count += 1 + except: + pass + if lock: + lock.release() + end = time() + logger.info("Handled %d events in %.02fs" % (count, (end-start))) + + def handle_events_in_interval(self, interval): + start = time() + end = start + interval + while time() < end: + if self.pending(): + self.handle_event_set() + else: + sleep(0.5) + +class Fam(FileMonitor): + '''The fam object is a set of callbacks for file alteration events (FAM support)''' + + def __init__(self, debug=False): + FileMonitor.__init__(self, debug) + self.fm = _fam.open() + + def fileno(self): + return self.fm.fileno() + + def AddMonitor(self, path, obj): + '''add a monitor to path, installing a callback to obj.HandleEvent''' + mode = os.stat(path)[stat.ST_MODE] + if stat.S_ISDIR(mode): + handle = self.fm.monitorDirectory(path, None) + else: + handle = self.fm.monitorFile(path, None) + if obj != None: + self.handles[handle.requestID()] = obj + return handle.requestID() + + def pending(self): + return self.fm.pending() + + def get_event(self): + return self.fm.nextEvent() + +class Gamin(FileMonitor): + '''The fam object is a set of callbacks for file alteration events (Gamin support)''' + def __init__(self, debug=False): + FileMonitor.__init__(self, debug) self.mon = WatchMonitor() - self.handles = {} self.counter = 0 self.events = [] - self.debug = False def fileno(self): - '''return fam file handle number''' return self.mon.get_fd() def queue(self, path, action, request_id): @@ -128,60 +142,25 @@ class GaminFam(object): self.handles[handle] = obj return handle - def Service(self): - '''Handle all gamin work''' - count = 0 - collapsed = 0 - start = time() - if self.mon.event_pending(): - while self.mon.event_pending(): - count += 1 - self.mon.handle_one_event() - else: - return 0 - unique = [] - bookkeeping = [] - for event in self.events: - if ShouldIgnore(event): - continue - if event.code2str() != 'changed': - # process all non-change events - unique.append(event) - else: - if (event.filename, event.requestID) not in bookkeeping: - bookkeeping.append((event.filename, event.requestID)) - unique.append(event) - else: - collapsed += 1 - self.events = [] - for event in unique: - if event.requestID not in self.handles: - logger.info("Got event for unexpected id %s, file %s" % - (event.requestID, event.filename)) - continue - if self.debug: - logger.info("Dispatching event %s %s to obj %s" \ - % (event.code2str(), event.filename, - self.handles[event.requestID])) - try: - self.handles[event.requestID].HandleEvent(event) - except: - logger.error("error in handling of gamin event for %s" % \ - (event.filename), exc_info=1) - end = time() - logger.info("Processed %s gamin events in %03.03f seconds. %s collapsed" % - (count, (end - start), collapsed)) - return count + def pending(self): + return self.mon.event_pending() + + def get_event(self): + self.mon.handle_one_event() + return self.events.pop() -class PseudoFam(object): +class Pseudo(FileMonitor): '''The fam object is a set of callbacks for file alteration events (FAM support)''' - def __init__(self): - object.__init__(self) - self.users = {} - self.handles = {} - self.debug = False - self.pending = [] + def __init__(self, debug=False): + FileMonitor.__init__(self, debug=False) + self.pending_events = [] + + def pending(self): + return len(self.pending_events) != 0 + + def get_event(self): + return self.pending_events.pop() def AddMonitor(self, path, obj): '''add a monitor to path, installing a callback to obj.HandleEvent''' @@ -190,43 +169,30 @@ class PseudoFam(object): handle = GaminEvent(handleID, path, 'exists') if stat.S_ISDIR(mode): dirList = os.listdir(path) - self.pending.append(handle) + self.pending_events.append(handle) for includedFile in dirList: - self.pending.append(GaminEvent(handleID, includedFile, 'exists')) - self.pending.append(GaminEvent(handleID, path, 'endExist')) + self.pending_events.append(GaminEvent(handleID, includedFile, 'exists')) + self.pending_events.append(GaminEvent(handleID, path, 'endExist')) else: - self.pending.append(GaminEvent(handleID, path, 'exists')) - self.handles[handleID] = handle + self.pending_events.append(GaminEvent(handleID, path, 'exists')) if obj != None: - self.users[handleID] = obj + self.handles[handleID] = obj return handleID - def Service(self): - '''Handle all fam work''' - count = 0 - rawevents = [] - for event in self.pending: - count += 1 - rawevents.append(event) - self.pending = [] - for event in rawevents: - if event.requestID in self.users: - self.users[event.requestID].HandleEvent(event) - return count - + available = {} try: from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted, GAMMoved - available['gamin'] = GaminFam + available['gamin'] = Gamin except ImportError: # fall back to _fam pass try: import _fam - available['fam'] = FamFam + available['fam'] = Fam except ImportError: pass -available['pseudo'] = PseudoFam +available['pseudo'] = Pseudo for fdrv in ['gamin', 'fam', 'pseudo']: if fdrv in available: -- cgit v1.2.3-1-g7c22