diff options
-rw-r--r-- | src/lib/Server/Core.py | 4 | ||||
-rw-r--r-- | src/lib/Server/FileMonitor.py | 179 |
2 files changed, 134 insertions, 49 deletions
diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py index 9201622c2..1360bdf23 100644 --- a/src/lib/Server/Core.py +++ b/src/lib/Server/Core.py @@ -45,6 +45,8 @@ class Core(Component): try: self.fam = Bcfg2.Server.FileMonitor.available[filemonitor]() except IOError: + logger.error("Failed to instantiate fam driver %s" % filemonitor, + exc_info=1) raise CoreInitError, "failed to instantiate fam driver (used %s)" % \ filemonitor self.pubspace = {} @@ -285,7 +287,7 @@ class Core(Component): plugin.process_statistics(meta, statistics) except: logger.error("Plugin %s failed to process stats from %s" \ - % (plugin.name, mc.hostname), + % (plugin.name, meta.hostname), exc_info=1) logger.info("Client %s reported state %s" % (client_name, diff --git a/src/lib/Server/FileMonitor.py b/src/lib/Server/FileMonitor.py index 2ef0bcec9..c8d49f030 100644 --- a/src/lib/Server/FileMonitor.py +++ b/src/lib/Server/FileMonitor.py @@ -26,20 +26,7 @@ class Event(object): '''return static code for event''' return self.action - -class GaminEvent(Event): - ''' - This class provides an event analogous to - python-fam events based on gamin sources - ''' - def __init__(self, request_id, filename, code): - Event.__init__(self, request_id, filename, code) - action_map = {GAMCreated: 'created', GAMExists: 'exists', - GAMChanged: 'changed', GAMDeleted: 'deleted', - GAMEndExist: 'endExist', GAMMoved: 'moved'} - if code in action_map: - self.action = action_map[code] - +available = {} class FileMonitor(object): '''File Monitor baseclass''' def __init__(self, debug=False): @@ -47,6 +34,12 @@ class FileMonitor(object): self.debug = debug self.handles = dict() + def get_event(self): + return None + + def pending(self): + return False + def fileno(self): return 0 @@ -94,19 +87,30 @@ class FileMonitor(object): else: sleep(0.5) -class Fam(FileMonitor): - ''' - The fam object is a set of callbacks for - file alteration events (FAM support) - ''' + +class FamFam(object): + '''The fam object is a set of callbacks for file alteration events (FAM support)''' - def __init__(self, debug=False): - FileMonitor.__init__(self, debug) + def __init__(self): + object.__init__(self) self.fm = _fam.open() + self.users = {} + self.handles = {} + self.debug = False def fileno(self): + '''return fam file handle number''' return self.fm.fileno() + def handle_event_set(self, _): + self.Service() + + def handle_events_in_interval(self, interval): + now = time() + while (time() - now) < interval: + if self.Service(): + now = time() + def AddMonitor(self, path, obj): '''add a monitor to path, installing a callback to obj.HandleEvent''' mode = os.stat(path)[stat.ST_MODE] @@ -114,52 +118,80 @@ class Fam(FileMonitor): handle = self.fm.monitorDirectory(path, None) else: handle = self.fm.monitorFile(path, None) + self.handles[handle.requestID()] = handle if obj != None: - self.handles[handle.requestID()] = obj + self.users[handle.requestID()] = obj return handle.requestID() - def pending(self): - return self.fm.pending() + def Service(self, interval=0.50): + '''Handle all fam work''' + count = 0 + collapsed = 0 + rawevents = [] + start = time() + now = time() + while (time() - now) < interval: + if self.fm.pending(): + while self.fm.pending(): + count += 1 + rawevents.append(self.fm.nextEvent()) + now = time() + unique = [] + bookkeeping = [] + for event in rawevents: + if ShouldIgnore(event): + continue + if event.code2str() != 'changed': + # process all non-change events + unique.append(event) + else: + if (event.filename, event.requestID) not in bookkeeping: + bookkeeping.append((event.filename, event.requestID)) + unique.append(event) + else: + collapsed += 1 + for event in unique: + if event.requestID in self.users: + try: + self.users[event.requestID].HandleEvent(event) + except: + logger.error("handling event for file %s" % (event.filename), exc_info=1) + end = time() + logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" % + (count, (end - start), collapsed)) + return count - def get_event(self): - return self.fm.nextEvent() -class Gamin(FileMonitor): + +class Fam(FileMonitor): ''' The fam object is a set of callbacks for - file alteration events (Gamin support) + file alteration events (FAM support) ''' + def __init__(self, debug=False): FileMonitor.__init__(self, debug) - self.mon = WatchMonitor() - self.counter = 0 - self.events = [] + self.fm = _fam.open() def fileno(self): - return self.mon.get_fd() - - def queue(self, path, action, request_id): - '''queue up the event for later handling''' - self.events.append(GaminEvent(request_id, path, action)) + return self.fm.fileno() def AddMonitor(self, path, obj): '''add a monitor to path, installing a callback to obj.HandleEvent''' - handle = self.counter - self.counter += 1 mode = os.stat(path)[stat.ST_MODE] if stat.S_ISDIR(mode): - self.mon.watch_directory(path, self.queue, handle) + handle = self.fm.monitorDirectory(path, None) else: - self.mon.watch_file(path, self.queue, handle) - self.handles[handle] = obj - return handle + handle = self.fm.monitorFile(path, None) + if obj != None: + self.handles[handle.requestID()] = obj + return handle.requestID() def pending(self): - return self.mon.event_pending() + return self.fm.pending() def get_event(self): - self.mon.handle_one_event() - return self.events.pop() + return self.fm.nextEvent() class Pseudo(FileMonitor): ''' @@ -181,7 +213,7 @@ class Pseudo(FileMonitor): '''add a monitor to path, installing a callback to obj.HandleEvent''' handleID = len(self.handles.keys()) mode = os.stat(path)[stat.ST_MODE] - handle = GaminEvent(handleID, path, 'exists') + handle = Event(handleID, path, 'exists') if stat.S_ISDIR(mode): dirList = os.listdir(path) self.pending_events.append(handle) @@ -195,16 +227,67 @@ class Pseudo(FileMonitor): return handleID -available = {} try: from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted, GAMMoved + + class GaminEvent(Event): + ''' + This class provides an event analogous to + python-fam events based on gamin sources + ''' + def __init__(self, request_id, filename, code): + Event.__init__(self, request_id, filename, code) + action_map = {GAMCreated: 'created', GAMExists: 'exists', + GAMChanged: 'changed', GAMDeleted: 'deleted', + GAMEndExist: 'endExist', GAMMoved: 'moved'} + if code in action_map: + self.action = action_map[code] + + class Gamin(FileMonitor): + ''' + The fam object is a set of callbacks for + file alteration events (Gamin support) + ''' + def __init__(self, debug=False): + FileMonitor.__init__(self, debug) + self.mon = WatchMonitor() + self.counter = 0 + self.events = [] + + def fileno(self): + return self.mon.get_fd() + + def queue(self, path, action, request_id): + '''queue up the event for later handling''' + self.events.append(GaminEvent(request_id, path, action)) + + def AddMonitor(self, path, obj): + '''add a monitor to path, installing a callback to obj.HandleEvent''' + handle = self.counter + self.counter += 1 + mode = os.stat(path)[stat.ST_MODE] + if stat.S_ISDIR(mode): + self.mon.watch_directory(path, self.queue, handle) + else: + self.mon.watch_file(path, self.queue, handle) + self.handles[handle] = obj + return handle + + def pending(self): + return self.mon.event_pending() + + def get_event(self): + self.mon.handle_one_event() + return self.events.pop() + available['gamin'] = Gamin except ImportError: # fall back to _fam pass + try: import _fam - available['fam'] = Fam + available['fam'] = FamFam except ImportError: pass available['pseudo'] = Pseudo |