summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorNarayan Desai <desai@mcs.anl.gov>2009-05-06 19:17:53 +0000
committerNarayan Desai <desai@mcs.anl.gov>2009-05-06 19:17:53 +0000
commitcecdedb73bf0dfed66964c91b08256b100997289 (patch)
treee066b5717b22ef84e3503e3049b0d58fa0b8c883 /src
parente55c9c0e1980b8c1bb808acbaeafd27a9b1cf5d1 (diff)
downloadbcfg2-cecdedb73bf0dfed66964c91b08256b100997289.tar.gz
bcfg2-cecdedb73bf0dfed66964c91b08256b100997289.tar.bz2
bcfg2-cecdedb73bf0dfed66964c91b08256b100997289.zip
Clean up File Monitoring code/adapt to new server infrastructure
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5202 ce84e21b-d406-0410-9b95-82705330c041
Diffstat (limited to 'src')
-rw-r--r--src/lib/Server/FileMonitor.py254
-rw-r--r--src/lib/Server/XMLRPC.py32
-rwxr-xr-xsrc/sbin/bcfg2-info5
3 files changed, 125 insertions, 166 deletions
diff --git a/src/lib/Server/FileMonitor.py b/src/lib/Server/FileMonitor.py
index da6eb00a4..ccf2f2f8a 100644
--- a/src/lib/Server/FileMonitor.py
+++ b/src/lib/Server/FileMonitor.py
@@ -1,4 +1,4 @@
-from time import time
+from time import sleep, time
import logging, os, stat
logger = logging.getLogger('Bcfg2.Server.FileMonitor')
@@ -14,70 +14,6 @@ def ShouldIgnore(event):
return True
return False
-class FamFam(object):
- '''The fam object is a set of callbacks for file alteration events (FAM support)'''
-
- def __init__(self):
- object.__init__(self)
- self.fm = _fam.open()
- self.users = {}
- self.handles = {}
- self.debug = False
-
- def fileno(self):
- '''return fam file handle number'''
- return self.fm.fileno()
-
- def AddMonitor(self, path, obj):
- '''add a monitor to path, installing a callback to obj.HandleEvent'''
- mode = os.stat(path)[stat.ST_MODE]
- if stat.S_ISDIR(mode):
- handle = self.fm.monitorDirectory(path, None)
- else:
- handle = self.fm.monitorFile(path, None)
- self.handles[handle.requestID()] = handle
- if obj != None:
- self.users[handle.requestID()] = obj
- return handle.requestID()
-
- def Service(self):
- '''Handle all fam work'''
- count = 0
- collapsed = 0
- rawevents = []
- start = time()
- now = time()
- while (time() - now) < 0.10:
- if self.fm.pending():
- while self.fm.pending():
- count += 1
- rawevents.append(self.fm.nextEvent())
- now = time()
- unique = []
- bookkeeping = []
- for event in rawevents:
- if ShouldIgnore(event):
- continue
- if event.code2str() != 'changed':
- # process all non-change events
- unique.append(event)
- else:
- if (event.filename, event.requestID) not in bookkeeping:
- bookkeeping.append((event.filename, event.requestID))
- unique.append(event)
- else:
- collapsed += 1
- for event in unique:
- if event.requestID in self.users:
- try:
- self.users[event.requestID].HandleEvent(event)
- except:
- logger.error("handling event for file %s" % (event.filename), exc_info=1)
- end = time()
- logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" %
- (count, (end - start), collapsed))
- return count
-
class Event(object):
def __init__(self, request_id, filename, code):
self.requestID = request_id
@@ -98,18 +34,96 @@ class GaminEvent(Event):
if code in action_map:
self.action = action_map[code]
-class GaminFam(object):
- '''The fam object is a set of callbacks for file alteration events (Gamin support)'''
- def __init__(self):
+class FileMonitor(object):
+ '''File Monitor baseclass'''
+ def __init__(self, debug=False):
object.__init__(self)
+ self.debug = debug
+ self.handles = dict()
+
+ def fileno(self):
+ return 0
+
+ def handle_one_event(self, event):
+ if ShouldIgnore(event):
+ return
+ if event.requestID not in self.handles:
+ logger.info("Got event for unexpected id %s, file %s" %
+ (event.requestID, event.filename))
+ return
+ if self.debug:
+ logger.info("Dispatching event %s %s to obj %s" \
+ % (event.code2str(), event.filename,
+ self.handles[event.requestID]))
+ try:
+ self.handles[event.requestID].HandleEvent(event)
+ except:
+ logger.error("error in handling of gamin event for %s" % \
+ (event.filename), exc_info=1)
+
+ def handle_event_set(self, lock=None):
+ count = 1
+ event = self.get_event()
+ start = time()
+ if lock:
+ lock.acquire()
+ try:
+ self.handle_one_event(event)
+ while self.pending():
+ self.handle_one_event(self.get_event())
+ count += 1
+ except:
+ pass
+ if lock:
+ lock.release()
+ end = time()
+ logger.info("Handled %d events in %.02fs" % (count, (end-start)))
+
+ def handle_events_in_interval(self, interval):
+ start = time()
+ end = start + interval
+ while time() < end:
+ if self.pending():
+ self.handle_event_set()
+ else:
+ sleep(0.5)
+
+class Fam(FileMonitor):
+ '''The fam object is a set of callbacks for file alteration events (FAM support)'''
+
+ def __init__(self, debug=False):
+ FileMonitor.__init__(self, debug)
+ self.fm = _fam.open()
+
+ def fileno(self):
+ return self.fm.fileno()
+
+ def AddMonitor(self, path, obj):
+ '''add a monitor to path, installing a callback to obj.HandleEvent'''
+ mode = os.stat(path)[stat.ST_MODE]
+ if stat.S_ISDIR(mode):
+ handle = self.fm.monitorDirectory(path, None)
+ else:
+ handle = self.fm.monitorFile(path, None)
+ if obj != None:
+ self.handles[handle.requestID()] = obj
+ return handle.requestID()
+
+ def pending(self):
+ return self.fm.pending()
+
+ def get_event(self):
+ return self.fm.nextEvent()
+
+class Gamin(FileMonitor):
+ '''The fam object is a set of callbacks for file alteration events (Gamin support)'''
+ def __init__(self, debug=False):
+ FileMonitor.__init__(self, debug)
self.mon = WatchMonitor()
- self.handles = {}
self.counter = 0
self.events = []
- self.debug = False
def fileno(self):
- '''return fam file handle number'''
return self.mon.get_fd()
def queue(self, path, action, request_id):
@@ -128,60 +142,25 @@ class GaminFam(object):
self.handles[handle] = obj
return handle
- def Service(self):
- '''Handle all gamin work'''
- count = 0
- collapsed = 0
- start = time()
- if self.mon.event_pending():
- while self.mon.event_pending():
- count += 1
- self.mon.handle_one_event()
- else:
- return 0
- unique = []
- bookkeeping = []
- for event in self.events:
- if ShouldIgnore(event):
- continue
- if event.code2str() != 'changed':
- # process all non-change events
- unique.append(event)
- else:
- if (event.filename, event.requestID) not in bookkeeping:
- bookkeeping.append((event.filename, event.requestID))
- unique.append(event)
- else:
- collapsed += 1
- self.events = []
- for event in unique:
- if event.requestID not in self.handles:
- logger.info("Got event for unexpected id %s, file %s" %
- (event.requestID, event.filename))
- continue
- if self.debug:
- logger.info("Dispatching event %s %s to obj %s" \
- % (event.code2str(), event.filename,
- self.handles[event.requestID]))
- try:
- self.handles[event.requestID].HandleEvent(event)
- except:
- logger.error("error in handling of gamin event for %s" % \
- (event.filename), exc_info=1)
- end = time()
- logger.info("Processed %s gamin events in %03.03f seconds. %s collapsed" %
- (count, (end - start), collapsed))
- return count
+ def pending(self):
+ return self.mon.event_pending()
+
+ def get_event(self):
+ self.mon.handle_one_event()
+ return self.events.pop()
-class PseudoFam(object):
+class Pseudo(FileMonitor):
'''The fam object is a set of callbacks for file alteration events (FAM support)'''
- def __init__(self):
- object.__init__(self)
- self.users = {}
- self.handles = {}
- self.debug = False
- self.pending = []
+ def __init__(self, debug=False):
+ FileMonitor.__init__(self, debug=False)
+ self.pending_events = []
+
+ def pending(self):
+ return len(self.pending_events) != 0
+
+ def get_event(self):
+ return self.pending_events.pop()
def AddMonitor(self, path, obj):
'''add a monitor to path, installing a callback to obj.HandleEvent'''
@@ -190,43 +169,30 @@ class PseudoFam(object):
handle = GaminEvent(handleID, path, 'exists')
if stat.S_ISDIR(mode):
dirList = os.listdir(path)
- self.pending.append(handle)
+ self.pending_events.append(handle)
for includedFile in dirList:
- self.pending.append(GaminEvent(handleID, includedFile, 'exists'))
- self.pending.append(GaminEvent(handleID, path, 'endExist'))
+ self.pending_events.append(GaminEvent(handleID, includedFile, 'exists'))
+ self.pending_events.append(GaminEvent(handleID, path, 'endExist'))
else:
- self.pending.append(GaminEvent(handleID, path, 'exists'))
- self.handles[handleID] = handle
+ self.pending_events.append(GaminEvent(handleID, path, 'exists'))
if obj != None:
- self.users[handleID] = obj
+ self.handles[handleID] = obj
return handleID
- def Service(self):
- '''Handle all fam work'''
- count = 0
- rawevents = []
- for event in self.pending:
- count += 1
- rawevents.append(event)
- self.pending = []
- for event in rawevents:
- if event.requestID in self.users:
- self.users[event.requestID].HandleEvent(event)
- return count
-
+
available = {}
try:
from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted, GAMMoved
- available['gamin'] = GaminFam
+ available['gamin'] = Gamin
except ImportError:
# fall back to _fam
pass
try:
import _fam
- available['fam'] = FamFam
+ available['fam'] = Fam
except ImportError:
pass
-available['pseudo'] = PseudoFam
+available['pseudo'] = Pseudo
for fdrv in ['gamin', 'fam', 'pseudo']:
if fdrv in available:
diff --git a/src/lib/Server/XMLRPC.py b/src/lib/Server/XMLRPC.py
index 4e97271bc..36d7c3b79 100644
--- a/src/lib/Server/XMLRPC.py
+++ b/src/lib/Server/XMLRPC.py
@@ -4,10 +4,11 @@ import logging
import lxml.etree
import select
import socket
+import threading
import time
import xmlrpclib
-from Bcfg2.Component import Component, exposed
+from Bcfg2.Component import Component, automatic, exposed, locking
import Bcfg2.Server.Core
logger = logging.getLogger('server')
@@ -25,6 +26,7 @@ class bcfg2_server(Component,
Bcfg2.Server.Core.Core):
'''XML RPC interfaces for the server core'''
name = 'bcfg2-server'
+ implementation = 'bcfg2-server'
def __init__(self, setup):
Component.__init__(self)
@@ -32,26 +34,20 @@ class bcfg2_server(Component,
setup['password'],
setup['encoding'], setup['filemonitor'])
self.ca = setup['ca']
- self.process_initial_fam_events()
+ self.fam_thread = threading.Thread(target=self._file_monitor_thread)
+ self.fam_thread.start()
- def process_initial_fam_events(self):
- events = False
+ def _file_monitor_thread(self):
+ famfd = self.fam.fileno()
while True:
try:
- rsockinfo = select.select([self.fam.fileno()], [], [], 15)[0]
- if not rsockinfo:
- if events:
- break
- else:
- logger.error("Hit event timeout without getting "
- "any events; GAMIN/FAM problem?")
- continue
- events = True
- i = 0
- while self.fam.Service() or i < 10:
- i += 1
- time.sleep(0.1)
- except socket.error:
+ if famfd:
+ rsockinfo = select.select([famfd], [], [])
+ else:
+ while not self.fam.pending():
+ time.sleep(15)
+ self.fam.handle_event_set(self.lock)
+ except:
continue
@exposed
diff --git a/src/sbin/bcfg2-info b/src/sbin/bcfg2-info
index 0d166bfdc..2b079a4e1 100755
--- a/src/sbin/bcfg2-info
+++ b/src/sbin/bcfg2-info
@@ -37,10 +37,7 @@ class infoCore(cmd.Cmd, Bcfg2.Server.Core.Core):
raise SystemExit(1)
self.prompt = '> '
self.cont = True
- i = 0
- while self.fam.Service() or i < 10:
- i += 1
- time.sleep(0.1)
+ self.fam.handle_events_in_interval(10)
def do_loop(self):
self.cont = True