summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/FileMonitor
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/Bcfg2/Server/FileMonitor')
-rw-r--r--src/lib/Bcfg2/Server/FileMonitor/Fam.py82
-rw-r--r--src/lib/Bcfg2/Server/FileMonitor/Gamin.py64
-rw-r--r--src/lib/Bcfg2/Server/FileMonitor/Inotify.py126
-rw-r--r--src/lib/Bcfg2/Server/FileMonitor/Pseudo.py25
-rw-r--r--src/lib/Bcfg2/Server/FileMonitor/__init__.py143
5 files changed, 440 insertions, 0 deletions
diff --git a/src/lib/Bcfg2/Server/FileMonitor/Fam.py b/src/lib/Bcfg2/Server/FileMonitor/Fam.py
new file mode 100644
index 000000000..1a00fffa0
--- /dev/null
+++ b/src/lib/Bcfg2/Server/FileMonitor/Fam.py
@@ -0,0 +1,82 @@
+""" Fam provides FAM support for file alteration events """
+
+import os
+import _fam
+import stat
+import logging
+from time import time
+from Bcfg2.Server.FileMonitor import FileMonitor
+
+logger = logging.getLogger(__name__)
+
+class Fam(FileMonitor):
+ __priority__ = 90
+
+ def __init__(self, ignore=None, debug=False):
+ FileMonitor.__init__(self, ignore=ignore, debug=debug)
+ self.fm = _fam.open()
+ self.users = {}
+
+ def fileno(self):
+ """Return fam file handle number."""
+ return self.fm.fileno()
+
+ def handle_event_set(self, _):
+ self.Service()
+
+ def handle_events_in_interval(self, interval):
+ now = time()
+ while (time() - now) < interval:
+ if self.Service():
+ now = time()
+
+ def AddMonitor(self, path, obj):
+ """Add a monitor to path, installing a callback to obj.HandleEvent."""
+ mode = os.stat(path)[stat.ST_MODE]
+ if stat.S_ISDIR(mode):
+ handle = self.fm.monitorDirectory(path, None)
+ else:
+ handle = self.fm.monitorFile(path, None)
+ self.handles[handle.requestID()] = handle
+ if obj != None:
+ self.users[handle.requestID()] = obj
+ return handle.requestID()
+
+ def Service(self, interval=0.50):
+ """Handle all fam work."""
+ count = 0
+ collapsed = 0
+ rawevents = []
+ start = time()
+ now = time()
+ while (time() - now) < interval:
+ if self.fm.pending():
+ while self.fm.pending():
+ count += 1
+ rawevents.append(self.fm.nextEvent())
+ now = time()
+ unique = []
+ bookkeeping = []
+ for event in rawevents:
+ if self.should_ignore(event):
+ continue
+ if event.code2str() != 'changed':
+ # process all non-change events
+ unique.append(event)
+ else:
+ if (event.filename, event.requestID) not in bookkeeping:
+ bookkeeping.append((event.filename, event.requestID))
+ unique.append(event)
+ else:
+ collapsed += 1
+ for event in unique:
+ if event.requestID in self.users:
+ try:
+ self.users[event.requestID].HandleEvent(event)
+ except:
+ logger.error("Handling event for file %s" % event.filename,
+ exc_info=1)
+ end = time()
+ logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" %
+ (count, (end - start), collapsed))
+ return count
diff --git a/src/lib/Bcfg2/Server/FileMonitor/Gamin.py b/src/lib/Bcfg2/Server/FileMonitor/Gamin.py
new file mode 100644
index 000000000..60f80c9c3
--- /dev/null
+++ b/src/lib/Bcfg2/Server/FileMonitor/Gamin.py
@@ -0,0 +1,64 @@
+""" Gamin driver for file alteration events """
+
+import os
+import stat
+import logging
+from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, \
+ GAMChanged, GAMDeleted
+from Bcfg2.Server.FileMonitor import Event, FileMonitor
+
+logger = logging.getLogger(__name__)
+
+class GaminEvent(Event):
+ """
+ This class provides an event analogous to
+ python-fam events based on gamin sources.
+ """
+ action_map = {GAMCreated: 'created', GAMExists: 'exists',
+ GAMChanged: 'changed', GAMDeleted: 'deleted',
+ GAMEndExist: 'endExist'}
+
+ def __init__(self, request_id, filename, code):
+ Event.__init__(self, request_id, filename, code)
+ if code in self.action_map:
+ self.action = self.action_map[code]
+
+class Gamin(FileMonitor):
+ __priority__ = 10
+
+ def __init__(self, ignore=None, debug=False):
+ FileMonitor.__init__(self, ignore=ignore, debug=debug)
+ self.mon = WatchMonitor()
+ self.counter = 0
+
+ def fileno(self):
+ return self.mon.get_fd()
+
+ def queue(self, path, action, request_id):
+ """queue up the event for later handling"""
+ self.events.append(GaminEvent(request_id, path, action))
+
+ def AddMonitor(self, path, obj):
+ """Add a monitor to path, installing a callback to obj."""
+ handle = self.counter
+ self.counter += 1
+ mode = os.stat(path)[stat.ST_MODE]
+
+ # Flush queued gamin events
+ while self.mon.event_pending():
+ self.mon.handle_one_event()
+
+ if stat.S_ISDIR(mode):
+ self.mon.watch_directory(path, self.queue, handle)
+ else:
+ self.mon.watch_file(path, self.queue, handle)
+ self.handles[handle] = obj
+ return handle
+
+ def pending(self):
+ return FileMonitor.pending(self) or self.mon.event_pending()
+
+ def get_event(self):
+ if self.mon.event_pending():
+ self.mon.handle_one_event()
+ return FileMonitor.get_event(self)
diff --git a/src/lib/Bcfg2/Server/FileMonitor/Inotify.py b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py
new file mode 100644
index 000000000..880ac7e8d
--- /dev/null
+++ b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py
@@ -0,0 +1,126 @@
+""" Inotify driver for file alteration events """
+
+import logging
+import operator
+import os
+import pyinotify
+import sys
+from Bcfg2.Bcfg2Py3k import reduce
+from Bcfg2.Server.FileMonitor import Event
+from Bcfg2.Server.FileMonitor.Pseudo import Pseudo
+
+logger = logging.getLogger(__name__)
+
+class Inotify(Pseudo, pyinotify.ProcessEvent):
+ __priority__ = 1
+ action_map = {pyinotify.IN_CREATE: 'created',
+ pyinotify.IN_DELETE: 'deleted',
+ pyinotify.IN_MODIFY: 'changed',
+ pyinotify.IN_MOVED_FROM: 'deleted',
+ pyinotify.IN_MOVED_TO: 'created'}
+ mask = reduce(lambda x, y: x | y, action_map.keys())
+
+ def __init__(self, ignore=None, debug=False):
+ Pseudo.__init__(self, ignore=ignore, debug=debug)
+ self.wm = pyinotify.WatchManager()
+ self.notifier = pyinotify.ThreadedNotifier(self.wm, self)
+ self.notifier.start()
+ self.event_filter = dict()
+ self.watches_by_path = dict()
+
+ def fileno(self):
+ return self.wm.get_fd()
+
+ def process_default(self, ievent):
+ action = ievent.maskname
+ for amask, aname in self.action_map.items():
+ if ievent.mask & amask:
+ action = aname
+ break
+ try:
+ watch = self.wm.watches[ievent.wd]
+ except KeyError:
+ err = sys.exc_info()[1]
+ logger.error("Error handling event for %s: Watch %s not found" %
+ (ievent.pathname, ievent.wd))
+ return
+ # FAM-style file monitors return the full path to the parent
+ # directory that is being watched, relative paths to anything
+ # contained within the directory. since we can't use inotify
+ # to watch files directly, we have to sort of guess at whether
+ # this watch was actually added on a file (and thus is in
+ # self.event_filter because we're filtering out other events
+ # on the directory) or was added directly on a directory.
+ if (watch.path == ievent.pathname or ievent.wd in self.event_filter):
+ path = ievent.pathname
+ else:
+ # relative path
+ path = os.path.basename(ievent.pathname)
+ # figure out the handleID. start with the path of the event;
+ # that should catch events on files that are watched directly.
+ # (we have to watch the directory that a file is in, so this
+ # lets us handle events on different files in the same
+ # directory -- and thus under the same watch -- with different
+ # objects.) If the path to the event doesn't have a handler,
+ # use the path of the watch itself.
+ handleID = ievent.pathname
+ if handleID not in self.handles:
+ handleID = watch.path
+ evt = Event(handleID, path, action)
+
+ if (ievent.wd not in self.event_filter or
+ ievent.pathname in self.event_filter[ievent.wd]):
+ self.events.append(evt)
+
+ def AddMonitor(self, path, obj):
+ # strip trailing slashes
+ path = path.rstrip("/")
+ if not os.path.isdir(path):
+ # inotify is a little wonky about watching files. for
+ # instance, if you watch /tmp/foo, and then do 'mv
+ # /tmp/bar /tmp/foo', it processes that as a deletion of
+ # /tmp/foo (which it technically _is_, but that's rather
+ # useless -- we care that /tmp/foo changed, not that it
+ # was first deleted and then created). In order to
+ # effectively watch a file, we have to watch the directory
+ # it's in, and filter out events for other files in the
+ # same directory that are not similarly watched.
+ # watch_transient_file requires a Processor _class_, not
+ # an object, so we can't have this object handle events,
+ # which is Wrong, so we can't use that function.
+ watch_path = os.path.dirname(path)
+ is_dir = False
+ else:
+ watch_path = path
+ is_dir = True
+
+ # see if this path is already being watched
+ try:
+ wd = self.watches_by_path[watch_path]
+ except KeyError:
+ wd = self.wm.add_watch(watch_path, self.mask,
+ quiet=False)[watch_path]
+ self.watches_by_path[watch_path] = wd
+
+ produce_exists = True
+ if not is_dir:
+ if wd not in self.event_filter:
+ self.event_filter[wd] = [path]
+ elif path not in self.event_filter[wd]:
+ self.event_filter[wd].append(path)
+ else:
+ # we've been asked to watch a file that we're already
+ # watching, so we don't need to produce 'exists'
+ # events
+ produce_exists = False
+
+ # inotify doesn't produce initial 'exists' events, so we
+ # inherit from Pseudo to produce those
+ if produce_exists:
+ return Pseudo.AddMonitor(self, path, obj, handleID=path)
+ else:
+ self.handles[path] = obj
+ return path
+
+ def shutdown(self):
+ self.notifier.stop()
diff --git a/src/lib/Bcfg2/Server/FileMonitor/Pseudo.py b/src/lib/Bcfg2/Server/FileMonitor/Pseudo.py
new file mode 100644
index 000000000..089d4cf0f
--- /dev/null
+++ b/src/lib/Bcfg2/Server/FileMonitor/Pseudo.py
@@ -0,0 +1,25 @@
+""" Pseudo provides static monitor support for file alteration events """
+
+import os
+import logging
+from Bcfg2.Server.FileMonitor import FileMonitor, Event
+
+logger = logging.getLogger(__name__)
+
+class Pseudo(FileMonitor):
+ __priority__ = 99
+
+ def AddMonitor(self, path, obj, handleID=None):
+ """add a monitor to path, installing a callback to obj.HandleEvent"""
+ if handleID is None:
+ handleID = len(list(self.handles.keys()))
+ self.events.append(Event(handleID, path, 'exists'))
+ if os.path.isdir(path):
+ dirList = os.listdir(path)
+ for includedFile in dirList:
+ self.events.append(Event(handleID, includedFile, 'exists'))
+ self.events.append(Event(handleID, path, 'endExist'))
+
+ if obj != None:
+ self.handles[handleID] = obj
+ return handleID
diff --git a/src/lib/Bcfg2/Server/FileMonitor/__init__.py b/src/lib/Bcfg2/Server/FileMonitor/__init__.py
new file mode 100644
index 000000000..c490acc81
--- /dev/null
+++ b/src/lib/Bcfg2/Server/FileMonitor/__init__.py
@@ -0,0 +1,143 @@
+"""Bcfg2.Server.FileMonitor provides the support for monitoring files."""
+
+import os
+import sys
+import fnmatch
+import logging
+import pkgutil
+from time import sleep, time
+
+logger = logging.getLogger(__name__)
+
+class Event(object):
+ def __init__(self, request_id, filename, code):
+ self.requestID = request_id
+ self.filename = filename
+ self.action = code
+
+ def code2str(self):
+ """return static code for event"""
+ return self.action
+
+ def __str__(self):
+ return "%s: %s %s" % (self.__class__.__name__,
+ self.filename, self.action)
+
+ def __repr__(self):
+ return "%s (request ID %s)" % (str(self), self.requestID)
+
+
+class FileMonitor(object):
+ """File Monitor baseclass."""
+ def __init__(self, ignore=None, debug=False):
+ object.__init__(self)
+ self.debug = debug
+ self.handles = dict()
+ self.events = []
+ if ignore is None:
+ ignore = []
+ self.ignore = ignore
+
+ def __str__(self):
+ return "%s: %s" % (__name__, self.__class__.__name__)
+
+ def __repr__(self):
+ return "%s (%s events, fd %s)" % (str(self), len(self.events), self.fileno)
+
+ def debug_log(self, msg):
+ if self.debug:
+ logger.info(msg)
+
+ def should_ignore(self, event):
+ for pattern in self.ignore:
+ if (fnmatch.fnmatch(event.filename, pattern) or
+ fnmatch.fnmatch(os.path.split(event.filename)[-1], pattern)):
+ self.debug_log("Ignoring %s" % event)
+ return True
+ return False
+
+ def pending(self):
+ return bool(self.events)
+
+ def get_event(self):
+ return self.events.pop(0)
+
+ def fileno(self):
+ return 0
+
+ def handle_one_event(self, event):
+ if self.should_ignore(event):
+ return
+ if event.requestID not in self.handles:
+ logger.info("Got event for unexpected id %s, file %s" %
+ (event.requestID, event.filename))
+ return
+ self.debug_log("Dispatching event %s %s to obj %s" %
+ (event.code2str(), event.filename,
+ self.handles[event.requestID]))
+ try:
+ self.handles[event.requestID].HandleEvent(event)
+ except:
+ err = sys.exc_info()[1]
+ logger.error("Error in handling of event %s for %s: %s" %
+ (event.code2str(), event.filename, err))
+
+ def handle_event_set(self, lock=None):
+ count = 1
+ event = self.get_event()
+ start = time()
+ if lock:
+ lock.acquire()
+ try:
+ self.handle_one_event(event)
+ while self.pending():
+ self.handle_one_event(self.get_event())
+ count += 1
+ except:
+ pass
+ if lock:
+ lock.release()
+ end = time()
+ logger.info("Handled %d events in %.03fs" % (count, (end - start)))
+
+ def handle_events_in_interval(self, interval):
+ end = time() + interval
+ while time() < end:
+ if self.pending():
+ self.handle_event_set()
+ end = time() + interval
+ else:
+ sleep(0.5)
+
+ def shutdown(self):
+ pass
+
+
+available = dict()
+
+# todo: loading the monitor drivers should be automatic
+from Bcfg2.Server.FileMonitor.Pseudo import Pseudo
+available['pseudo'] = Pseudo
+
+try:
+ from Bcfg2.Server.FileMonitor.Fam import Fam
+ available['fam'] = Fam
+except ImportError:
+ pass
+
+try:
+ from Bcfg2.Server.FileMonitor.Gamin import Gamin
+ available['gamin'] = Gamin
+except ImportError:
+ pass
+
+try:
+ from Bcfg2.Server.FileMonitor.Inotify import Inotify
+ available['inotify'] = Inotify
+except ImportError:
+ pass
+
+for fdrv in sorted(available.keys(), key=lambda k: available[k].__priority__):
+ if fdrv in available:
+ available['default'] = available[fdrv]
+ break