summaryrefslogtreecommitdiffstats
path: root/pym
diff options
context:
space:
mode:
authorDavid James <davidjames@google.com>2011-03-24 19:36:33 -0700
committerZac Medico <zmedico@gentoo.org>2011-03-24 19:36:33 -0700
commit7535cabdf2fab76fc55df83643157613dfd66be9 (patch)
tree535e51878faa5359a7c186ca0aadfbe6ebcc02b2 /pym
parent99ec2a8f810ae7ea2c76d928665ed1d02c2d9cc7 (diff)
downloadportage-7535cabdf2fab76fc55df83643157613dfd66be9.tar.gz
portage-7535cabdf2fab76fc55df83643157613dfd66be9.tar.bz2
portage-7535cabdf2fab76fc55df83643157613dfd66be9.zip
Merge packages asynchronously in Portage.
This allows for the scheduler to continue to run while packages are being merged and installed, allowing for additional parallelism and making better use of the CPUs. Review URL: http://codereview.chromium.org/6713043
Diffstat (limited to 'pym')
-rw-r--r--pym/_emerge/Binpkg.py21
-rw-r--r--pym/_emerge/EbuildBuild.py13
-rw-r--r--pym/_emerge/EbuildMerge.py47
-rw-r--r--pym/_emerge/MergeListItem.py21
-rw-r--r--pym/_emerge/PackageMerge.py12
-rw-r--r--pym/portage/dbapi/_MergeProcess.py82
-rw-r--r--pym/portage/dbapi/vartree.py94
7 files changed, 182 insertions, 108 deletions
diff --git a/pym/_emerge/Binpkg.py b/pym/_emerge/Binpkg.py
index 00587451a..62d44c48f 100644
--- a/pym/_emerge/Binpkg.py
+++ b/pym/_emerge/Binpkg.py
@@ -307,7 +307,7 @@ class Binpkg(CompositeTask):
portage.elog.elog_process(self.pkg.cpv, self.settings)
self._build_dir.unlock()
- def install(self):
+ def install(self, handler):
# This gives bashrc users an opportunity to do various things
# such as remove binary packages after they're installed.
@@ -320,19 +320,20 @@ class Binpkg(CompositeTask):
pkg=self.pkg, pkg_count=self.pkg_count,
pkg_path=self._pkg_path, scheduler=self.scheduler,
settings=settings, tree=self._tree, world_atom=self.world_atom)
+ task = merge.create_task()
+ task.addExitListener(self._install_exit)
+ self._start_task(task, handler)
- try:
- retval = merge.execute()
- finally:
- settings.pop("PORTAGE_BINPKG_FILE", None)
- self._unlock_builddir()
+ def _install_exit(self, task):
+ self.settings.pop("PORTAGE_BINPKG_FILE", None)
+ self._unlock_builddir()
- if retval == os.EX_OK and \
- 'binpkg-logs' not in self.settings.features and \
+ if self._default_final_exit(task) != os.EX_OK:
+ return
+
+ if 'binpkg-logs' not in self.settings.features and \
self.settings.get("PORTAGE_LOG_FILE"):
try:
os.unlink(self.settings["PORTAGE_LOG_FILE"])
except OSError:
pass
- return retval
-
diff --git a/pym/_emerge/EbuildBuild.py b/pym/_emerge/EbuildBuild.py
index 98ab24522..c7a5f5cdb 100644
--- a/pym/_emerge/EbuildBuild.py
+++ b/pym/_emerge/EbuildBuild.py
@@ -314,7 +314,7 @@ class EbuildBuild(CompositeTask):
self._unlock_builddir()
self.wait()
- def install(self):
+ def install(self, exit_handler):
"""
Install the package and then clean up and release locks.
Only call this after the build has completed successfully
@@ -343,10 +343,11 @@ class EbuildBuild(CompositeTask):
(pkg_count.curval, pkg_count.maxval, pkg.cpv)
logger.log(msg, short_msg=short_msg)
- try:
- rval = merge.execute()
- finally:
- self._unlock_builddir()
+ task = merge.create_task()
+ task.addExitListener(self._install_exit)
+ self._start_task(task, exit_handler)
- return rval
+ def _install_exit(self, task):
+ self._unlock_builddir()
+ self._default_final_exit(task)
diff --git a/pym/_emerge/EbuildMerge.py b/pym/_emerge/EbuildMerge.py
index d73a262b3..6a5869270 100644
--- a/pym/_emerge/EbuildMerge.py
+++ b/pym/_emerge/EbuildMerge.py
@@ -4,6 +4,8 @@
from _emerge.SlotObject import SlotObject
import portage
from portage import os
+from portage.dbapi._MergeProcess import MergeProcess
+from portage.dbapi.vartree import dblink
class EbuildMerge(SlotObject):
@@ -11,28 +13,35 @@ class EbuildMerge(SlotObject):
"pkg", "pkg_count", "pkg_path", "pretend",
"scheduler", "settings", "tree", "world_atom")
- def execute(self):
+ def create_task(self):
root_config = self.pkg.root_config
settings = self.settings
- retval = portage.merge(settings["CATEGORY"],
- settings["PF"], settings["D"],
- os.path.join(settings["PORTAGE_BUILDDIR"],
- "build-info"), root_config.root, settings,
- myebuild=settings["EBUILD"],
- mytree=self.tree, mydbapi=root_config.trees[self.tree].dbapi,
- vartree=root_config.trees["vartree"],
- prev_mtimes=self.ldpath_mtimes,
- scheduler=self.scheduler,
- blockers=self.find_blockers)
-
- if retval == os.EX_OK:
- self.world_atom(self.pkg)
- self._log_success()
-
- return retval
-
- def _log_success(self):
+ mycat = settings["CATEGORY"]
+ mypkg = settings["PF"]
+ pkgloc = settings["D"]
+ infloc = os.path.join(settings["PORTAGE_BUILDDIR"], "build-info")
+ myroot = root_config.root
+ myebuild = settings["EBUILD"]
+ mydbapi = root_config.trees[self.tree].dbapi
+ vartree = root_config.trees["vartree"]
+ background = (settings.get('PORTAGE_BACKGROUND') == '1')
+ logfile = settings.get('PORTAGE_LOG_FILE')
+
+ merge_task = MergeProcess(
+ dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings,
+ treetype=self.tree, vartree=vartree, scheduler=self.scheduler,
+ background=background, blockers=self.find_blockers, pkgloc=pkgloc,
+ infloc=infloc, myebuild=myebuild, mydbapi=mydbapi,
+ prev_mtimes=self.ldpath_mtimes, logfile=logfile)
+ merge_task.addExitListener(self._log_exit)
+ return merge_task
+
+ def _log_exit(self, task):
+ if task.returncode != os.EX_OK:
+ return
+
pkg = self.pkg
+ self.world_atom(pkg)
pkg_count = self.pkg_count
pkg_path = self.pkg_path
logger = self.logger
diff --git a/pym/_emerge/MergeListItem.py b/pym/_emerge/MergeListItem.py
index 1dcc1780a..768865e6b 100644
--- a/pym/_emerge/MergeListItem.py
+++ b/pym/_emerge/MergeListItem.py
@@ -111,7 +111,7 @@ class MergeListItem(CompositeTask):
self._install_task.wait()
return self.returncode
- def merge(self):
+ def merge(self, exit_handler):
pkg = self.pkg
build_opts = self.build_opts
@@ -135,15 +135,14 @@ class MergeListItem(CompositeTask):
world_atom=world_atom)
uninstall.start()
- retval = uninstall.wait()
- if retval != os.EX_OK:
- return retval
- return os.EX_OK
-
- if build_opts.fetchonly or \
+ self.returncode = uninstall.wait()
+ else:
+ self.returncode = os.EX_OK
+ exit_handler(self)
+ elif build_opts.fetchonly or \
build_opts.buildpkgonly:
- return self.returncode
-
- retval = self._install_task.install()
- return retval
+ exit_handler(self)
+ else:
+ self._current_task = self._install_task
+ self._install_task.install(exit_handler)
diff --git a/pym/_emerge/PackageMerge.py b/pym/_emerge/PackageMerge.py
index 4aecf8adb..45d2e7dc6 100644
--- a/pym/_emerge/PackageMerge.py
+++ b/pym/_emerge/PackageMerge.py
@@ -4,11 +4,6 @@
from _emerge.AsynchronousTask import AsynchronousTask
from portage.output import colorize
class PackageMerge(AsynchronousTask):
- """
- TODO: Implement asynchronous merge so that the scheduler can
- run while a merge is executing.
- """
-
__slots__ = ("merge",)
def _start(self):
@@ -40,6 +35,9 @@ class PackageMerge(AsynchronousTask):
not self.merge.build_opts.buildpkgonly:
self.merge.statusMessage(msg)
- self.returncode = self.merge.merge()
- self.wait()
+ self.merge.merge(self.exit_handler)
+
+ def exit_handler(self, task):
+ self.returncode = task.returncode
+ self._wait_hook()
diff --git a/pym/portage/dbapi/_MergeProcess.py b/pym/portage/dbapi/_MergeProcess.py
index f717d12df..6e63f84fd 100644
--- a/pym/portage/dbapi/_MergeProcess.py
+++ b/pym/portage/dbapi/_MergeProcess.py
@@ -4,30 +4,72 @@
import signal
import traceback
+import errno
+import fcntl
import portage
-from portage import os
+from portage import os, StringIO
+import portage.elog.messages
+from _emerge.PollConstants import PollConstants
from _emerge.SpawnProcess import SpawnProcess
class MergeProcess(SpawnProcess):
"""
- Merge package files in a subprocess, so the Scheduler can run in the
- main thread while files are moved or copied asynchronously.
+ Merge packages in a subprocess, so the Scheduler can run in the main
+ thread while files are moved or copied asynchronously.
"""
- __slots__ = ('cfgfiledict', 'conf_mem_file', \
- 'destroot', 'dblink', 'srcroot',)
+ __slots__ = ('dblink', 'mycat', 'mypkg', 'settings', 'treetype',
+ 'vartree', 'scheduler', 'blockers', 'pkgloc', 'infloc', 'myebuild',
+ 'mydbapi', 'prev_mtimes', '_elog_reader_fd', '_elog_reg_id',
+ '_buf')
- def _spawn(self, args, fd_pipes=None, **kwargs):
+ def _elog_output_handler(self, fd, event):
+ output = None
+ if event & PollConstants.POLLIN:
+ try:
+ output = os.read(fd, self._bufsize)
+ except OSError as e:
+ if e.errno not in (errno.EAGAIN, errno.EINTR):
+ raise
+ if output:
+ lines = output.split('\n')
+ if len(lines) == 1:
+ self._buf += lines[0]
+ else:
+ lines[0] = self._buf + lines[0]
+ self._buf = lines.pop()
+ out = StringIO()
+ for line in lines:
+ funcname, phase, key, msg = line.split(' ', 3)
+ reporter = getattr(portage.elog.messages, funcname)
+ reporter(msg, phase=phase, key=key, out=out)
+
+ def _spawn(self, args, fd_pipes, **kwargs):
"""
Fork a subprocess, apply local settings, and call
- dblink._merge_process().
+ dblink.merge().
"""
+ files = self._files
+ elog_reader_fd, elog_writer_fd = os.pipe()
+ fcntl.fcntl(elog_reader_fd, fcntl.F_SETFL,
+ fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+ mylink = self.dblink(self.mycat, self.mypkg, settings=self.settings,
+ treetype=self.treetype, vartree=self.vartree,
+ blockers=self.blockers, scheduler=self.scheduler,
+ pipe=elog_writer_fd)
+ fd_pipes[elog_writer_fd] = elog_writer_fd
+ self._elog_reg_id = self.scheduler.register(elog_reader_fd,
+ self._registered_events, self._elog_output_handler)
+
pid = os.fork()
if pid != 0:
+ self._elog_reader_fd = elog_reader_fd
+ self._buf = ""
portage.process.spawned_pids.append(pid)
return [pid]
+ os.close(elog_reader_fd)
portage.process._setup_pipes(fd_pipes)
# Use default signal handlers since the ones inherited
@@ -35,18 +77,19 @@ class MergeProcess(SpawnProcess):
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
- portage.output.havecolor = self.dblink.settings.get('NOCOLOR') \
+ portage.output.havecolor = self.settings.get('NOCOLOR') \
not in ('yes', 'true')
- # In this subprocess we want dblink._display_merge() to use
+ # In this subprocess we want mylink._display_merge() to use
# stdout/stderr directly since they are pipes. This behavior
- # is triggered when dblink._scheduler is None.
- self.dblink._scheduler = None
+ # is triggered when mylink._scheduler is None.
+ mylink._scheduler = None
rval = 1
try:
- rval = self.dblink._merge_process(self.srcroot, self.destroot,
- self.cfgfiledict, self.conf_mem_file)
+ rval = mylink.merge(self.pkgloc, self.infloc,
+ myebuild=self.myebuild, mydbapi=self.mydbapi,
+ prev_mtimes=self.prev_mtimes)
except SystemExit:
raise
except:
@@ -55,3 +98,16 @@ class MergeProcess(SpawnProcess):
# Call os._exit() from finally block, in order to suppress any
# finally blocks from earlier in the call stack. See bug #345289.
os._exit(rval)
+
+ def _unregister(self):
+ """
+ Unregister from the scheduler and close open files.
+ """
+ if self._elog_reg_id is not None:
+ self.scheduler.unregister(self._elog_reg_id)
+ self._elog_reg_id = None
+ if self._elog_reader_fd:
+ os.close(self._elog_reader_fd)
+ self._elog_reader_fd = None
+
+ super(MergeProcess, self)._unregister()
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index bf48b1508..66e2955a6 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -13,7 +13,8 @@ portage.proxy.lazyimport.lazyimport(globals(),
'portage.dbapi._MergeProcess:MergeProcess',
'portage.dep:dep_getkey,isjustname,match_from_list,' + \
'use_reduce,_slot_re',
- 'portage.elog:elog_process,_preload_elog_modules',
+ 'portage.elog:collect_ebuild_messages,collect_messages,' + \
+ 'elog_process,_merge_logentries,_preload_elog_modules',
'portage.locks:lockdir,unlockdir',
'portage.output:bold,colorize',
'portage.package.ebuild.doebuild:doebuild_environment,' + \
@@ -1200,7 +1201,7 @@ class dblink(object):
_file_merge_yield_interval = 20
def __init__(self, cat, pkg, myroot=None, settings=None, treetype=None,
- vartree=None, blockers=None, scheduler=None):
+ vartree=None, blockers=None, scheduler=None, pipe=None):
"""
Creates a DBlink object for a given CPV.
The given CPV may not be present in the database already.
@@ -1259,6 +1260,7 @@ class dblink(object):
self._md5_merge_map = {}
self._hash_key = (self.myroot, self.mycpv)
self._protect_obj = None
+ self._pipe = pipe
def __hash__(self):
return hash(self._hash_key)
@@ -1502,7 +1504,7 @@ class dblink(object):
continue
others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
settings=self.settings, vartree=self.vartree,
- treetype="vartree"))
+ treetype="vartree", pipe=self._pipe))
retval = self._security_check([self] + others_in_slot)
if retval:
@@ -1666,9 +1668,7 @@ class dblink(object):
self._eerror(ebuild_phase, msg_lines)
- # process logs created during pre/postrm
- elog_process(self.mycpv, self.settings,
- phasefilter=('prerm', 'postrm'))
+ self._elog_process()
if retval == os.EX_OK:
# myebuildpath might be None, so ensure
@@ -1764,7 +1764,7 @@ class dblink(object):
continue
others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
settings=self.settings,
- vartree=self.vartree, treetype="vartree"))
+ vartree=self.vartree, treetype="vartree", pipe=self._pipe))
dest_root = self._eroot
dest_root_len = len(dest_root) - 1
@@ -2784,19 +2784,34 @@ class dblink(object):
self._scheduler.dblinkElog(self,
phase, _eerror, lines)
- def _elog_subprocess(self, funcname, phase, lines):
- """
- Subprocesses call this in order to create elog messages in
- $T, for collection by the main process.
- """
- cmd = "source %s/isolated-functions.sh ; " % \
- portage._shell_quote(self.settings["PORTAGE_BIN_PATH"])
- for line in lines:
- cmd += "%s %s ; " % (funcname, portage._shell_quote(line))
- env = self.settings.environ()
- env['EBUILD_PHASE'] = phase
- subprocess.call([portage.const.BASH_BINARY, "-c", cmd],
- env=env)
+ def _elog_process(self):
+ cpv = self.mycpv
+ if self._pipe is None:
+ elog_process(cpv, self.settings)
+ else:
+ logdir = os.path.join(self.settings["T"], "logging")
+ ebuild_logentries = collect_ebuild_messages(logdir)
+ py_logentries = collect_messages(key=cpv).get(cpv, {})
+ logentries = _merge_logentries(py_logentries, ebuild_logentries)
+ funcnames = {
+ "INFO": "einfo",
+ "LOG": "elog",
+ "WARN": "ewarn",
+ "QA": "eqawarn",
+ "ERROR": "eerror"
+ }
+ buffer = []
+ for phase, messages in logentries.items():
+ for key, lines in messages:
+ funcname = funcnames[key]
+ if isinstance(lines, basestring):
+ lines = [lines]
+ for line in lines:
+ fields = (funcname, phase, cpv, line.rstrip('\n'))
+ buffer.append(' '.join(fields))
+ buffer.append('\n')
+ if buffer:
+ os.write(self._pipe, ''.join(buffer))
def treewalk(self, srcroot, destroot, inforoot, myebuild, cleanup=0,
mydbapi=None, prev_mtimes=None):
@@ -2811,7 +2826,6 @@ class dblink(object):
unmerges old version (if required)
calls doebuild(mydo=pkg_postinst)
calls env_update
- calls elog_process
@param srcroot: Typically this is ${D}
@type srcroot: String (Path)
@@ -2921,7 +2935,7 @@ class dblink(object):
others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
settings=config(clone=self.settings),
vartree=self.vartree, treetype="vartree",
- scheduler=self._scheduler))
+ scheduler=self._scheduler, pipe=self._pipe))
retval = self._security_check(others_in_slot)
if retval:
@@ -3069,8 +3083,6 @@ class dblink(object):
# check for package collisions
blockers = None
if self._blockers is not None:
- # This is only supposed to be called when
- # the vdb is locked, like it is here.
blockers = self._blockers()
if blockers is None:
blockers = []
@@ -3242,16 +3254,8 @@ class dblink(object):
cfgfiledict["IGNORE"] = 1
break
- merge_task = MergeProcess(
- background=(self.settings.get('PORTAGE_BACKGROUND') == '1'),
- cfgfiledict=cfgfiledict, conf_mem_file=conf_mem_file, dblink=self,
- destroot=destroot,
- logfile=self.settings.get('PORTAGE_LOG_FILE'),
- scheduler=(scheduler or PollScheduler().sched_iface),
- srcroot=srcroot)
-
- merge_task.start()
- rval = merge_task.wait()
+ rval = self._merge_contents(srcroot, destroot, cfgfiledict,
+ conf_mem_file)
if rval != os.EX_OK:
return rval
@@ -3438,7 +3442,7 @@ class dblink(object):
return backup_p
- def _merge_process(self, srcroot, destroot, cfgfiledict, conf_mem_file):
+ def _merge_contents(self, srcroot, destroot, cfgfiledict, conf_mem_file):
cfgfiledict_orig = cfgfiledict.copy()
@@ -3667,7 +3671,7 @@ class dblink(object):
msg.append(_("This file will be renamed to a different name:"))
msg.append(" '%s'" % backup_dest)
msg.append("")
- self._elog_subprocess("eerror", "preinst", msg)
+ self._eerror("preinst", msg)
if movefile(mydest, backup_dest,
mysettings=self.settings,
encoding=_encodings['merge']) is None:
@@ -3745,7 +3749,7 @@ class dblink(object):
msg.append(_("This file will be merged with a different name:"))
msg.append(" '%s'" % newdest)
msg.append("")
- self._elog_subprocess("eerror", "preinst", msg)
+ self._eerror("preinst", msg)
mydest = newdest
elif stat.S_ISREG(mydmode) or (stat.S_ISLNK(mydmode) and os.path.exists(mydest) and stat.S_ISREG(os.stat(mydest)[stat.ST_MODE])):
@@ -3929,7 +3933,7 @@ class dblink(object):
self._scheduler.dblinkEbuildPhase(
self, mydbapi, myebuild, phase)
- elog_process(self.mycpv, self.settings)
+ self._elog_process()
if 'noclean' not in self.settings.features and \
(retval == os.EX_OK or \
@@ -4029,10 +4033,16 @@ def merge(mycat, mypkg, pkgloc, infloc,
writemsg(_("Permission denied: access('%s', W_OK)\n") % settings['EROOT'],
noiselevel=-1)
return errno.EACCES
- mylink = dblink(mycat, mypkg, settings=settings, treetype=mytree,
- vartree=vartree, blockers=blockers, scheduler=scheduler)
- return mylink.merge(pkgloc, infloc, myebuild=myebuild,
- mydbapi=mydbapi, prev_mtimes=prev_mtimes)
+ background = (settings.get('PORTAGE_BACKGROUND') == '1')
+ merge_task = MergeProcess(
+ dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings,
+ treetype=mytree, vartree=vartree, scheduler=scheduler,
+ background=background, blockers=blockers, pkgloc=pkgloc,
+ infloc=infloc, myebuild=myebuild, mydbapi=mydbapi,
+ prev_mtimes=prev_mtimes, logfile=settings.get('PORTAGE_LOG_FILE'))
+ merge_task.start()
+ retcode = merge_task.wait()
+ return retcode
def unmerge(cat, pkg, myroot=None, settings=None,
mytrimworld=None, vartree=None,