From 4456682c4c15a288c5a3cb6ee264500eb0bb196c Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Sun, 29 Jun 2008 17:27:37 +0000 Subject: Centralize select.poll() event handling in MergeTask._schedule(). This will allow the parent process to handle output of multiple child processes running in parllel. svn path=/main/trunk/; revision=10851 --- pym/_emerge/__init__.py | 86 +++++++++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 84b5769a3..8115bdf52 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -20,6 +20,8 @@ try: except KeyboardInterrupt: sys.exit(1) +import array +import select import gc import os, stat import platform @@ -1527,7 +1529,7 @@ class EbuildBuild(Task): """ TODO: Support asynchronous execution, to implement parallel builds. """ - __slots__ = ("pkg", "settings") + __slots__ = ("pkg", "register", "schedule", "settings", "unregister") _phases = ("setup", "unpack", "compile", "test", "install") @@ -1562,9 +1564,10 @@ class EbuildBuild(Task): for mydo in self._phases: ebuild_phase = EbuildPhase(fd_pipes=fd_pipes, - pkg=self.pkg, phase=mydo, settings=settings) + pkg=self.pkg, phase=mydo, register=self.register, + settings=settings, unregister=self.unregister) ebuild_phase.start() - ebuild_phase._output_handler() + self.schedule() retval = ebuild_phase.wait() portage._post_phase_userpriv_perms(settings) @@ -1580,7 +1583,8 @@ class EbuildBuild(Task): class EbuildPhase(SlotObject): - __slots__ = ("fd_pipes", "phase", "pkg", "settings", + __slots__ = ("fd_pipes", "phase", "pkg", + "register", "settings", "unregister", "pid", "returncode", "files") _file_names = ("log", "stdout", "ebuild") @@ -1662,40 +1666,25 @@ class EbuildPhase(SlotObject): files["log"] = open(logfile, 'a') files["stdout"] = os.fdopen(os.dup(fd_pipes_orig[1]), 'w') files["ebuild"] = os.fdopen(master_fd, 'r') + self.register(files["ebuild"].fileno(), + select.POLLIN, self._output_handler) - def _output_handler(self): - log_file = self.files.get("log") - if log_file is None: - return - ebuild_file = self.files["ebuild"] - stdout_file = self.files["stdout"] - iwtd = [ebuild_file] - owtd = [] - ewtd = [] - import array, select - buffsize = self._bufsize - eof = False - while not eof: - events = select.select(iwtd, owtd, ewtd) - for f in events[0]: - # Use non-blocking mode to prevent read - # calls from blocking indefinitely. - buf = array.array('B') - try: - buf.fromfile(f, buffsize) - except EOFError: - pass - if not buf: - eof = True - break - if f is ebuild_file: - buf.tofile(stdout_file) - stdout_file.flush() - buf.tofile(log_file) - log_file.flush() - log_file.close() - stdout_file.close() - ebuild_file.close() + def _output_handler(self, fd, event): + files = self.files + buf = array.array('B') + try: + buf.fromfile(files["ebuild"], self._bufsize) + except EOFError: + pass + if buf: + buf.tofile(files["stdout"]) + files["stdout"].flush() + buf.tofile(files["log"]) + files["log"].flush() + else: + self.unregister(files["ebuild"].fileno()) + for f in files.values(): + f.close() def wait(self): pid = self.pid @@ -6357,6 +6346,8 @@ class MergeTask(object): clone=trees[root]["vartree"].settings) self.curval = 0 self._spawned_pids = [] + self._poll_event_handlers = {} + self._poll = select.poll() class _pkg_failure(portage.exception.PortageException): """ @@ -6521,6 +6512,19 @@ class MergeTask(object): pass spawned_pids.remove(pid) + def _register(self, f, eventmask, handler): + self._poll_event_handlers[f] = handler + self._poll.register(f, eventmask) + + def _unregister(self, f): + self._poll.unregister(f) + del self._poll_event_handlers[f] + + def _schedule(self): + while self._poll_event_handlers: + for f, event in self._poll.poll(): + self._poll_event_handlers[f](f, event) + def _merge(self): mylist = self._mergelist favorites = self._favorites @@ -6759,7 +6763,9 @@ class MergeTask(object): (mergecount, len(mymergelist), pkg_key) emergelog(xterm_titles, msg, short_msg=short_msg) - build = EbuildBuild(pkg=pkg, settings=pkgsettings) + build = EbuildBuild(pkg=pkg, register=self._register, + schedule=self._schedule, settings=pkgsettings, + unregister=self._unregister) retval = build.execute() if retval != os.EX_OK: raise self._pkg_failure(retval) @@ -6794,7 +6800,9 @@ class MergeTask(object): (mergecount, len(mymergelist), pkg_key) emergelog(xterm_titles, msg, short_msg=short_msg) - build = EbuildBuild(pkg=pkg, settings=pkgsettings) + build = EbuildBuild(pkg=pkg, register=self._register, + schedule=self._schedule, settings=pkgsettings, + unregister=self._unregister) retval = build.execute() if retval != os.EX_OK: raise self._pkg_failure(retval) -- cgit v1.2.3-1-g7c22