From 714458fd079343ac9698436d3c7fcc4203b7d41f Mon Sep 17 00:00:00 2001 From: Narayan Desai Date: Thu, 1 Dec 2005 21:40:48 +0000 Subject: change to event coalescing git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@1605 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Server/Core.py | 99 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 28 deletions(-) (limited to 'src/lib/Server/Core.py') diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py index 979d86f5f..a2100ea53 100644 --- a/src/lib/Server/Core.py +++ b/src/lib/Server/Core.py @@ -65,12 +65,37 @@ class FamFam(object): def Service(self): '''Handle all fam work''' count = 0 - t1 = time() - while self.fm.pending(): - count += 1 - self.HandleEvent() - t2 = time() - syslog(LOG_INFO, "Processed %s fam events in %s seconds" % (count, (t2 - t1))) + collapsed = 0 + rawevents = [] + start = time() + now = time() + while (time() - now) < 0.10: + if self.fm.pending(): + while self.fm.pending(): + count += 1 + rawevents.append(self.fm.nextEvent()) + now = time() + unique = [] + bookkeeping = [] + for event in rawevents: + if event.code2str() != 'changed': + # process all non-change events + unique.append(event) + else: + if (event.filename, event.requestID) not in bookkeeping: + bookkeeping.append((event.filename, event.requestID)) + unique.append(event) + else: + collapsed += 1 + for event in unique: + if self.users.has_key(event.requestID): + try: + self.users[event.requestID].HandleEvent(event) + except: + log_failure("handling event for file %s" % (event.filename)) + end = time() + syslog(LOG_INFO, "Processed %s fam events in %03.03f seconds. %s coalesced" % + (count, (end - start), collapsed)) class GaminEvent(object): '''This class provides an event analogous to python-fam events based on gamin sources''' @@ -93,20 +118,15 @@ class GaminFam(object): self.mon = WatchMonitor() self.handles = {} self.counter = 0 + self.events = [] def fileno(self): '''return fam file handle number''' return self.mon.get_fd() - def dispatch(self, path, action, request_id): - '''find the right object to dispatch to''' - if self.handles.has_key(request_id): - evt = GaminEvent(request_id, path, action) - #print "e %s %s to obj %s handle :%s:" % (evt.code2str(), evt.filename, - # self.handles[request_id], evt.requestID) - self.handles[request_id].HandleEvent(evt) - else: - print "got crazy event for nonexistant handle %s" % request_id + def queue(self, path, action, request_id): + '''queue up the event for later handling''' + self.events.append(GaminEvent(request_id, path, action)) def AddMonitor(self, path, obj): '''add a monitor to path, installing a callback to obj.HandleEvent''' @@ -114,26 +134,49 @@ class GaminFam(object): self.counter += 1 mode = stat(path)[ST_MODE] if S_ISDIR(mode): - self.mon.watch_directory(path, self.dispatch, handle) + self.mon.watch_directory(path, self.queue, handle) #print "adding callback for directory %s to %s, handle :%s:" % ( path, obj, handle.requestID()) else: - self.mon.watch_file(path, self.dispatch, handle) + self.mon.watch_file(path, self.queue, handle) self.handles[handle] = obj return handle - def HandleEvent(self): - '''Call Gamin, which will call the callback''' - self.mon.handle_one_event() - def Service(self): - '''Handle any pending Gamin work''' + '''Handle all gamin work''' count = 0 - t1 = time() - while self.mon.event_pending(): - count += 1 - self.mon.handle_one_event() - t2 = time() - syslog(LOG_INFO, "Processed %s gamin events in %s seconds" % (count, (t2 - t1))) + collapsed = 0 + start = time() + now = time() + while (time() - now) < 0.10: + if self.mon.event_pending(): + while self.mon.event_pending(): + count += 1 + self.mon.handle_one_event() + now = time() + unique = [] + bookkeeping = [] + for event in self.events: + if event.code2str() != 'changed': + # process all non-change events + unique.append(event) + else: + if (event.filename, event.requestID) not in bookkeeping: + bookkeeping.append((event.filename, event.requestID)) + unique.append(event) + else: + collapsed += 1 + self.events = [] + for event in unique: + if self.handles.has_key(event.requestID): + try: + self.handles[event.requestID].HandleEvent(event) + except: + log_failure("handling of gamin event for %s" % (event.filename)) + else: + syslog(LOG_INFO, "Got event for unexpected id %s, file %s" % (event.requestID, event.filename)) + end = time() + syslog(LOG_INFO, "Processed %s gamin events in %03.03f seconds. %s collapsed" % + (count, (end - start), collapsed)) try: from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted -- cgit v1.2.3-1-g7c22