From 714458fd079343ac9698436d3c7fcc4203b7d41f Mon Sep 17 00:00:00 2001 From: Narayan Desai Date: Thu, 1 Dec 2005 21:40:48 +0000 Subject: change to event coalescing git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@1605 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Server/Core.py | 99 ++++++++++++++++++++++++++++++++++-------------- src/lib/Server/Plugin.py | 15 +++----- src/sbin/Bcfg2debug | 5 +-- 3 files changed, 77 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py index 979d86f5f..a2100ea53 100644 --- a/src/lib/Server/Core.py +++ b/src/lib/Server/Core.py @@ -65,12 +65,37 @@ class FamFam(object): def Service(self): '''Handle all fam work''' count = 0 - t1 = time() - while self.fm.pending(): - count += 1 - self.HandleEvent() - t2 = time() - syslog(LOG_INFO, "Processed %s fam events in %s seconds" % (count, (t2 - t1))) + collapsed = 0 + rawevents = [] + start = time() + now = time() + while (time() - now) < 0.10: + if self.fm.pending(): + while self.fm.pending(): + count += 1 + rawevents.append(self.fm.nextEvent()) + now = time() + unique = [] + bookkeeping = [] + for event in rawevents: + if event.code2str() != 'changed': + # process all non-change events + unique.append(event) + else: + if (event.filename, event.requestID) not in bookkeeping: + bookkeeping.append((event.filename, event.requestID)) + unique.append(event) + else: + collapsed += 1 + for event in unique: + if self.users.has_key(event.requestID): + try: + self.users[event.requestID].HandleEvent(event) + except: + log_failure("handling event for file %s" % (event.filename)) + end = time() + syslog(LOG_INFO, "Processed %s fam events in %03.03f seconds. %s coalesced" % + (count, (end - start), collapsed)) class GaminEvent(object): '''This class provides an event analogous to python-fam events based on gamin sources''' @@ -93,20 +118,15 @@ class GaminFam(object): self.mon = WatchMonitor() self.handles = {} self.counter = 0 + self.events = [] def fileno(self): '''return fam file handle number''' return self.mon.get_fd() - def dispatch(self, path, action, request_id): - '''find the right object to dispatch to''' - if self.handles.has_key(request_id): - evt = GaminEvent(request_id, path, action) - #print "e %s %s to obj %s handle :%s:" % (evt.code2str(), evt.filename, - # self.handles[request_id], evt.requestID) - self.handles[request_id].HandleEvent(evt) - else: - print "got crazy event for nonexistant handle %s" % request_id + def queue(self, path, action, request_id): + '''queue up the event for later handling''' + self.events.append(GaminEvent(request_id, path, action)) def AddMonitor(self, path, obj): '''add a monitor to path, installing a callback to obj.HandleEvent''' @@ -114,26 +134,49 @@ class GaminFam(object): self.counter += 1 mode = stat(path)[ST_MODE] if S_ISDIR(mode): - self.mon.watch_directory(path, self.dispatch, handle) + self.mon.watch_directory(path, self.queue, handle) #print "adding callback for directory %s to %s, handle :%s:" % ( path, obj, handle.requestID()) else: - self.mon.watch_file(path, self.dispatch, handle) + self.mon.watch_file(path, self.queue, handle) self.handles[handle] = obj return handle - def HandleEvent(self): - '''Call Gamin, which will call the callback''' - self.mon.handle_one_event() - def Service(self): - '''Handle any pending Gamin work''' + '''Handle all gamin work''' count = 0 - t1 = time() - while self.mon.event_pending(): - count += 1 - self.mon.handle_one_event() - t2 = time() - syslog(LOG_INFO, "Processed %s gamin events in %s seconds" % (count, (t2 - t1))) + collapsed = 0 + start = time() + now = time() + while (time() - now) < 0.10: + if self.mon.event_pending(): + while self.mon.event_pending(): + count += 1 + self.mon.handle_one_event() + now = time() + unique = [] + bookkeeping = [] + for event in self.events: + if event.code2str() != 'changed': + # process all non-change events + unique.append(event) + else: + if (event.filename, event.requestID) not in bookkeeping: + bookkeeping.append((event.filename, event.requestID)) + unique.append(event) + else: + collapsed += 1 + self.events = [] + for event in unique: + if self.handles.has_key(event.requestID): + try: + self.handles[event.requestID].HandleEvent(event) + except: + log_failure("handling of gamin event for %s" % (event.filename)) + else: + syslog(LOG_INFO, "Got event for unexpected id %s, file %s" % (event.requestID, event.filename)) + end = time() + syslog(LOG_INFO, "Processed %s gamin events in %03.03f seconds. %s collapsed" % + (count, (end - start), collapsed)) try: from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted diff --git a/src/lib/Server/Plugin.py b/src/lib/Server/Plugin.py index d7c72a357..72052e7ac 100644 --- a/src/lib/Server/Plugin.py +++ b/src/lib/Server/Plugin.py @@ -77,17 +77,12 @@ class FileBacked(object): except OSError: syslog(LOG_ERR, "Failed to stat file %s" % (self.name)) - if self.mtime > oldmtime: - try: - # if self.readonce == 0: - # self.readonce = 1 - # else: - # syslog(LOG_INFO, "Updated file %s" % (self.name)) - self.data = file(self.name).read() - except IOError: - syslog(LOG_ERR, "Failed to read file %s" % (self.name)) + try: + self.data = file(self.name).read() self.Index() - + except IOError: + syslog(LOG_ERR, "Failed to read file %s" % (self.name)) + def Index(self): '''Update local data structures based on current file state''' pass diff --git a/src/sbin/Bcfg2debug b/src/sbin/Bcfg2debug index 522baff0a..95508a5b4 100644 --- a/src/sbin/Bcfg2debug +++ b/src/sbin/Bcfg2debug @@ -64,10 +64,7 @@ if __name__ == '__main__': for instance in generator.__provides__[key].keys(): print " ", key, instance elif cmd[0] == 'update': - while core.fam.fm.pending(): - while core.fam.fm.pending(): - core.fam.HandleEvent() - sleep(0.5) + core.fam.Service() else: print "Unknown command %s" % cmd[0] cmd = get_input() -- cgit v1.2.3-1-g7c22