diff options
-rw-r--r-- | pym/_emerge/__init__.py | 101 |
1 files changed, 90 insertions, 11 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 71df47dd5..44f7e53e0 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1455,7 +1455,7 @@ class EbuildFetchPretend(SlotObject): return retval class AsynchronousTask(SlotObject): - __slots__ = ("cancelled", "returncode") + __slots__ = ("cancelled", "returncode") + ("_exit_listeners",) def start(self): """ @@ -1470,11 +1470,91 @@ class AsynchronousTask(SlotObject): return self.returncode def wait(self): + self._wait_hook() return self.returncode def cancel(self): pass + def addExitListener(self, f): + """ + The function will be called with one argument, a reference to self. + """ + if self._exit_listeners is None: + self._exit_listeners = [] + self._exit_listeners.append(f) + + def removeExitListener(self, f): + self._exit_listeners.remove(f) + + def _wait_hook(self): + """ + Call this method before returning from wait. This hook is + used to trigger exit listeners when the returncode first + becomes available. + """ + if self._exit_listeners is not None: + for f in self._exit_listeners: + f(self) + self._exit_listeners = None + +class CompositeTask(AsynchronousTask): + """ + A collection of tasks that executes sequentially. Each task + must have a _set_returncode() method that can be wrapped as + a means to trigger movement from one task to the next. + """ + + __slots__ = ("scheduler",) + ("_current_task", "_task_queue") + + def __init__(self, **kwargs): + AsynchronousTask.__init__(self, **kwargs) + self._task_queue = deque() + + def add(self, task): + self._task_queue.append(task) + + def start(self): + self._start_next_task() + + def isAlive(self): + return self._current_task is not None + + def cancel(self): + self._task_queue.clear() + self.cancelled = True + self._current_task.cancel() + + def wait(self): + + while True: + task = self._current_task + if task is None: + break + self.scheduler.schedule(task.reg_id) + task.wait() + + self._wait_hook() + return self.returncode + + def _start_next_task(self): + self._current_task = self._task_queue.popleft() + task = self._current_task + task.addExitListener(self._task_exit_handler) + task.start() + + def _task_exit_handler(self, task): + if task is not self._current_task: + raise AssertionError("Unrecognized task: %s" % (task,)) + + if self._task_queue and \ + task.returncode == os.EX_OK: + self._start_next_task() + return + + self._current_task = None + self.returncode = task.returncode + class SubProcess(AsynchronousTask): __slots__ = ("pid",) @@ -1503,6 +1583,7 @@ class SubProcess(AsynchronousTask): if self.returncode is not None: return self.returncode self._set_returncode(os.waitpid(self.pid, 0)) + self._wait_hook() return self.returncode def _set_returncode(self, wait_retval): @@ -1899,19 +1980,17 @@ class EbuildExecuter(SlotObject): 2 : sys.stderr.fileno(), } - for mydo in self._phases: - ebuild_phase = EbuildPhase(fd_pipes=fd_pipes, - pkg=self.pkg, phase=mydo, scheduler=self.scheduler, - settings=settings, tree=tree) + composite_task = CompositeTask(scheduler=scheduler) - ebuild_phase.start() - self.scheduler.schedule(ebuild_phase.reg_id) - retval = ebuild_phase.wait() + for phase in self._phases: + composite_task.add(EbuildPhase(fd_pipes=fd_pipes, + pkg=self.pkg, phase=phase, scheduler=self.scheduler, + settings=settings, tree=tree)) - if retval != os.EX_OK: - return retval + composite_task.start() + retval = composite_task.wait() - return os.EX_OK + return retval class EbuildPhase(SubProcess): |