summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Zac Medico <zmedico@gentoo.org>, 2008-07-08 09:50:37 +0000
committer: Zac Medico <zmedico@gentoo.org>, 2008-07-08 09:50:37 +0000
commit: 00731b49c42442b4d15e375c874491cf4780eead (patch)
tree: 13258cbbb407ed4a5a5c642f663ef82ae9996ed8
parent: 7e357605c8d9d44839be3fb755a6563eaf398088 (diff)
download: portage-00731b49c42442b4d15e375c874491cf4780eead.tar.gz
download: portage-00731b49c42442b4d15e375c874491cf4780eead.tar.bz2
download: portage-00731b49c42442b4d15e375c874491cf4780eead.zip
Implement parallel build support by adding new --jobs and --load-average
options that are analogous to the corresponding `make` options. Input and
output handling still needs work, both so that the output looks better and
so that emerge behaves more gracefully with things like interactive ebuilds
that require input.

svn path=/main/trunk/; revision=10983
-rw-r--r-- pym/_emerge/__init__.py 256
1 file changed, 238 insertions, 18 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 0071fd25b..8d7a925d7 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -855,6 +855,28 @@ class SlotObject(object):
myvalue = kwargs.get(myattr, None)
setattr(self, myattr, myvalue)
+    def copy(self):
+        """
+        Return a new instance of the same class whose slot
+        attributes mirror this one's. Every class reachable
+        through __bases__ contributes its __slots__; the walk
+        does not descend into SlotObject itself.
+        """
+        duplicate = self.__class__()
+
+        pending = [self.__class__]
+        while pending:
+            klass = pending.pop()
+            if klass is not SlotObject:
+                pending.extend(klass.__bases__)
+                # Classes without (or with empty) __slots__
+                # contribute nothing.
+                for slot_name in getattr(klass, "__slots__", None) or ():
+                    setattr(duplicate, slot_name,
+                        getattr(self, slot_name))
+
+        return duplicate
+
class AbstractDepPriority(SlotObject):
__slots__ = ("buildtime", "runtime", "runtime_post")
@@ -7539,6 +7561,9 @@ class Scheduler(object):
"--fetchonly", "--fetch-all-uri",
"--nodeps", "--pretend"])
+ # Options that rule out re-executing emerge after merging a
+ # replacement for the running instance: _is_restart_scheduled()
+ # reports False and _restart_if_necessary() returns early when
+ # any of these is present in myopts.
+ _opts_no_restart = frozenset(["--buildpkgonly",
+ "--fetchonly", "--fetch-all-uri", "--pretend"])
+
_bad_resume_opts = set(["--ask", "--changelog",
"--resume", "--skipfirst"])
@@ -7591,9 +7616,11 @@ class Scheduler(object):
if settings.get("PORTAGE_DEBUG", "") == "1":
self.edebug = 1
self.pkgsettings = {}
+ self._config_pool = {}
for root in trees:
self.pkgsettings[root] = portage.config(
clone=trees[root]["vartree"].settings)
+ self._config_pool[root] = []
self.curval = 0
self._logger = self._emerge_log_class(
xterm_titles=("notitles" not in settings.features))
@@ -7616,6 +7643,7 @@ class Scheduler(object):
self._add_task = self._task_queues.prefetch.add
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = deque()
+ self._completed_tasks = set()
self._failed_pkgs = []
self._failed_fetches = []
self._parallel_fetch = False
@@ -7623,7 +7651,14 @@ class Scheduler(object):
if isinstance(x, Package) and x.operation == "merge"])
self._pkg_count = self._pkg_count_class(
curval=0, maxval=merge_count)
- self._max_jobs = 1
+
+ max_jobs = myopts.get("--jobs")
+ if max_jobs is None:
+ max_jobs = 1
+ self._set_max_jobs(max_jobs)
+
+ self._max_load = myopts.get("--load-average")
+
self._set_digraph(digraph)
self._jobs = 0
@@ -7650,12 +7685,38 @@ class Scheduler(object):
except EnvironmentError:
pass
+    def _set_max_jobs(self, max_jobs):
+        """
+        Record the job limit on the scheduler and apply the
+        same limit to the build task queue.
+        """
+        self._max_jobs = max_jobs
+        build_queue = self._task_queues.build
+        build_queue.max_jobs = max_jobs
+
def _set_digraph(self, digraph):
+ """
+ Install the dependency graph used for scheduling. The graph
+ is only kept when at least 2 jobs are allowed; in that case
+ it is pruned right away via _prune_digraph(). Otherwise the
+ reference is dropped.
+ """
if self._max_jobs < 2:
# save some memory
self._digraph = None
- else:
- self._digraph = digraph
+ return
+
+ self._digraph = digraph
+ self._prune_digraph()
+
+    def _prune_digraph(self):
+        """
+        Repeatedly strip irrelevant root nodes from the graph
+        until a pass finds none left to strip. A root node is
+        irrelevant when it is not a Package, is installed, is
+        onlydeps, or is already among the completed tasks.
+        """
+
+        digraph = self._digraph
+        finished = self._completed_tasks
+        while True:
+            # A pass that removes roots may leave further
+            # irrelevant roots behind, so keep going until a
+            # pass removes nothing.
+            stale_roots = set(root for root in digraph.root_nodes()
+                if not isinstance(root, Package) or
+                root.installed or root.onlydeps or
+                root in finished)
+            if not stale_roots:
+                break
+            digraph.difference_update(stale_roots)
class _pkg_failure(portage.exception.PortageException):
"""
@@ -7797,6 +7858,39 @@ class Scheduler(object):
sys.stderr.write("\n")
+    def _is_restart_scheduled(self):
+        """
+        Determine whether the merge list replaces the currently
+        running instance anywhere before its final entry, which
+        results in a restart after that merge.
+        @rtype: bool
+        @returns: True if a restart is scheduled, False otherwise.
+        """
+        if self._opts_no_restart.intersection(self.myopts):
+            return False
+
+        mergelist = self._mergelist
+        final_index = len(mergelist) - 1
+
+        for position, pkg in enumerate(mergelist):
+            if not self._is_restart_necessary(pkg):
+                continue
+            # A replacement in the final slot does not count,
+            # presumably because nothing remains to be merged
+            # after it.
+            if position != final_index:
+                return True
+
+        return False
+
+    def _is_restart_necessary(self, pkg):
+        """
+        @return: True if merging the given package
+            requires restart, False otherwise.
+        """
+
+        # Only a package on the running root can require
+        # a restart.
+        if pkg.root != self._running_root.root:
+            return False
+
+        # It must also match the portage package atom.
+        if portage.match_from_list(
+            portage.const.PORTAGE_PACKAGE_ATOM, [pkg]):
+            return True
+        return False
+
def _restart_if_necessary(self, pkg):
"""
Use execv() to restart emerge. This happens
@@ -7804,15 +7898,10 @@ class Scheduler(object):
remaining packages in the list.
"""
- if "--pretend" in self.myopts or \
- "--fetchonly" in self.myopts or \
- "--fetch-all-uri" in self.myopts:
+ if self._opts_no_restart.intersection(self.myopts):
return
- # Figure out if we need a restart.
- if pkg.root != self._running_root.root or \
- not portage.match_from_list(
- portage.const.PORTAGE_PACKAGE_ATOM, [pkg]):
+ if not self._is_restart_necessary(pkg):
return
if self._pkg_count.curval >= self._pkg_count.maxval:
@@ -7844,7 +7933,7 @@ class Scheduler(object):
if myarg is True:
mynewargv.append(myopt)
else:
- mynewargv.append(myopt +"="+ myarg)
+ mynewargv.append(myopt +"="+ str(myarg))
# priority only needs to be adjusted on the first run
os.environ["PORTAGE_NICENESS"] = "0"
os.execv(mynewargv[0], mynewargv)
@@ -7932,12 +8021,14 @@ class Scheduler(object):
pass
def _merge_exit(self, merge):
- self._jobs -= 1
+ self._job_exit(merge.merge)
pkg = merge.merge.pkg
if merge.returncode != os.EX_OK:
self._failed_pkgs.append((pkg, retval))
return
+ self._completed_tasks.add(pkg)
+
if pkg.installed:
return
@@ -7961,11 +8052,15 @@ class Scheduler(object):
self._task_queues.merge.schedule()
else:
self._failed_pkgs.append((build.pkg, build.returncode))
- self._jobs -= 1
+ self._job_exit(build)
def _extract_exit(self, build):
+ # Forwards the given task unchanged to _build_exit().
self._build_exit(build)
+    def _job_exit(self, job):
+        """
+        Account for a finished job: decrement the running job
+        counter and hand the job's settings back via
+        _deallocate_config().
+        """
+        self._jobs = self._jobs - 1
+        released_settings = job.settings
+        self._deallocate_config(released_settings)
+
def _merge(self):
self._add_prefetchers()
@@ -7979,8 +8074,8 @@ class Scheduler(object):
finally:
# discard remaining packages if necessary
pkg_queue.clear()
-
- # clean up child process if necessary
+ self._completed_tasks.clear()
+ self._digraph = None
self._task_queues.prefetch.clear()
# discard any failures and return the
@@ -7993,14 +8088,75 @@ class Scheduler(object):
return rval
def _choose_pkg(self):
-        return self._pkg_queue.popleft()
+        """
+        Pick the next package to schedule and remove it from
+        the package queue.
+
+        With fewer than 2 jobs the queue is consumed strictly in
+        order. Otherwise the first queued package that is not an
+        uninstall and does not depend on any scheduled merge is
+        chosen.
+
+        @returns: the chosen package, or None if no queued
+            package is currently eligible.
+        """
+        if self._max_jobs < 2:
+            return self._pkg_queue.popleft()
+
+        self._prune_digraph()
+
+        chosen_pkg = None
+        for pkg in self._pkg_queue:
+            if pkg.operation == "uninstall":
+                continue
+            if not self._dependent_on_scheduled_merges(pkg):
+                chosen_pkg = pkg
+                break
+
+        # When nothing is eligible chosen_pkg stays None, and
+        # removing None from the deque would raise ValueError.
+        # The main loop already treats a None result as "nothing
+        # to start yet" and calls _schedule_main() instead.
+        if chosen_pkg is not None:
+            self._pkg_queue.remove(chosen_pkg)
+        return chosen_pkg
+
+    def _dependent_on_scheduled_merges(self, pkg):
+        """
+        Walk the deep dependencies of the given package in the
+        graph and report whether any of them is a merge that is
+        still scheduled, i.e. neither installed nor completed.
+        @rtype: bool
+        @returns: True if the package is dependent, False otherwise.
+        """
+
+        digraph = self._digraph
+        finished = self._completed_tasks
+
+        visited = set()
+        pending = digraph.child_nodes(pkg)
+        while pending:
+            candidate = pending.pop()
+            if candidate in visited:
+                continue
+            visited.add(candidate)
+            if not (candidate.installed or candidate in finished):
+                # One unfinished dependency settles the question.
+                return True
+            pending.extend(digraph.child_nodes(candidate))
+
+        return False
+
+    def _allocate_config(self, root):
+        """
+        Hand out a config instance dedicated to a single task so
+        that parallel tasks do not interfere with one another. An
+        idle instance from the pool for this root is reused when
+        available; otherwise pkgsettings[root] is cloned.
+        """
+        idle_configs = self._config_pool[root]
+        if idle_configs:
+            return idle_configs.pop()
+        return portage.config(clone=self.pkgsettings[root])
+
+    def _deallocate_config(self, settings):
+        """
+        Put a config instance back into the pool for its ROOT so
+        that _allocate_config() can reuse it.
+        """
+        pool_for_root = self._config_pool[settings["ROOT"]]
+        pool_for_root.append(settings)
def _main_loop(self):
+ # Only allow 1 job max if a restart is scheduled
+ # due to portage update.
+ if self._is_restart_scheduled():
+ self._set_max_jobs(1)
+
pkg_queue = self._pkg_queue
failed_pkgs = self._failed_pkgs
task_queues = self._task_queues
max_jobs = self._max_jobs
+ max_load = self._max_load
background = max_jobs > 1
while pkg_queue and not failed_pkgs:
@@ -8009,8 +8165,26 @@ class Scheduler(object):
self._schedule_main()
continue
+ if max_load is not None and max_jobs > 1 and self._jobs > 1:
+ try:
+ avg1, avg5, avg15 = os.getloadavg()
+ except OSError, e:
+ writemsg("!!! getloadavg() failed: %s\n" % (e,),
+ noiselevel=-1)
+ del e
+ self._schedule_main()
+ continue
+
+ if avg1 >= max_load:
+ self._schedule_main()
+ continue
+
pkg = self._choose_pkg()
+ if pkg is None:
+ self._schedule_main()
+ continue
+
if not pkg.installed:
self._pkg_count.curval += 1
@@ -8065,10 +8239,10 @@ class Scheduler(object):
emerge_opts=self.myopts,
failed_fetches=self._failed_fetches,
find_blockers=self._find_blockers(pkg), logger=self._logger,
- mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count,
+ mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count.copy(),
prefetcher=self._prefetchers.get(pkg),
scheduler=self._sched_iface,
- settings=self.pkgsettings[pkg.root],
+ settings=self._allocate_config(pkg.root),
world_atom=self._world_atom)
return task
@@ -10752,6 +10926,24 @@ def parse_opts(tmpcmdline, silent=False):
"type":"choice",
"choices":("y", "n")
},
+
+ "--jobs": {
+
+ "help" : "Specifies the number of packages to build " + \
+ "simultaneously.",
+
+ "action" : "store"
+ },
+
+ "--load-average": {
+
+ "help" :"Specifies that no new builds should be started " + \
+ "if there are other builds running and the load average " + \
+ "is at least LOAD (a floating-point number).",
+
+ "action" : "store"
+ },
+
"--with-bdeps": {
"help":"include unnecessary build time dependencies",
"type":"choice",
@@ -10788,6 +10980,34 @@ def parse_opts(tmpcmdline, silent=False):
myoptions, myargs = parser.parse_args(args=tmpcmdline)
+ if myoptions.jobs:
+ try:
+ jobs = int(myoptions.jobs)
+ except ValueError:
+ jobs = 0
+
+ if jobs < 1:
+ jobs = None
+ if not silent:
+ writemsg("!!! Invalid --jobs parameter: '%s'\n" % \
+ (myoptions.jobs,), noiselevel=-1)
+
+ myoptions.jobs = jobs
+
+ if myoptions.load_average:
+ try:
+ load_average = float(myoptions.load_average)
+ except ValueError:
+ load_average = 0.0
+
+ if load_average <= 0.0:
+ load_average = None
+ if not silent:
+ writemsg("!!! Invalid --load-average parameter: '%s'\n" % \
+ (myoptions.load_average,), noiselevel=-1)
+
+ myoptions.load_average = load_average
+
for myopt in options:
v = getattr(myoptions, myopt.lstrip("--").replace("-", "_"))
if v: