From 00731b49c42442b4d15e375c874491cf4780eead Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Tue, 8 Jul 2008 09:50:37 +0000 Subject: Implement parallel build support by adding new --jobs and --load-average options that are analogous to the corresponding `make` options. Input and output handling still need work to make it look better and act more friendly for things like interactive ebuilds that require input. svn path=/main/trunk/; revision=10983 --- pym/_emerge/__init__.py | 256 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 238 insertions(+), 18 deletions(-) (limited to 'pym/_emerge/__init__.py') diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 0071fd25b..8d7a925d7 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -855,6 +855,28 @@ class SlotObject(object): myvalue = kwargs.get(myattr, None) setattr(self, myattr, myvalue) + def copy(self): + """ + Create a new instance and copy all attributes + defined from __slots__ (including those from + inherited classes). + """ + obj = self.__class__() + + classes = [self.__class__] + while classes: + c = classes.pop() + if c is SlotObject: + continue + classes.extend(c.__bases__) + slots = getattr(c, "__slots__", None) + if not slots: + continue + for myattr in slots: + setattr(obj, myattr, getattr(self, myattr)) + + return obj + class AbstractDepPriority(SlotObject): __slots__ = ("buildtime", "runtime", "runtime_post") @@ -7539,6 +7561,9 @@ class Scheduler(object): "--fetchonly", "--fetch-all-uri", "--nodeps", "--pretend"]) + _opts_no_restart = frozenset(["--buildpkgonly", + "--fetchonly", "--fetch-all-uri", "--pretend"]) + _bad_resume_opts = set(["--ask", "--changelog", "--resume", "--skipfirst"]) @@ -7591,9 +7616,11 @@ class Scheduler(object): if settings.get("PORTAGE_DEBUG", "") == "1": self.edebug = 1 self.pkgsettings = {} + self._config_pool = {} for root in trees: self.pkgsettings[root] = portage.config( clone=trees[root]["vartree"].settings) + self._config_pool[root] = [] self.curval = 0 self._logger = self._emerge_log_class( xterm_titles=("notitles" not in settings.features)) @@ -7616,6 +7643,7 @@ class Scheduler(object): self._add_task = self._task_queues.prefetch.add self._prefetchers = weakref.WeakValueDictionary() self._pkg_queue = deque() + self._completed_tasks = set() self._failed_pkgs = [] self._failed_fetches = [] self._parallel_fetch = False @@ -7623,7 +7651,14 @@ class Scheduler(object): if isinstance(x, Package) and x.operation == "merge"]) self._pkg_count = self._pkg_count_class( curval=0, maxval=merge_count) - self._max_jobs = 1 + + max_jobs = myopts.get("--jobs") + if max_jobs is None: + max_jobs = 1 + self._set_max_jobs(max_jobs) + + self._max_load = myopts.get("--load-average") + self._set_digraph(digraph) self._jobs = 0 @@ -7650,12 +7685,38 @@ class Scheduler(object): except EnvironmentError: pass + def _set_max_jobs(self, max_jobs): + self._max_jobs = max_jobs + self._task_queues.build.max_jobs = max_jobs + def _set_digraph(self, digraph): if self._max_jobs < 2: # save some memory self._digraph = None - else: - self._digraph = digraph + return + + self._digraph = digraph + self._prune_digraph() + + def _prune_digraph(self): + """ + Prune any root nodes that are irrelevant. + """ + + graph = self._digraph + completed_tasks = self._completed_tasks + removed_nodes = set() + while True: + for node in graph.root_nodes(): + if not isinstance(node, Package) or \ + node.installed or node.onlydeps or \ + node in completed_tasks: + removed_nodes.add(node) + if removed_nodes: + graph.difference_update(removed_nodes) + if not removed_nodes: + break + removed_nodes.clear() class _pkg_failure(portage.exception.PortageException): """ @@ -7797,6 +7858,39 @@ class Scheduler(object): sys.stderr.write("\n") + def _is_restart_scheduled(self): + """ + Check if the merge list contains a replacement + for the current running instance, that will result + in restart after merge. + @rtype: bool + @returns: True if a restart is scheduled, False otherwise. + """ + if self._opts_no_restart.intersection(self.myopts): + return False + + mergelist = self._mergelist + + for i, pkg in enumerate(mergelist): + if self._is_restart_necessary(pkg) and \ + i != len(mergelist) - 1: + return True + + return False + + def _is_restart_necessary(self, pkg): + """ + @return: True if merging the given package + requires restart, False otherwise. + """ + + # Figure out if we need a restart. + if pkg.root == self._running_root.root and \ + portage.match_from_list( + portage.const.PORTAGE_PACKAGE_ATOM, [pkg]): + return True + return False + def _restart_if_necessary(self, pkg): """ Use execv() to restart emerge. This happens @@ -7804,15 +7898,10 @@ class Scheduler(object): remaining packages in the list. """ - if "--pretend" in self.myopts or \ - "--fetchonly" in self.myopts or \ - "--fetch-all-uri" in self.myopts: + if self._opts_no_restart.intersection(self.myopts): return - # Figure out if we need a restart. - if pkg.root != self._running_root.root or \ - not portage.match_from_list( - portage.const.PORTAGE_PACKAGE_ATOM, [pkg]): + if not self._is_restart_necessary(pkg): return if self._pkg_count.curval >= self._pkg_count.maxval: @@ -7844,7 +7933,7 @@ class Scheduler(object): if myarg is True: mynewargv.append(myopt) else: - mynewargv.append(myopt +"="+ myarg) + mynewargv.append(myopt +"="+ str(myarg)) # priority only needs to be adjusted on the first run os.environ["PORTAGE_NICENESS"] = "0" os.execv(mynewargv[0], mynewargv) @@ -7932,12 +8021,14 @@ class Scheduler(object): pass def _merge_exit(self, merge): - self._jobs -= 1 + self._job_exit(merge.merge) pkg = merge.merge.pkg if merge.returncode != os.EX_OK: self._failed_pkgs.append((pkg, retval)) return + self._completed_tasks.add(pkg) + if pkg.installed: return @@ -7961,11 +8052,15 @@ class Scheduler(object): self._task_queues.merge.schedule() else: self._failed_pkgs.append((build.pkg, build.returncode)) - self._jobs -= 1 + self._job_exit(build) def _extract_exit(self, build): self._build_exit(build) + def _job_exit(self, job): + self._jobs -= 1 + self._deallocate_config(job.settings) + def _merge(self): self._add_prefetchers() @@ -7979,8 +8074,8 @@ class Scheduler(object): finally: # discard remaining packages if necessary pkg_queue.clear() - - # clean up child process if necessary + self._completed_tasks.clear() + self._digraph = None self._task_queues.prefetch.clear() # discard any failures and return the @@ -7993,14 +8088,75 @@ class Scheduler(object): return rval def _choose_pkg(self): - return self._pkg_queue.popleft() + if self._max_jobs < 2: + return self._pkg_queue.popleft() + + self._prune_digraph() + + chosen_pkg = None + for pkg in self._pkg_queue: + if pkg.operation == "uninstall": + continue + if not self._dependent_on_scheduled_merges(pkg): + chosen_pkg = pkg + break + + self._pkg_queue.remove(chosen_pkg) + return chosen_pkg + + def _dependent_on_scheduled_merges(self, pkg): + """ + Traverse the subgraph of the given packages deep dependencies + to see if it contains any scheduled merges. + @rtype: bool + @returns: True if the package is dependent, False otherwise. + """ + + graph = self._digraph + completed_tasks = self._completed_tasks + + dependent = False + traversed_nodes = set() + node_stack = graph.child_nodes(pkg) + while node_stack: + node = node_stack.pop() + if node in traversed_nodes: + continue + traversed_nodes.add(node) + if not node.installed and \ + node not in completed_tasks: + dependent = True + break + node_stack.extend(graph.child_nodes(node)) + + return dependent + + def _allocate_config(self, root): + """ + Allocate a unique config instance for a task in order + to prevent interference between parallel tasks. + """ + if self._config_pool[root]: + temp_settings = self._config_pool[root].pop() + else: + temp_settings = portage.config(clone=self.pkgsettings[root]) + return temp_settings + + def _deallocate_config(self, settings): + self._config_pool[settings["ROOT"]].append(settings) def _main_loop(self): + # Only allow 1 job max if a restart is scheduled + # due to portage update. + if self._is_restart_scheduled(): + self._set_max_jobs(1) + pkg_queue = self._pkg_queue failed_pkgs = self._failed_pkgs task_queues = self._task_queues max_jobs = self._max_jobs + max_load = self._max_load background = max_jobs > 1 while pkg_queue and not failed_pkgs: @@ -8009,8 +8165,26 @@ class Scheduler(object): self._schedule_main() continue + if max_load is not None and max_jobs > 1 and self._jobs > 1: + try: + avg1, avg5, avg15 = os.getloadavg() + except OSError, e: + writemsg("!!! getloadavg() failed: %s\n" % (e,), + noiselevel=-1) + del e + self._schedule_main() + continue + + if avg1 >= max_load: + self._schedule_main() + continue + pkg = self._choose_pkg() + if pkg is None: + self._schedule_main() + continue + if not pkg.installed: self._pkg_count.curval += 1 @@ -8065,10 +8239,10 @@ class Scheduler(object): emerge_opts=self.myopts, failed_fetches=self._failed_fetches, find_blockers=self._find_blockers(pkg), logger=self._logger, - mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count, + mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count.copy(), prefetcher=self._prefetchers.get(pkg), scheduler=self._sched_iface, - settings=self.pkgsettings[pkg.root], + settings=self._allocate_config(pkg.root), world_atom=self._world_atom) return task @@ -10752,6 +10926,24 @@ def parse_opts(tmpcmdline, silent=False): "type":"choice", "choices":("y", "n") }, + + "--jobs": { + + "help" : "Specifies the number of packages to build " + \ + "simultaneously.", + + "action" : "store" + }, + + "--load-average": { + + "help" :"Specifies that no new builds should be started " + \ + "if there are other builds running and the load average " + \ + "is at least LOAD (a floating-point number).", + + "action" : "store" + }, + "--with-bdeps": { "help":"include unnecessary build time dependencies", "type":"choice", @@ -10788,6 +10980,34 @@ def parse_opts(tmpcmdline, silent=False): myoptions, myargs = parser.parse_args(args=tmpcmdline) + if myoptions.jobs: + try: + jobs = int(myoptions.jobs) + except ValueError: + jobs = 0 + + if jobs < 1: + jobs = None + if not silent: + writemsg("!!! Invalid --jobs parameter: '%s'\n" % \ + (myoptions.jobs,), noiselevel=-1) + + myoptions.jobs = jobs + + if myoptions.load_average: + try: + load_average = float(myoptions.load_average) + except ValueError: + load_average = 0.0 + + if load_average <= 0.0: + load_average = None + if not silent: + writemsg("!!! Invalid --load-average parameter: '%s'\n" % \ + (myoptions.load_average,), noiselevel=-1) + + myoptions.load_average = load_average + for myopt in options: v = getattr(myoptions, myopt.lstrip("--").replace("-", "_")) if v: -- cgit v1.2.3-1-g7c22