diff options
Diffstat (limited to 'src/lib/Bcfg2/Server/FileMonitor')
-rw-r--r-- | src/lib/Bcfg2/Server/FileMonitor/Fam.py | 82 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/FileMonitor/Gamin.py | 64 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/FileMonitor/Inotify.py | 126 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/FileMonitor/Pseudo.py | 25 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/FileMonitor/__init__.py | 143 |
5 files changed, 440 insertions, 0 deletions
diff --git a/src/lib/Bcfg2/Server/FileMonitor/Fam.py b/src/lib/Bcfg2/Server/FileMonitor/Fam.py new file mode 100644 index 000000000..1a00fffa0 --- /dev/null +++ b/src/lib/Bcfg2/Server/FileMonitor/Fam.py @@ -0,0 +1,82 @@ +""" Fam provides FAM support for file alteration events """ + +import os +import _fam +import stat +import logging +from time import time +from Bcfg2.Server.FileMonitor import FileMonitor + +logger = logging.getLogger(__name__) + +class Fam(FileMonitor): + __priority__ = 90 + + def __init__(self, ignore=None, debug=False): + FileMonitor.__init__(self, ignore=ignore, debug=debug) + self.fm = _fam.open() + self.users = {} + + def fileno(self): + """Return fam file handle number.""" + return self.fm.fileno() + + def handle_event_set(self, _): + self.Service() + + def handle_events_in_interval(self, interval): + now = time() + while (time() - now) < interval: + if self.Service(): + now = time() + + def AddMonitor(self, path, obj): + """Add a monitor to path, installing a callback to obj.HandleEvent.""" + mode = os.stat(path)[stat.ST_MODE] + if stat.S_ISDIR(mode): + handle = self.fm.monitorDirectory(path, None) + else: + handle = self.fm.monitorFile(path, None) + self.handles[handle.requestID()] = handle + if obj != None: + self.users[handle.requestID()] = obj + return handle.requestID() + + def Service(self, interval=0.50): + """Handle all fam work.""" + count = 0 + collapsed = 0 + rawevents = [] + start = time() + now = time() + while (time() - now) < interval: + if self.fm.pending(): + while self.fm.pending(): + count += 1 + rawevents.append(self.fm.nextEvent()) + now = time() + unique = [] + bookkeeping = [] + for event in rawevents: + if self.should_ignore(event): + continue + if event.code2str() != 'changed': + # process all non-change events + unique.append(event) + else: + if (event.filename, event.requestID) not in bookkeeping: + bookkeeping.append((event.filename, event.requestID)) + unique.append(event) + else: + collapsed += 1 + for event in unique: + if event.requestID in self.users: + try: + self.users[event.requestID].HandleEvent(event) + except: + logger.error("Handling event for file %s" % event.filename, + exc_info=1) + end = time() + logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" % + (count, (end - start), collapsed)) + return count diff --git a/src/lib/Bcfg2/Server/FileMonitor/Gamin.py b/src/lib/Bcfg2/Server/FileMonitor/Gamin.py new file mode 100644 index 000000000..60f80c9c3 --- /dev/null +++ b/src/lib/Bcfg2/Server/FileMonitor/Gamin.py @@ -0,0 +1,64 @@ +""" Gamin driver for file alteration events """ + +import os +import stat +import logging +from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, \ + GAMChanged, GAMDeleted +from Bcfg2.Server.FileMonitor import Event, FileMonitor + +logger = logging.getLogger(__name__) + +class GaminEvent(Event): + """ + This class provides an event analogous to + python-fam events based on gamin sources. + """ + action_map = {GAMCreated: 'created', GAMExists: 'exists', + GAMChanged: 'changed', GAMDeleted: 'deleted', + GAMEndExist: 'endExist'} + + def __init__(self, request_id, filename, code): + Event.__init__(self, request_id, filename, code) + if code in self.action_map: + self.action = self.action_map[code] + +class Gamin(FileMonitor): + __priority__ = 10 + + def __init__(self, ignore=None, debug=False): + FileMonitor.__init__(self, ignore=ignore, debug=debug) + self.mon = WatchMonitor() + self.counter = 0 + + def fileno(self): + return self.mon.get_fd() + + def queue(self, path, action, request_id): + """queue up the event for later handling""" + self.events.append(GaminEvent(request_id, path, action)) + + def AddMonitor(self, path, obj): + """Add a monitor to path, installing a callback to obj.""" + handle = self.counter + self.counter += 1 + mode = os.stat(path)[stat.ST_MODE] + + # Flush queued gamin events + while self.mon.event_pending(): + self.mon.handle_one_event() + + if stat.S_ISDIR(mode): + self.mon.watch_directory(path, self.queue, handle) + else: + self.mon.watch_file(path, self.queue, handle) + self.handles[handle] = obj + return handle + + def pending(self): + return FileMonitor.pending(self) or self.mon.event_pending() + + def get_event(self): + if self.mon.event_pending(): + self.mon.handle_one_event() + return FileMonitor.get_event(self) diff --git a/src/lib/Bcfg2/Server/FileMonitor/Inotify.py b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py new file mode 100644 index 000000000..880ac7e8d --- /dev/null +++ b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py @@ -0,0 +1,126 @@ +""" Inotify driver for file alteration events """ + +import logging +import operator +import os +import pyinotify +import sys +from Bcfg2.Bcfg2Py3k import reduce +from Bcfg2.Server.FileMonitor import Event +from Bcfg2.Server.FileMonitor.Pseudo import Pseudo + +logger = logging.getLogger(__name__) + +class Inotify(Pseudo, pyinotify.ProcessEvent): + __priority__ = 1 + action_map = {pyinotify.IN_CREATE: 'created', + pyinotify.IN_DELETE: 'deleted', + pyinotify.IN_MODIFY: 'changed', + pyinotify.IN_MOVED_FROM: 'deleted', + pyinotify.IN_MOVED_TO: 'created'} + mask = reduce(lambda x, y: x | y, action_map.keys()) + + def __init__(self, ignore=None, debug=False): + Pseudo.__init__(self, ignore=ignore, debug=debug) + self.wm = pyinotify.WatchManager() + self.notifier = pyinotify.ThreadedNotifier(self.wm, self) + self.notifier.start() + self.event_filter = dict() + self.watches_by_path = dict() + + def fileno(self): + return self.wm.get_fd() + + def process_default(self, ievent): + action = ievent.maskname + for amask, aname in self.action_map.items(): + if ievent.mask & amask: + action = aname + break + try: + watch = self.wm.watches[ievent.wd] + except KeyError: + err = sys.exc_info()[1] + logger.error("Error handling event for %s: Watch %s not found" % + (ievent.pathname, ievent.wd)) + return + # FAM-style file monitors return the full path to the parent + # directory that is being watched, relative paths to anything + # contained within the directory. since we can't use inotify + # to watch files directly, we have to sort of guess at whether + # this watch was actually added on a file (and thus is in + # self.event_filter because we're filtering out other events + # on the directory) or was added directly on a directory. + if (watch.path == ievent.pathname or ievent.wd in self.event_filter): + path = ievent.pathname + else: + # relative path + path = os.path.basename(ievent.pathname) + # figure out the handleID. start with the path of the event; + # that should catch events on files that are watched directly. + # (we have to watch the directory that a file is in, so this + # lets us handle events on different files in the same + # directory -- and thus under the same watch -- with different + # objects.) If the path to the event doesn't have a handler, + # use the path of the watch itself. + handleID = ievent.pathname + if handleID not in self.handles: + handleID = watch.path + evt = Event(handleID, path, action) + + if (ievent.wd not in self.event_filter or + ievent.pathname in self.event_filter[ievent.wd]): + self.events.append(evt) + + def AddMonitor(self, path, obj): + # strip trailing slashes + path = path.rstrip("/") + if not os.path.isdir(path): + # inotify is a little wonky about watching files. for + # instance, if you watch /tmp/foo, and then do 'mv + # /tmp/bar /tmp/foo', it processes that as a deletion of + # /tmp/foo (which it technically _is_, but that's rather + # useless -- we care that /tmp/foo changed, not that it + # was first deleted and then created). In order to + # effectively watch a file, we have to watch the directory + # it's in, and filter out events for other files in the + # same directory that are not similarly watched. + # watch_transient_file requires a Processor _class_, not + # an object, so we can't have this object handle events, + # which is Wrong, so we can't use that function. + watch_path = os.path.dirname(path) + is_dir = False + else: + watch_path = path + is_dir = True + + # see if this path is already being watched + try: + wd = self.watches_by_path[watch_path] + except KeyError: + wd = self.wm.add_watch(watch_path, self.mask, + quiet=False)[watch_path] + self.watches_by_path[watch_path] = wd + + produce_exists = True + if not is_dir: + if wd not in self.event_filter: + self.event_filter[wd] = [path] + elif path not in self.event_filter[wd]: + self.event_filter[wd].append(path) + else: + # we've been asked to watch a file that we're already + # watching, so we don't need to produce 'exists' + # events + produce_exists = False + + # inotify doesn't produce initial 'exists' events, so we + # inherit from Pseudo to produce those + if produce_exists: + return Pseudo.AddMonitor(self, path, obj, handleID=path) + else: + self.handles[path] = obj + return path + + def shutdown(self): + self.notifier.stop() diff --git a/src/lib/Bcfg2/Server/FileMonitor/Pseudo.py b/src/lib/Bcfg2/Server/FileMonitor/Pseudo.py new file mode 100644 index 000000000..089d4cf0f --- /dev/null +++ b/src/lib/Bcfg2/Server/FileMonitor/Pseudo.py @@ -0,0 +1,25 @@ +""" Pseudo provides static monitor support for file alteration events """ + +import os +import logging +from Bcfg2.Server.FileMonitor import FileMonitor, Event + +logger = logging.getLogger(__name__) + +class Pseudo(FileMonitor): + __priority__ = 99 + + def AddMonitor(self, path, obj, handleID=None): + """add a monitor to path, installing a callback to obj.HandleEvent""" + if handleID is None: + handleID = len(list(self.handles.keys())) + self.events.append(Event(handleID, path, 'exists')) + if os.path.isdir(path): + dirList = os.listdir(path) + for includedFile in dirList: + self.events.append(Event(handleID, includedFile, 'exists')) + self.events.append(Event(handleID, path, 'endExist')) + + if obj != None: + self.handles[handleID] = obj + return handleID diff --git a/src/lib/Bcfg2/Server/FileMonitor/__init__.py b/src/lib/Bcfg2/Server/FileMonitor/__init__.py new file mode 100644 index 000000000..c490acc81 --- /dev/null +++ b/src/lib/Bcfg2/Server/FileMonitor/__init__.py @@ -0,0 +1,143 @@ +"""Bcfg2.Server.FileMonitor provides the support for monitoring files.""" + +import os +import sys +import fnmatch +import logging +import pkgutil +from time import sleep, time + +logger = logging.getLogger(__name__) + +class Event(object): + def __init__(self, request_id, filename, code): + self.requestID = request_id + self.filename = filename + self.action = code + + def code2str(self): + """return static code for event""" + return self.action + + def __str__(self): + return "%s: %s %s" % (self.__class__.__name__, + self.filename, self.action) + + def __repr__(self): + return "%s (request ID %s)" % (str(self), self.requestID) + + +class FileMonitor(object): + """File Monitor baseclass.""" + def __init__(self, ignore=None, debug=False): + object.__init__(self) + self.debug = debug + self.handles = dict() + self.events = [] + if ignore is None: + ignore = [] + self.ignore = ignore + + def __str__(self): + return "%s: %s" % (__name__, self.__class__.__name__) + + def __repr__(self): + return "%s (%s events, fd %s)" % (str(self), len(self.events), self.fileno) + + def debug_log(self, msg): + if self.debug: + logger.info(msg) + + def should_ignore(self, event): + for pattern in self.ignore: + if (fnmatch.fnmatch(event.filename, pattern) or + fnmatch.fnmatch(os.path.split(event.filename)[-1], pattern)): + self.debug_log("Ignoring %s" % event) + return True + return False + + def pending(self): + return bool(self.events) + + def get_event(self): + return self.events.pop(0) + + def fileno(self): + return 0 + + def handle_one_event(self, event): + if self.should_ignore(event): + return + if event.requestID not in self.handles: + logger.info("Got event for unexpected id %s, file %s" % + (event.requestID, event.filename)) + return + self.debug_log("Dispatching event %s %s to obj %s" % + (event.code2str(), event.filename, + self.handles[event.requestID])) + try: + self.handles[event.requestID].HandleEvent(event) + except: + err = sys.exc_info()[1] + logger.error("Error in handling of event %s for %s: %s" % + (event.code2str(), event.filename, err)) + + def handle_event_set(self, lock=None): + count = 1 + event = self.get_event() + start = time() + if lock: + lock.acquire() + try: + self.handle_one_event(event) + while self.pending(): + self.handle_one_event(self.get_event()) + count += 1 + except: + pass + if lock: + lock.release() + end = time() + logger.info("Handled %d events in %.03fs" % (count, (end - start))) + + def handle_events_in_interval(self, interval): + end = time() + interval + while time() < end: + if self.pending(): + self.handle_event_set() + end = time() + interval + else: + sleep(0.5) + + def shutdown(self): + pass + + +available = dict() + +# todo: loading the monitor drivers should be automatic +from Bcfg2.Server.FileMonitor.Pseudo import Pseudo +available['pseudo'] = Pseudo + +try: + from Bcfg2.Server.FileMonitor.Fam import Fam + available['fam'] = Fam +except ImportError: + pass + +try: + from Bcfg2.Server.FileMonitor.Gamin import Gamin + available['gamin'] = Gamin +except ImportError: + pass + +try: + from Bcfg2.Server.FileMonitor.Inotify import Inotify + available['inotify'] = Inotify +except ImportError: + pass + +for fdrv in sorted(available.keys(), key=lambda k: available[k].__priority__): + if fdrv in available: + available['default'] = available[fdrv] + break |