From d0c3ee0243e5df257a33bcac53628184f8bed892 Mon Sep 17 00:00:00 2001
From: Zac Medico
Date: Mon, 7 Jul 2008 05:35:17 +0000
Subject: * Implement CompositeTask._poll().

* Make AsynchronousTask classes call self.wait() to notify exit listeners.
* Rewrite Scheduler._main_loop() to bring it closer to allowing parallel
  build scheduling.

svn path=/main/trunk/; revision=10966
---
 pym/_emerge/__init__.py | 187 +++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 147 insertions(+), 40 deletions(-)

diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 93684885e..0037f988d 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1459,6 +1459,9 @@ class AsynchronousTask(SlotObject):
 	Subclasses override _wait() and _poll() so that calls
 	to public methods can be wrapped for implementing hooks such
 	as exit listener notification.
+
+	Subclasses should call self.wait() to notify exit listeners after
+	the task is complete and self.returncode has been set.
 	"""
 
 	__slots__ = ("cancelled", "returncode") + ("_exit_listeners",)
@@ -1528,6 +1531,29 @@ class CompositeTask(AsynchronousTask):
 		if self._current_task is not None:
 			self._current_task.cancel()
 
+	def _poll(self):
+		"""
+		This does a loop calling self._current_task.poll()
+		repeatedly as long as the value of self._current_task
+		keeps changing. It calls poll() a maximum of one time
+		for a given self._current_task instance. This is useful
+		since calling poll() on a task can trigger advance to
+		the next task, which could eventually lead to the returncode
+		being set in cases when polling only a single task would
+		not have the same effect.
+		"""
+
+		prev = None
+		while True:
+			task = self._current_task
+			if task is None or task is prev:
+				# don't poll the same task more than once
+				break
+			task.poll()
+			prev = task
+
+		return self.returncode
+
 	def _wait(self):
 
 		while True:
@@ -1626,6 +1652,7 @@ class TaskSequence(CompositeTask):
 			self._start_next_task()
 		else:
 			self._final_exit(task)
+			self.wait()
 
 class SubProcess(AsynchronousTask):
 	__slots__ = ("pid",)
@@ -1997,6 +2024,7 @@ class EbuildBuild(CompositeTask):
 			pkg = self.pkg
 			eerror("!!! Fetch for %s failed, continuing..." % pkg.cpv,
 				phase="unpack", key=pkg.cpv)
+		self.wait()
 
 	def _unlock_builddir(self):
 		portage.elog.elog_process(self.pkg.cpv, self.settings)
@@ -2012,6 +2040,7 @@
 
 		if not buildpkg:
 			self._final_exit(build)
+			self.wait()
 			return
 
 		packager = EbuildBinpkg(pkg=self.pkg,
@@ -2039,11 +2068,13 @@
 		if self._final_exit(packager) != os.EX_OK or \
 			self.opts.buildpkgonly:
 			self._unlock_builddir()
+		self.wait()
 
 	def _clean_exit(self, clean_phase):
 		if self._final_exit(clean_phase) != os.EX_OK or \
 			self.opts.buildpkgonly:
 			self._unlock_builddir()
+		self.wait()
 
 	def install(self):
 		"""
@@ -2480,6 +2511,7 @@ class Binpkg(CompositeTask):
 			self._fetched_pkg = True
 			if self.opts.fetchonly:
 				self._final_exit(fetcher)
+				self.wait()
 				return
 		elif self._default_exit(fetcher) != os.EX_OK:
 			return
@@ -2610,6 +2642,7 @@
 			self._unlock_builddir()
 			writemsg("!!! Error Extracting '%s'\n" % self._pkg_path,
 				noiselevel=-1)
+		self.wait()
 
 	def _unlock_builddir(self):
 		portage.elog.elog_process(self.pkg.cpv, self.settings)
@@ -2840,6 +2873,7 @@ class MergeListItem(CompositeTask):
 
 			self._install_task = build
 			self._start_task(build, self._ebuild_exit)
+			self.wait()
 			return
 
 		elif pkg.type_name == "binary":
@@ -2852,12 +2886,14 @@
 
 			self._install_task = binpkg
 			self._start_task(binpkg, self._final_exit)
+			self.wait()
 			return
 
 	def _ebuild_exit(self, build):
 		if self._final_exit(build) != os.EX_OK:
 			if self.build_opts.fetchonly:
 				self.failed_fetches.append(self.pkg.cpv)
+		self.wait()
 
 	def _poll(self):
 		self._install_task.poll()
@@ -7438,13 +7474,19 @@ class SequentialTaskQueue(SlotObject):
 		self._task_queue.append(task)
 
 	def schedule(self):
+
+		if not self:
+			return False
+
 		task_queue = self._task_queue
 		running_tasks = self.running_tasks
 		max_jobs = self.max_jobs
 		state_changed = False
 
 		for task in list(running_tasks):
-			if not task.registered and task.poll() is not None:
+			if hasattr(task, "registered") and task.registered:
+				continue
+			if task.poll() is not None:
 				running_tasks.remove(task)
 				state_changed = True
 
@@ -7465,6 +7507,12 @@
 			task = running_tasks.pop()
 			task.cancel()
 
+	def __nonzero__(self):
+		return bool(self._task_queue or self.running_tasks)
+
+	def __len__(self):
+		return len(self._task_queue) + len(self.running_tasks)
+
 class Scheduler(object):
 
 	_opts_ignore_blockers = \
@@ -7480,6 +7528,9 @@ class Scheduler(object):
 	class _iface_class(SlotObject):
 		__slots__ = ("register", "schedule")
 
+	_task_queues_class = slot_dict_class(
+		("build", "extract", "merge", "prefetch",), prefix="")
+
 	class _build_opts_class(SlotObject):
 		__slots__ = ("buildpkg", "buildpkgonly",
 			"fetch_all_uri", "fetchonly", "pretend")
@@ -7539,12 +7590,11 @@ class Scheduler(object):
 		except AttributeError:
 			self._poll = PollSelectAdapter()
 
-		self._task_queues = slot_dict_class(("build", "prefetch"), prefix="")
+		self._task_queues = self._task_queues_class()
 		for k in self._task_queues.allowed_keys:
 			setattr(self._task_queues, k, SequentialTaskQueue())
 
 		self._add_task = self._task_queues.prefetch.add
-		self._schedule_tasks = self._task_queues.prefetch.schedule
 		self._prefetchers = weakref.WeakValueDictionary()
 		self._pkg_queue = deque()
 		self._failed_pkgs = []
@@ -7554,6 +7604,8 @@
 			if isinstance(x, Package) and x.operation == "merge"])
 		self._pkg_count = self._pkg_count_class(
 			curval=0, maxval=merge_count)
+		self._max_jobs = 1
+		self._jobs = 0
 
 		features = self.settings.features
 		if "parallel-fetch" in features and \
@@ -7852,48 +7904,40 @@ class Scheduler(object):
 		elif isinstance(pkg, Blocker):
 			pass
 
-	def _choose_pkg(self):
-		return self._pkg_queue.popleft()
-
-	def _main_loop(self):
-
-		pkg_queue = self._pkg_queue
-
-		while pkg_queue:
-			pkg = self._choose_pkg()
-
-			if not pkg.installed:
-				self._pkg_count.curval += 1
-
-			task = self._task(pkg)
-			task.start()
-			retval = task.wait()
+	def _merge_exit(self, merge):
+		self._jobs -= 1
+		pkg = merge.merge.pkg
+		if merge.returncode != os.EX_OK:
+			self._failed_pkgs.append((pkg, merge.returncode))
+			return
 
-			if retval == os.EX_OK:
-				task = PackageMerge(merge=task)
-				task.start()
-				retval = task.wait()
+		if pkg.installed:
+			return
 
-			if retval == os.EX_OK:
-				self.curval += 1
-			else:
-				self._failed_pkgs.append((pkg, retval))
-				if not self._build_opts.fetchonly:
-					return
+		self._restart_if_necessary(pkg)
 
-			if pkg.installed:
-				continue
+		# Call mtimedb.commit() after each merge so that
+		# --resume still works after being interrupted
+		# by reboot, sigkill or similar.
+		mtimedb = self._mtimedb
+		del mtimedb["resume"]["mergelist"][0]
+		if not mtimedb["resume"]["mergelist"]:
+			del mtimedb["resume"]
+		mtimedb.commit()
 
-			self._restart_if_necessary(pkg)
+	def _build_exit(self, build):
+		if build.returncode == os.EX_OK:
+			self.curval += 1
+			merge = PackageMerge(merge=build)
+			merge.addExitListener(self._merge_exit)
+			self._task_queues.merge.add(merge)
+			self._task_queues.merge.schedule()
+		else:
+			self._failed_pkgs.append((build.pkg, build.returncode))
+			self._jobs -= 1
 
-			# Call mtimedb.commit() after each merge so that
-			# --resume still works after being interrupted
-			# by reboot, sigkill or similar.
-			mtimedb = self._mtimedb
-			del mtimedb["resume"]["mergelist"][0]
-			if not mtimedb["resume"]["mergelist"]:
-				del mtimedb["resume"]
-			mtimedb.commit()
+	def _extract_exit(self, build):
+		self._build_exit(build)
 
 	def _merge(self):
 
@@ -7921,6 +7965,65 @@ class Scheduler(object):
 
 		return rval
 
+	def _choose_pkg(self):
+		return self._pkg_queue.popleft()
+
+	def _main_loop(self):
+
+		pkg_queue = self._pkg_queue
+		failed_pkgs = self._failed_pkgs
+		task_queues = self._task_queues
+
+		while pkg_queue and not failed_pkgs:
+
+			pkg = self._choose_pkg()
+
+			if not pkg.installed:
+				self._pkg_count.curval += 1
+
+			task = self._task(pkg)
+
+			self._jobs += 1
+			if pkg.installed:
+				merge = PackageMerge(merge=task)
+				merge.addExitListener(self._merge_exit)
+				task_queues.merge.add(merge)
+			elif pkg.built:
+				task.addExitListener(self._extract_exit)
+				task_queues.extract.add(task)
+			else:
+				task.addExitListener(self._build_exit)
+				task_queues.build.add(task)
+
+			self._schedule_main()
+
+		while self._jobs:
+			self._schedule_main(wait=True)
+
+	def _schedule_main(self, wait=False):
+
+		event_handlers = self._poll_event_handlers
+		poll = self._poll.poll
+		max_jobs = self._max_jobs
+
+		self._schedule_tasks()
+
+		while event_handlers:
+			jobs = self._jobs
+
+			for f, event in poll():
+				handler, reg_id = event_handlers[f]
+				if not handler(f, event):
+					self._unregister(reg_id)
+
+			if jobs == self._jobs:
+				continue
+
+			self._schedule_tasks()
+
+			if not wait and self._jobs < max_jobs:
+				break
+
 	def _task(self, pkg):
 
 		task = MergeListItem(args_set=self._args_set,
@@ -8025,6 +8128,10 @@
 		del self._poll_event_handler_ids[reg_id]
 		self._schedule_tasks()
 
+	def _schedule_tasks(self):
+		for x in self._task_queues.values():
+			x.schedule()
+
 	def _schedule(self, wait_id):
 		"""
 		Schedule until wait_id is not longer registered
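
The patch above is easier to follow with a toy model of the pattern it standardizes. The sketch below is a simplified illustration written for this note, not portage code: the names AsynchronousTask and CompositeTask mirror the patch, but their bodies are reduced to the essentials, and EchoTask plus the _done listener are invented purely for the demonstration. It shows a task calling self.wait() after self.returncode has been set so exit listeners are notified, and a composite poll() loop that polls each distinct current subtask at most once, since polling can advance the chain to the next subtask.

# Simplified illustration only -- not portage code.

class AsynchronousTask(object):
	def __init__(self):
		self.returncode = None
		self._exit_listeners = []

	def addExitListener(self, f):
		self._exit_listeners.append(f)

	def start(self):
		pass

	def poll(self):
		return self.returncode

	def wait(self):
		# Called after self.returncode has been set, so that exit
		# listeners are notified exactly once.
		if self.returncode is not None and self._exit_listeners:
			listeners = self._exit_listeners
			self._exit_listeners = []
			for f in listeners:
				f(self)
		return self.returncode

class CompositeTask(AsynchronousTask):
	def __init__(self, subtasks):
		AsynchronousTask.__init__(self)
		self._subtasks = list(subtasks)
		self._current_task = None

	def start(self):
		self._start_next()

	def _start_next(self):
		if self._subtasks:
			task = self._subtasks.pop(0)
			self._current_task = task
			task.addExitListener(self._task_exit)
			task.start()
		else:
			self._current_task = None
			self.returncode = 0
			self.wait()

	def _task_exit(self, task):
		if task.returncode != 0:
			self.returncode = task.returncode
			self.wait()
		else:
			self._start_next()

	def poll(self):
		# Poll each distinct current task at most once, since a poll()
		# can trigger advance to the next task (cf. CompositeTask._poll).
		prev = None
		while True:
			task = self._current_task
			if task is None or task is prev:
				break
			task.poll()
			prev = task
		return self.returncode

class EchoTask(AsynchronousTask):
	# Toy leaf task that completes as soon as it is polled.
	def __init__(self, name):
		AsynchronousTask.__init__(self)
		self.name = name

	def poll(self):
		if self.returncode is None:
			print("%s finished" % self.name)
			self.returncode = 0
			self.wait()
		return self.returncode

def _done(task):
	print("composite exit, returncode=%s" % task.returncode)

seq = CompositeTask([EchoTask("fetch"), EchoTask("build"), EchoTask("merge")])
seq.addExitListener(_done)
seq.start()
seq.poll()  # one poll() call drives all three subtasks to completion

Running the demo prints one line per subtask followed by the composite's exit notification, which mirrors how the rewritten Scheduler._main_loop() drives work through exit listeners and queue scheduling rather than blocking wait() calls.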