summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pym/_emerge/Binpkg.py21
-rw-r--r--pym/_emerge/EbuildBuild.py13
-rw-r--r--pym/_emerge/EbuildMerge.py47
-rw-r--r--pym/_emerge/MergeListItem.py21
-rw-r--r--pym/_emerge/PackageMerge.py12
-rw-r--r--pym/portage/dbapi/_MergeProcess.py82
-rw-r--r--pym/portage/dbapi/vartree.py94
7 files changed, 182 insertions, 108 deletions
diff --git a/pym/_emerge/Binpkg.py b/pym/_emerge/Binpkg.py
index 00587451a..62d44c48f 100644
--- a/pym/_emerge/Binpkg.py
+++ b/pym/_emerge/Binpkg.py
@@ -307,7 +307,7 @@ class Binpkg(CompositeTask):
portage.elog.elog_process(self.pkg.cpv, self.settings)
self._build_dir.unlock()
- def install(self):
+ def install(self, handler):
# This gives bashrc users an opportunity to do various things
# such as remove binary packages after they're installed.
@@ -320,19 +320,20 @@ class Binpkg(CompositeTask):
pkg=self.pkg, pkg_count=self.pkg_count,
pkg_path=self._pkg_path, scheduler=self.scheduler,
settings=settings, tree=self._tree, world_atom=self.world_atom)
+ task = merge.create_task()
+ task.addExitListener(self._install_exit)
+ self._start_task(task, handler)
- try:
- retval = merge.execute()
- finally:
- settings.pop("PORTAGE_BINPKG_FILE", None)
- self._unlock_builddir()
+ def _install_exit(self, task):
+ self.settings.pop("PORTAGE_BINPKG_FILE", None)
+ self._unlock_builddir()
- if retval == os.EX_OK and \
- 'binpkg-logs' not in self.settings.features and \
+ if self._default_final_exit(task) != os.EX_OK:
+ return
+
+ if 'binpkg-logs' not in self.settings.features and \
self.settings.get("PORTAGE_LOG_FILE"):
try:
os.unlink(self.settings["PORTAGE_LOG_FILE"])
except OSError:
pass
- return retval
-
diff --git a/pym/_emerge/EbuildBuild.py b/pym/_emerge/EbuildBuild.py
index 98ab24522..c7a5f5cdb 100644
--- a/pym/_emerge/EbuildBuild.py
+++ b/pym/_emerge/EbuildBuild.py
@@ -314,7 +314,7 @@ class EbuildBuild(CompositeTask):
self._unlock_builddir()
self.wait()
- def install(self):
+ def install(self, exit_handler):
"""
Install the package and then clean up and release locks.
Only call this after the build has completed successfully
@@ -343,10 +343,11 @@ class EbuildBuild(CompositeTask):
(pkg_count.curval, pkg_count.maxval, pkg.cpv)
logger.log(msg, short_msg=short_msg)
- try:
- rval = merge.execute()
- finally:
- self._unlock_builddir()
+ task = merge.create_task()
+ task.addExitListener(self._install_exit)
+ self._start_task(task, exit_handler)
- return rval
+ def _install_exit(self, task):
+ self._unlock_builddir()
+ self._default_final_exit(task)
diff --git a/pym/_emerge/EbuildMerge.py b/pym/_emerge/EbuildMerge.py
index d73a262b3..6a5869270 100644
--- a/pym/_emerge/EbuildMerge.py
+++ b/pym/_emerge/EbuildMerge.py
@@ -4,6 +4,8 @@
from _emerge.SlotObject import SlotObject
import portage
from portage import os
+from portage.dbapi._MergeProcess import MergeProcess
+from portage.dbapi.vartree import dblink
class EbuildMerge(SlotObject):
@@ -11,28 +13,35 @@ class EbuildMerge(SlotObject):
"pkg", "pkg_count", "pkg_path", "pretend",
"scheduler", "settings", "tree", "world_atom")
- def execute(self):
+ def create_task(self):
root_config = self.pkg.root_config
settings = self.settings
- retval = portage.merge(settings["CATEGORY"],
- settings["PF"], settings["D"],
- os.path.join(settings["PORTAGE_BUILDDIR"],
- "build-info"), root_config.root, settings,
- myebuild=settings["EBUILD"],
- mytree=self.tree, mydbapi=root_config.trees[self.tree].dbapi,
- vartree=root_config.trees["vartree"],
- prev_mtimes=self.ldpath_mtimes,
- scheduler=self.scheduler,
- blockers=self.find_blockers)
-
- if retval == os.EX_OK:
- self.world_atom(self.pkg)
- self._log_success()
-
- return retval
-
- def _log_success(self):
+ mycat = settings["CATEGORY"]
+ mypkg = settings["PF"]
+ pkgloc = settings["D"]
+ infloc = os.path.join(settings["PORTAGE_BUILDDIR"], "build-info")
+ myroot = root_config.root
+ myebuild = settings["EBUILD"]
+ mydbapi = root_config.trees[self.tree].dbapi
+ vartree = root_config.trees["vartree"]
+ background = (settings.get('PORTAGE_BACKGROUND') == '1')
+ logfile = settings.get('PORTAGE_LOG_FILE')
+
+ merge_task = MergeProcess(
+ dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings,
+ treetype=self.tree, vartree=vartree, scheduler=self.scheduler,
+ background=background, blockers=self.find_blockers, pkgloc=pkgloc,
+ infloc=infloc, myebuild=myebuild, mydbapi=mydbapi,
+ prev_mtimes=self.ldpath_mtimes, logfile=logfile)
+ merge_task.addExitListener(self._log_exit)
+ return merge_task
+
+ def _log_exit(self, task):
+ if task.returncode != os.EX_OK:
+ return
+
pkg = self.pkg
+ self.world_atom(pkg)
pkg_count = self.pkg_count
pkg_path = self.pkg_path
logger = self.logger
diff --git a/pym/_emerge/MergeListItem.py b/pym/_emerge/MergeListItem.py
index 1dcc1780a..768865e6b 100644
--- a/pym/_emerge/MergeListItem.py
+++ b/pym/_emerge/MergeListItem.py
@@ -111,7 +111,7 @@ class MergeListItem(CompositeTask):
self._install_task.wait()
return self.returncode
- def merge(self):
+ def merge(self, exit_handler):
pkg = self.pkg
build_opts = self.build_opts
@@ -135,15 +135,14 @@ class MergeListItem(CompositeTask):
world_atom=world_atom)
uninstall.start()
- retval = uninstall.wait()
- if retval != os.EX_OK:
- return retval
- return os.EX_OK
-
- if build_opts.fetchonly or \
+ self.returncode = uninstall.wait()
+ else:
+ self.returncode = os.EX_OK
+ exit_handler(self)
+ elif build_opts.fetchonly or \
build_opts.buildpkgonly:
- return self.returncode
-
- retval = self._install_task.install()
- return retval
+ exit_handler(self)
+ else:
+ self._current_task = self._install_task
+ self._install_task.install(exit_handler)
diff --git a/pym/_emerge/PackageMerge.py b/pym/_emerge/PackageMerge.py
index 4aecf8adb..45d2e7dc6 100644
--- a/pym/_emerge/PackageMerge.py
+++ b/pym/_emerge/PackageMerge.py
@@ -4,11 +4,6 @@
from _emerge.AsynchronousTask import AsynchronousTask
from portage.output import colorize
class PackageMerge(AsynchronousTask):
- """
- TODO: Implement asynchronous merge so that the scheduler can
- run while a merge is executing.
- """
-
__slots__ = ("merge",)
def _start(self):
@@ -40,6 +35,9 @@ class PackageMerge(AsynchronousTask):
not self.merge.build_opts.buildpkgonly:
self.merge.statusMessage(msg)
- self.returncode = self.merge.merge()
- self.wait()
+ self.merge.merge(self.exit_handler)
+
+ def exit_handler(self, task):
+ self.returncode = task.returncode
+ self._wait_hook()
diff --git a/pym/portage/dbapi/_MergeProcess.py b/pym/portage/dbapi/_MergeProcess.py
index f717d12df..6e63f84fd 100644
--- a/pym/portage/dbapi/_MergeProcess.py
+++ b/pym/portage/dbapi/_MergeProcess.py
@@ -4,30 +4,72 @@
import signal
import traceback
+import errno
+import fcntl
import portage
-from portage import os
+from portage import os, StringIO
+import portage.elog.messages
+from _emerge.PollConstants import PollConstants
from _emerge.SpawnProcess import SpawnProcess
class MergeProcess(SpawnProcess):
"""
- Merge package files in a subprocess, so the Scheduler can run in the
- main thread while files are moved or copied asynchronously.
+ Merge packages in a subprocess, so the Scheduler can run in the main
+ thread while files are moved or copied asynchronously.
"""
- __slots__ = ('cfgfiledict', 'conf_mem_file', \
- 'destroot', 'dblink', 'srcroot',)
+ __slots__ = ('dblink', 'mycat', 'mypkg', 'settings', 'treetype',
+ 'vartree', 'scheduler', 'blockers', 'pkgloc', 'infloc', 'myebuild',
+ 'mydbapi', 'prev_mtimes', '_elog_reader_fd', '_elog_reg_id',
+ '_buf')
- def _spawn(self, args, fd_pipes=None, **kwargs):
+ def _elog_output_handler(self, fd, event):
+ output = None
+ if event & PollConstants.POLLIN:
+ try:
+ output = os.read(fd, self._bufsize)
+ except OSError as e:
+ if e.errno not in (errno.EAGAIN, errno.EINTR):
+ raise
+ if output:
+ lines = output.split('\n')
+ if len(lines) == 1:
+ self._buf += lines[0]
+ else:
+ lines[0] = self._buf + lines[0]
+ self._buf = lines.pop()
+ out = StringIO()
+ for line in lines:
+ funcname, phase, key, msg = line.split(' ', 3)
+ reporter = getattr(portage.elog.messages, funcname)
+ reporter(msg, phase=phase, key=key, out=out)
+
+ def _spawn(self, args, fd_pipes, **kwargs):
"""
Fork a subprocess, apply local settings, and call
- dblink._merge_process().
+ dblink.merge().
"""
+ files = self._files
+ elog_reader_fd, elog_writer_fd = os.pipe()
+ fcntl.fcntl(elog_reader_fd, fcntl.F_SETFL,
+ fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+ mylink = self.dblink(self.mycat, self.mypkg, settings=self.settings,
+ treetype=self.treetype, vartree=self.vartree,
+ blockers=self.blockers, scheduler=self.scheduler,
+ pipe=elog_writer_fd)
+ fd_pipes[elog_writer_fd] = elog_writer_fd
+ self._elog_reg_id = self.scheduler.register(elog_reader_fd,
+ self._registered_events, self._elog_output_handler)
+
pid = os.fork()
if pid != 0:
+ self._elog_reader_fd = elog_reader_fd
+ self._buf = ""
portage.process.spawned_pids.append(pid)
return [pid]
+ os.close(elog_reader_fd)
portage.process._setup_pipes(fd_pipes)
# Use default signal handlers since the ones inherited
@@ -35,18 +77,19 @@ class MergeProcess(SpawnProcess):
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
- portage.output.havecolor = self.dblink.settings.get('NOCOLOR') \
+ portage.output.havecolor = self.settings.get('NOCOLOR') \
not in ('yes', 'true')
- # In this subprocess we want dblink._display_merge() to use
+ # In this subprocess we want mylink._display_merge() to use
# stdout/stderr directly since they are pipes. This behavior
- # is triggered when dblink._scheduler is None.
- self.dblink._scheduler = None
+ # is triggered when mylink._scheduler is None.
+ mylink._scheduler = None
rval = 1
try:
- rval = self.dblink._merge_process(self.srcroot, self.destroot,
- self.cfgfiledict, self.conf_mem_file)
+ rval = mylink.merge(self.pkgloc, self.infloc,
+ myebuild=self.myebuild, mydbapi=self.mydbapi,
+ prev_mtimes=self.prev_mtimes)
except SystemExit:
raise
except:
@@ -55,3 +98,16 @@ class MergeProcess(SpawnProcess):
# Call os._exit() from finally block, in order to suppress any
# finally blocks from earlier in the call stack. See bug #345289.
os._exit(rval)
+
+ def _unregister(self):
+ """
+ Unregister from the scheduler and close open files.
+ """
+ if self._elog_reg_id is not None:
+ self.scheduler.unregister(self._elog_reg_id)
+ self._elog_reg_id = None
+ if self._elog_reader_fd:
+ os.close(self._elog_reader_fd)
+ self._elog_reader_fd = None
+
+ super(MergeProcess, self)._unregister()
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index bf48b1508..66e2955a6 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -13,7 +13,8 @@ portage.proxy.lazyimport.lazyimport(globals(),
'portage.dbapi._MergeProcess:MergeProcess',
'portage.dep:dep_getkey,isjustname,match_from_list,' + \
'use_reduce,_slot_re',
- 'portage.elog:elog_process,_preload_elog_modules',
+ 'portage.elog:collect_ebuild_messages,collect_messages,' + \
+ 'elog_process,_merge_logentries,_preload_elog_modules',
'portage.locks:lockdir,unlockdir',
'portage.output:bold,colorize',
'portage.package.ebuild.doebuild:doebuild_environment,' + \
@@ -1200,7 +1201,7 @@ class dblink(object):
_file_merge_yield_interval = 20
def __init__(self, cat, pkg, myroot=None, settings=None, treetype=None,
- vartree=None, blockers=None, scheduler=None):
+ vartree=None, blockers=None, scheduler=None, pipe=None):
"""
Creates a DBlink object for a given CPV.
The given CPV may not be present in the database already.
@@ -1259,6 +1260,7 @@ class dblink(object):
self._md5_merge_map = {}
self._hash_key = (self.myroot, self.mycpv)
self._protect_obj = None
+ self._pipe = pipe
def __hash__(self):
return hash(self._hash_key)
@@ -1502,7 +1504,7 @@ class dblink(object):
continue
others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
settings=self.settings, vartree=self.vartree,
- treetype="vartree"))
+ treetype="vartree", pipe=self._pipe))
retval = self._security_check([self] + others_in_slot)
if retval:
@@ -1666,9 +1668,7 @@ class dblink(object):
self._eerror(ebuild_phase, msg_lines)
- # process logs created during pre/postrm
- elog_process(self.mycpv, self.settings,
- phasefilter=('prerm', 'postrm'))
+ self._elog_process()
if retval == os.EX_OK:
# myebuildpath might be None, so ensure
@@ -1764,7 +1764,7 @@ class dblink(object):
continue
others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
settings=self.settings,
- vartree=self.vartree, treetype="vartree"))
+ vartree=self.vartree, treetype="vartree", pipe=self._pipe))
dest_root = self._eroot
dest_root_len = len(dest_root) - 1
@@ -2784,19 +2784,34 @@ class dblink(object):
self._scheduler.dblinkElog(self,
phase, _eerror, lines)
- def _elog_subprocess(self, funcname, phase, lines):
- """
- Subprocesses call this in order to create elog messages in
- $T, for collection by the main process.
- """
- cmd = "source %s/isolated-functions.sh ; " % \
- portage._shell_quote(self.settings["PORTAGE_BIN_PATH"])
- for line in lines:
- cmd += "%s %s ; " % (funcname, portage._shell_quote(line))
- env = self.settings.environ()
- env['EBUILD_PHASE'] = phase
- subprocess.call([portage.const.BASH_BINARY, "-c", cmd],
- env=env)
+ def _elog_process(self):
+ cpv = self.mycpv
+ if self._pipe is None:
+ elog_process(cpv, self.settings)
+ else:
+ logdir = os.path.join(self.settings["T"], "logging")
+ ebuild_logentries = collect_ebuild_messages(logdir)
+ py_logentries = collect_messages(key=cpv).get(cpv, {})
+ logentries = _merge_logentries(py_logentries, ebuild_logentries)
+ funcnames = {
+ "INFO": "einfo",
+ "LOG": "elog",
+ "WARN": "ewarn",
+ "QA": "eqawarn",
+ "ERROR": "eerror"
+ }
+ buffer = []
+ for phase, messages in logentries.items():
+ for key, lines in messages:
+ funcname = funcnames[key]
+ if isinstance(lines, basestring):
+ lines = [lines]
+ for line in lines:
+ fields = (funcname, phase, cpv, line.rstrip('\n'))
+ buffer.append(' '.join(fields))
+ buffer.append('\n')
+ if buffer:
+ os.write(self._pipe, ''.join(buffer))
def treewalk(self, srcroot, destroot, inforoot, myebuild, cleanup=0,
mydbapi=None, prev_mtimes=None):
@@ -2811,7 +2826,6 @@ class dblink(object):
unmerges old version (if required)
calls doebuild(mydo=pkg_postinst)
calls env_update
- calls elog_process
@param srcroot: Typically this is ${D}
@type srcroot: String (Path)
@@ -2921,7 +2935,7 @@ class dblink(object):
others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1],
settings=config(clone=self.settings),
vartree=self.vartree, treetype="vartree",
- scheduler=self._scheduler))
+ scheduler=self._scheduler, pipe=self._pipe))
retval = self._security_check(others_in_slot)
if retval:
@@ -3069,8 +3083,6 @@ class dblink(object):
# check for package collisions
blockers = None
if self._blockers is not None:
- # This is only supposed to be called when
- # the vdb is locked, like it is here.
blockers = self._blockers()
if blockers is None:
blockers = []
@@ -3242,16 +3254,8 @@ class dblink(object):
cfgfiledict["IGNORE"] = 1
break
- merge_task = MergeProcess(
- background=(self.settings.get('PORTAGE_BACKGROUND') == '1'),
- cfgfiledict=cfgfiledict, conf_mem_file=conf_mem_file, dblink=self,
- destroot=destroot,
- logfile=self.settings.get('PORTAGE_LOG_FILE'),
- scheduler=(scheduler or PollScheduler().sched_iface),
- srcroot=srcroot)
-
- merge_task.start()
- rval = merge_task.wait()
+ rval = self._merge_contents(srcroot, destroot, cfgfiledict,
+ conf_mem_file)
if rval != os.EX_OK:
return rval
@@ -3438,7 +3442,7 @@ class dblink(object):
return backup_p
- def _merge_process(self, srcroot, destroot, cfgfiledict, conf_mem_file):
+ def _merge_contents(self, srcroot, destroot, cfgfiledict, conf_mem_file):
cfgfiledict_orig = cfgfiledict.copy()
@@ -3667,7 +3671,7 @@ class dblink(object):
msg.append(_("This file will be renamed to a different name:"))
msg.append(" '%s'" % backup_dest)
msg.append("")
- self._elog_subprocess("eerror", "preinst", msg)
+ self._eerror("preinst", msg)
if movefile(mydest, backup_dest,
mysettings=self.settings,
encoding=_encodings['merge']) is None:
@@ -3745,7 +3749,7 @@ class dblink(object):
msg.append(_("This file will be merged with a different name:"))
msg.append(" '%s'" % newdest)
msg.append("")
- self._elog_subprocess("eerror", "preinst", msg)
+ self._eerror("preinst", msg)
mydest = newdest
elif stat.S_ISREG(mydmode) or (stat.S_ISLNK(mydmode) and os.path.exists(mydest) and stat.S_ISREG(os.stat(mydest)[stat.ST_MODE])):
@@ -3929,7 +3933,7 @@ class dblink(object):
self._scheduler.dblinkEbuildPhase(
self, mydbapi, myebuild, phase)
- elog_process(self.mycpv, self.settings)
+ self._elog_process()
if 'noclean' not in self.settings.features and \
(retval == os.EX_OK or \
@@ -4029,10 +4033,16 @@ def merge(mycat, mypkg, pkgloc, infloc,
writemsg(_("Permission denied: access('%s', W_OK)\n") % settings['EROOT'],
noiselevel=-1)
return errno.EACCES
- mylink = dblink(mycat, mypkg, settings=settings, treetype=mytree,
- vartree=vartree, blockers=blockers, scheduler=scheduler)
- return mylink.merge(pkgloc, infloc, myebuild=myebuild,
- mydbapi=mydbapi, prev_mtimes=prev_mtimes)
+ background = (settings.get('PORTAGE_BACKGROUND') == '1')
+ merge_task = MergeProcess(
+ dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings,
+ treetype=mytree, vartree=vartree, scheduler=scheduler,
+ background=background, blockers=blockers, pkgloc=pkgloc,
+ infloc=infloc, myebuild=myebuild, mydbapi=mydbapi,
+ prev_mtimes=prev_mtimes, logfile=settings.get('PORTAGE_LOG_FILE'))
+ merge_task.start()
+ retcode = merge_task.wait()
+ return retcode
def unmerge(cat, pkg, myroot=None, settings=None,
mytrimworld=None, vartree=None,