summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2008-07-05 12:21:43 +0000
committerZac Medico <zmedico@gentoo.org>2008-07-05 12:21:43 +0000
commitd85c316ae2277076845c589b9a84656cfa967c98 (patch)
treef570d38b1bb063ede8753426f797c1b71c213954
parent6cc47b222385dc6cfce066f9c94c4fd6ecbe1c65 (diff)
downloadportage-d85c316ae2277076845c589b9a84656cfa967c98.tar.gz
portage-d85c316ae2277076845c589b9a84656cfa967c98.tar.bz2
portage-d85c316ae2277076845c589b9a84656cfa967c98.zip
Add a new CompositeTask class which can be used to combine separate
AsynchronousTask instances into a single instance. The CompositeTask instance used task exit listeners as a means to (asynchronously) trigger progression from one subtask to the next. This technique is used to group together all the ebuild phases executed by EbuildExecuter, and should be useful for grouping many more sets of tasks into similar composite tasks. svn path=/main/trunk/; revision=10940
-rw-r--r--pym/_emerge/__init__.py101
1 files changed, 90 insertions, 11 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 71df47dd5..44f7e53e0 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1455,7 +1455,7 @@ class EbuildFetchPretend(SlotObject):
return retval
class AsynchronousTask(SlotObject):
- __slots__ = ("cancelled", "returncode")
+ __slots__ = ("cancelled", "returncode") + ("_exit_listeners",)
def start(self):
"""
@@ -1470,11 +1470,91 @@ class AsynchronousTask(SlotObject):
return self.returncode
def wait(self):
+ self._wait_hook()
return self.returncode
def cancel(self):
pass
+ def addExitListener(self, f):
+ """
+ The function will be called with one argument, a reference to self.
+ """
+ if self._exit_listeners is None:
+ self._exit_listeners = []
+ self._exit_listeners.append(f)
+
+ def removeExitListener(self, f):
+ self._exit_listeners.remove(f)
+
+ def _wait_hook(self):
+ """
+ Call this method before returning from wait. This hook is
+ used to trigger exit listeners when the returncode first
+ becomes available.
+ """
+ if self._exit_listeners is not None:
+ for f in self._exit_listeners:
+ f(self)
+ self._exit_listeners = None
+
+class CompositeTask(AsynchronousTask):
+ """
+ A collection of tasks that executes sequentially. Each task
+ must have a _set_returncode() method that can be wrapped as
+ a means to trigger movement from one task to the next.
+ """
+
+ __slots__ = ("scheduler",) + ("_current_task", "_task_queue")
+
+ def __init__(self, **kwargs):
+ AsynchronousTask.__init__(self, **kwargs)
+ self._task_queue = deque()
+
+ def add(self, task):
+ self._task_queue.append(task)
+
+ def start(self):
+ self._start_next_task()
+
+ def isAlive(self):
+ return self._current_task is not None
+
+ def cancel(self):
+ self._task_queue.clear()
+ self.cancelled = True
+ self._current_task.cancel()
+
+ def wait(self):
+
+ while True:
+ task = self._current_task
+ if task is None:
+ break
+ self.scheduler.schedule(task.reg_id)
+ task.wait()
+
+ self._wait_hook()
+ return self.returncode
+
+ def _start_next_task(self):
+ self._current_task = self._task_queue.popleft()
+ task = self._current_task
+ task.addExitListener(self._task_exit_handler)
+ task.start()
+
+ def _task_exit_handler(self, task):
+ if task is not self._current_task:
+ raise AssertionError("Unrecognized task: %s" % (task,))
+
+ if self._task_queue and \
+ task.returncode == os.EX_OK:
+ self._start_next_task()
+ return
+
+ self._current_task = None
+ self.returncode = task.returncode
+
class SubProcess(AsynchronousTask):
__slots__ = ("pid",)
@@ -1503,6 +1583,7 @@ class SubProcess(AsynchronousTask):
if self.returncode is not None:
return self.returncode
self._set_returncode(os.waitpid(self.pid, 0))
+ self._wait_hook()
return self.returncode
def _set_returncode(self, wait_retval):
@@ -1899,19 +1980,17 @@ class EbuildExecuter(SlotObject):
2 : sys.stderr.fileno(),
}
- for mydo in self._phases:
- ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
- pkg=self.pkg, phase=mydo, scheduler=self.scheduler,
- settings=settings, tree=tree)
+ composite_task = CompositeTask(scheduler=scheduler)
- ebuild_phase.start()
- self.scheduler.schedule(ebuild_phase.reg_id)
- retval = ebuild_phase.wait()
+ for phase in self._phases:
+ composite_task.add(EbuildPhase(fd_pipes=fd_pipes,
+ pkg=self.pkg, phase=phase, scheduler=self.scheduler,
+ settings=settings, tree=tree))
- if retval != os.EX_OK:
- return retval
+ composite_task.start()
+ retval = composite_task.wait()
- return os.EX_OK
+ return retval
class EbuildPhase(SubProcess):