summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorNarayan Desai <desai@mcs.anl.gov>2005-12-01 21:40:48 +0000
committerNarayan Desai <desai@mcs.anl.gov>2005-12-01 21:40:48 +0000
commit714458fd079343ac9698436d3c7fcc4203b7d41f (patch)
tree0a6a9b8d69851f6a2276620ca5be74c2a3db36a0 /src
parentf2d1d3c9136bf5b2bee71fa2678281c989bec620 (diff)
downloadbcfg2-714458fd079343ac9698436d3c7fcc4203b7d41f.tar.gz
bcfg2-714458fd079343ac9698436d3c7fcc4203b7d41f.tar.bz2
bcfg2-714458fd079343ac9698436d3c7fcc4203b7d41f.zip
change to event coalescing
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@1605 ce84e21b-d406-0410-9b95-82705330c041
Diffstat (limited to 'src')
-rw-r--r--src/lib/Server/Core.py99
-rw-r--r--src/lib/Server/Plugin.py15
-rw-r--r--src/sbin/Bcfg2debug5
3 files changed, 77 insertions, 42 deletions
diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py
index 979d86f5f..a2100ea53 100644
--- a/src/lib/Server/Core.py
+++ b/src/lib/Server/Core.py
@@ -65,12 +65,37 @@ class FamFam(object):
def Service(self):
'''Handle all fam work'''
count = 0
- t1 = time()
- while self.fm.pending():
- count += 1
- self.HandleEvent()
- t2 = time()
- syslog(LOG_INFO, "Processed %s fam events in %s seconds" % (count, (t2 - t1)))
+ collapsed = 0
+ rawevents = []
+ start = time()
+ now = time()
+ while (time() - now) < 0.10:
+ if self.fm.pending():
+ while self.fm.pending():
+ count += 1
+ rawevents.append(self.fm.nextEvent())
+ now = time()
+ unique = []
+ bookkeeping = []
+ for event in rawevents:
+ if event.code2str() != 'changed':
+ # process all non-change events
+ unique.append(event)
+ else:
+ if (event.filename, event.requestID) not in bookkeeping:
+ bookkeeping.append((event.filename, event.requestID))
+ unique.append(event)
+ else:
+ collapsed += 1
+ for event in unique:
+ if self.users.has_key(event.requestID):
+ try:
+ self.users[event.requestID].HandleEvent(event)
+ except:
+ log_failure("handling event for file %s" % (event.filename))
+ end = time()
+ syslog(LOG_INFO, "Processed %s fam events in %03.03f seconds. %s coalesced" %
+ (count, (end - start), collapsed))
class GaminEvent(object):
'''This class provides an event analogous to python-fam events based on gamin sources'''
@@ -93,20 +118,15 @@ class GaminFam(object):
self.mon = WatchMonitor()
self.handles = {}
self.counter = 0
+ self.events = []
def fileno(self):
'''return fam file handle number'''
return self.mon.get_fd()
- def dispatch(self, path, action, request_id):
- '''find the right object to dispatch to'''
- if self.handles.has_key(request_id):
- evt = GaminEvent(request_id, path, action)
- #print "e %s %s to obj %s handle :%s:" % (evt.code2str(), evt.filename,
- # self.handles[request_id], evt.requestID)
- self.handles[request_id].HandleEvent(evt)
- else:
- print "got crazy event for nonexistant handle %s" % request_id
+ def queue(self, path, action, request_id):
+ '''queue up the event for later handling'''
+ self.events.append(GaminEvent(request_id, path, action))
def AddMonitor(self, path, obj):
'''add a monitor to path, installing a callback to obj.HandleEvent'''
@@ -114,26 +134,49 @@ class GaminFam(object):
self.counter += 1
mode = stat(path)[ST_MODE]
if S_ISDIR(mode):
- self.mon.watch_directory(path, self.dispatch, handle)
+ self.mon.watch_directory(path, self.queue, handle)
#print "adding callback for directory %s to %s, handle :%s:" % ( path, obj, handle.requestID())
else:
- self.mon.watch_file(path, self.dispatch, handle)
+ self.mon.watch_file(path, self.queue, handle)
self.handles[handle] = obj
return handle
- def HandleEvent(self):
- '''Call Gamin, which will call the callback'''
- self.mon.handle_one_event()
-
def Service(self):
- '''Handle any pending Gamin work'''
+ '''Handle all gamin work'''
count = 0
- t1 = time()
- while self.mon.event_pending():
- count += 1
- self.mon.handle_one_event()
- t2 = time()
- syslog(LOG_INFO, "Processed %s gamin events in %s seconds" % (count, (t2 - t1)))
+ collapsed = 0
+ start = time()
+ now = time()
+ while (time() - now) < 0.10:
+ if self.mon.event_pending():
+ while self.mon.event_pending():
+ count += 1
+ self.mon.handle_one_event()
+ now = time()
+ unique = []
+ bookkeeping = []
+ for event in self.events:
+ if event.code2str() != 'changed':
+ # process all non-change events
+ unique.append(event)
+ else:
+ if (event.filename, event.requestID) not in bookkeeping:
+ bookkeeping.append((event.filename, event.requestID))
+ unique.append(event)
+ else:
+ collapsed += 1
+ self.events = []
+ for event in unique:
+ if self.handles.has_key(event.requestID):
+ try:
+ self.handles[event.requestID].HandleEvent(event)
+ except:
+ log_failure("handling of gamin event for %s" % (event.filename))
+ else:
+ syslog(LOG_INFO, "Got event for unexpected id %s, file %s" % (event.requestID, event.filename))
+ end = time()
+ syslog(LOG_INFO, "Processed %s gamin events in %03.03f seconds. %s collapsed" %
+ (count, (end - start), collapsed))
try:
from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted
diff --git a/src/lib/Server/Plugin.py b/src/lib/Server/Plugin.py
index d7c72a357..72052e7ac 100644
--- a/src/lib/Server/Plugin.py
+++ b/src/lib/Server/Plugin.py
@@ -77,17 +77,12 @@ class FileBacked(object):
except OSError:
syslog(LOG_ERR, "Failed to stat file %s" % (self.name))
- if self.mtime > oldmtime:
- try:
- # if self.readonce == 0:
- # self.readonce = 1
- # else:
- # syslog(LOG_INFO, "Updated file %s" % (self.name))
- self.data = file(self.name).read()
- except IOError:
- syslog(LOG_ERR, "Failed to read file %s" % (self.name))
+ try:
+ self.data = file(self.name).read()
self.Index()
-
+ except IOError:
+ syslog(LOG_ERR, "Failed to read file %s" % (self.name))
+
def Index(self):
'''Update local data structures based on current file state'''
pass
diff --git a/src/sbin/Bcfg2debug b/src/sbin/Bcfg2debug
index 522baff0a..95508a5b4 100644
--- a/src/sbin/Bcfg2debug
+++ b/src/sbin/Bcfg2debug
@@ -64,10 +64,7 @@ if __name__ == '__main__':
for instance in generator.__provides__[key].keys():
print " ", key, instance
elif cmd[0] == 'update':
- while core.fam.fm.pending():
- while core.fam.fm.pending():
- core.fam.HandleEvent()
- sleep(0.5)
+ core.fam.Service()
else:
print "Unknown command %s" % cmd[0]
cmd = get_input()