summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pym/_emerge/__init__.py187
1 files changed, 147 insertions, 40 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 93684885e..0037f988d 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1459,6 +1459,9 @@ class AsynchronousTask(SlotObject):
Subclasses override _wait() and _poll() so that calls
to public methods can be wrapped for implementing
hooks such as exit listener notification.
+
+ Sublasses should call self.wait() to notify exit listeners after
+ the task is complete and self.returncode has been set.
"""
__slots__ = ("cancelled", "returncode") + ("_exit_listeners",)
@@ -1528,6 +1531,29 @@ class CompositeTask(AsynchronousTask):
if self._current_task is not None:
self._current_task.cancel()
+ def _poll(self):
+ """
+ This does a loop calling self._current_task.poll()
+ repeatedly as long as the value of self._current_task
+ keeps changing. It calls poll() a maximum of one time
+ for a given self._current_task instance. This is useful
+ since calling poll() on a task can trigger advance to
+ the next task could eventually lead to the returncode
+ being set in cases when polling only a single task would
+ not have the same effect.
+ """
+
+ prev = None
+ while True:
+ task = self._current_task
+ if task is None or task is prev:
+ # don't poll the same task more than once
+ break
+ task.poll()
+ prev = task
+
+ return self.returncode
+
def _wait(self):
while True:
@@ -1626,6 +1652,7 @@ class TaskSequence(CompositeTask):
self._start_next_task()
else:
self._final_exit(task)
+ self.wait()
class SubProcess(AsynchronousTask):
__slots__ = ("pid",)
@@ -1997,6 +2024,7 @@ class EbuildBuild(CompositeTask):
pkg = self.pkg
eerror("!!! Fetch for %s failed, continuing..." % pkg.cpv,
phase="unpack", key=pkg.cpv)
+ self.wait()
def _unlock_builddir(self):
portage.elog.elog_process(self.pkg.cpv, self.settings)
@@ -2012,6 +2040,7 @@ class EbuildBuild(CompositeTask):
if not buildpkg:
self._final_exit(build)
+ self.wait()
return
packager = EbuildBinpkg(pkg=self.pkg,
@@ -2039,11 +2068,13 @@ class EbuildBuild(CompositeTask):
if self._final_exit(packager) != os.EX_OK or \
self.opts.buildpkgonly:
self._unlock_builddir()
+ self.wait()
def _clean_exit(self, clean_phase):
if self._final_exit(clean_phase) != os.EX_OK or \
self.opts.buildpkgonly:
self._unlock_builddir()
+ self.wait()
def install(self):
"""
@@ -2480,6 +2511,7 @@ class Binpkg(CompositeTask):
self._fetched_pkg = True
if self.opts.fetchonly:
self._final_exit(fetcher)
+ self.wait()
return
elif self._default_exit(fetcher) != os.EX_OK:
return
@@ -2610,6 +2642,7 @@ class Binpkg(CompositeTask):
self._unlock_builddir()
writemsg("!!! Error Extracting '%s'\n" % self._pkg_path,
noiselevel=-1)
+ self.wait()
def _unlock_builddir(self):
portage.elog.elog_process(self.pkg.cpv, self.settings)
@@ -2840,6 +2873,7 @@ class MergeListItem(CompositeTask):
self._install_task = build
self._start_task(build, self._ebuild_exit)
+ self.wait()
return
elif pkg.type_name == "binary":
@@ -2852,12 +2886,14 @@ class MergeListItem(CompositeTask):
self._install_task = binpkg
self._start_task(binpkg, self._final_exit)
+ self.wait()
return
def _ebuild_exit(self, build):
if self._final_exit(build) != os.EX_OK:
if self.build_opts.fetchonly:
self.failed_fetches.append(self.pkg.cpv)
+ self.wait()
def _poll(self):
self._install_task.poll()
@@ -7438,13 +7474,19 @@ class SequentialTaskQueue(SlotObject):
self._task_queue.append(task)
def schedule(self):
+
+ if not self:
+ return False
+
task_queue = self._task_queue
running_tasks = self.running_tasks
max_jobs = self.max_jobs
state_changed = False
for task in list(running_tasks):
- if not task.registered and task.poll() is not None:
+ if hasattr(task, "registered") and task.registered:
+ continue
+ if task.poll() is not None:
running_tasks.remove(task)
state_changed = True
@@ -7465,6 +7507,12 @@ class SequentialTaskQueue(SlotObject):
task = running_tasks.pop()
task.cancel()
+ def __nonzero__(self):
+ return bool(self._task_queue or self.running_tasks)
+
+ def __len__(self):
+ return len(self._task_queue) + len(self.running_tasks)
+
class Scheduler(object):
_opts_ignore_blockers = \
@@ -7480,6 +7528,9 @@ class Scheduler(object):
class _iface_class(SlotObject):
__slots__ = ("register", "schedule")
+ _task_queues_class = slot_dict_class(
+ ("build", "extract", "merge", "prefetch",), prefix="")
+
class _build_opts_class(SlotObject):
__slots__ = ("buildpkg", "buildpkgonly",
"fetch_all_uri", "fetchonly", "pretend")
@@ -7539,12 +7590,11 @@ class Scheduler(object):
except AttributeError:
self._poll = PollSelectAdapter()
- self._task_queues = slot_dict_class(("build", "prefetch"), prefix="")
+ self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
setattr(self._task_queues, k, SequentialTaskQueue())
self._add_task = self._task_queues.prefetch.add
- self._schedule_tasks = self._task_queues.prefetch.schedule
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = deque()
self._failed_pkgs = []
@@ -7554,6 +7604,8 @@ class Scheduler(object):
if isinstance(x, Package) and x.operation == "merge"])
self._pkg_count = self._pkg_count_class(
curval=0, maxval=merge_count)
+ self._max_jobs = 1
+ self._jobs = 0
features = self.settings.features
if "parallel-fetch" in features and \
@@ -7852,48 +7904,40 @@ class Scheduler(object):
elif isinstance(pkg, Blocker):
pass
- def _choose_pkg(self):
- return self._pkg_queue.popleft()
-
- def _main_loop(self):
-
- pkg_queue = self._pkg_queue
-
- while pkg_queue:
- pkg = self._choose_pkg()
-
- if not pkg.installed:
- self._pkg_count.curval += 1
-
- task = self._task(pkg)
- task.start()
- retval = task.wait()
+ def _merge_exit(self, merge):
+ self._jobs -= 1
+ pkg = merge.merge.pkg
+ if merge.returncode != os.EX_OK:
+ self._failed_pkgs.append((pkg, retval))
+ return
- if retval == os.EX_OK:
- task = PackageMerge(merge=task)
- task.start()
- retval = task.wait()
+ if pkg.installed:
+ return
- if retval == os.EX_OK:
- self.curval += 1
- else:
- self._failed_pkgs.append((pkg, retval))
- if not self._build_opts.fetchonly:
- return
+ self._restart_if_necessary(pkg)
- if pkg.installed:
- continue
+ # Call mtimedb.commit() after each merge so that
+ # --resume still works after being interrupted
+ # by reboot, sigkill or similar.
+ mtimedb = self._mtimedb
+ del mtimedb["resume"]["mergelist"][0]
+ if not mtimedb["resume"]["mergelist"]:
+ del mtimedb["resume"]
+ mtimedb.commit()
- self._restart_if_necessary(pkg)
+ def _build_exit(self, build):
+ if build.returncode == os.EX_OK:
+ self.curval += 1
+ merge = PackageMerge(merge=build)
+ merge.addExitListener(self._merge_exit)
+ self._task_queues.merge.add(merge)
+ self._task_queues.merge.schedule()
+ else:
+ self._failed_pkgs.append((build.pkg, build.returncode))
+ self._jobs -= 1
- # Call mtimedb.commit() after each merge so that
- # --resume still works after being interrupted
- # by reboot, sigkill or similar.
- mtimedb = self._mtimedb
- del mtimedb["resume"]["mergelist"][0]
- if not mtimedb["resume"]["mergelist"]:
- del mtimedb["resume"]
- mtimedb.commit()
+ def _extract_exit(self, build):
+ self._build_exit(build)
def _merge(self):
@@ -7921,6 +7965,65 @@ class Scheduler(object):
return rval
+ def _choose_pkg(self):
+ return self._pkg_queue.popleft()
+
+ def _main_loop(self):
+
+ pkg_queue = self._pkg_queue
+ failed_pkgs = self._failed_pkgs
+ task_queues = self._task_queues
+
+ while pkg_queue and not failed_pkgs:
+
+ pkg = self._choose_pkg()
+
+ if not pkg.installed:
+ self._pkg_count.curval += 1
+
+ task = self._task(pkg)
+
+ self._jobs += 1
+ if pkg.installed:
+ merge = PackageMerge(merge=task)
+ merge.addExitListener(self._merge_exit)
+ task_queues.merge.add(merge)
+ elif pkg.built:
+ task.addExitListener(self._extract_exit)
+ task_queues.extract.add(task)
+ else:
+ task.addExitListener(self._build_exit)
+ task_queues.build.add(task)
+
+ self._schedule_main()
+
+ while self._jobs:
+ self._schedule_main(wait=True)
+
+ def _schedule_main(self, wait=False):
+
+ event_handlers = self._poll_event_handlers
+ poll = self._poll.poll
+ max_jobs = self._max_jobs
+
+ self._schedule_tasks()
+
+ while event_handlers:
+ jobs = self._jobs
+
+ for f, event in poll():
+ handler, reg_id = event_handlers[f]
+ if not handler(f, event):
+ self._unregister(reg_id)
+
+ if jobs == self._jobs:
+ continue
+
+ self._schedule_tasks()
+
+ if not wait and self._jobs < max_jobs:
+ break
+
def _task(self, pkg):
task = MergeListItem(args_set=self._args_set,
@@ -8025,6 +8128,10 @@ class Scheduler(object):
del self._poll_event_handler_ids[reg_id]
self._schedule_tasks()
+ def _schedule_tasks(self):
+ for x in self._task_queues.values():
+ x.schedule()
+
def _schedule(self, wait_id):
"""
Schedule until wait_id is not longer registered