diff options
-rw-r--r-- | pym/_emerge/Binpkg.py | 21 | ||||
-rw-r--r-- | pym/_emerge/EbuildBuild.py | 13 | ||||
-rw-r--r-- | pym/_emerge/EbuildMerge.py | 47 | ||||
-rw-r--r-- | pym/_emerge/MergeListItem.py | 21 | ||||
-rw-r--r-- | pym/_emerge/PackageMerge.py | 12 | ||||
-rw-r--r-- | pym/portage/dbapi/_MergeProcess.py | 82 | ||||
-rw-r--r-- | pym/portage/dbapi/vartree.py | 94 |
7 files changed, 182 insertions, 108 deletions
diff --git a/pym/_emerge/Binpkg.py b/pym/_emerge/Binpkg.py index 00587451a..62d44c48f 100644 --- a/pym/_emerge/Binpkg.py +++ b/pym/_emerge/Binpkg.py @@ -307,7 +307,7 @@ class Binpkg(CompositeTask): portage.elog.elog_process(self.pkg.cpv, self.settings) self._build_dir.unlock() - def install(self): + def install(self, handler): # This gives bashrc users an opportunity to do various things # such as remove binary packages after they're installed. @@ -320,19 +320,20 @@ class Binpkg(CompositeTask): pkg=self.pkg, pkg_count=self.pkg_count, pkg_path=self._pkg_path, scheduler=self.scheduler, settings=settings, tree=self._tree, world_atom=self.world_atom) + task = merge.create_task() + task.addExitListener(self._install_exit) + self._start_task(task, handler) - try: - retval = merge.execute() - finally: - settings.pop("PORTAGE_BINPKG_FILE", None) - self._unlock_builddir() + def _install_exit(self, task): + self.settings.pop("PORTAGE_BINPKG_FILE", None) + self._unlock_builddir() - if retval == os.EX_OK and \ - 'binpkg-logs' not in self.settings.features and \ + if self._default_final_exit(task) != os.EX_OK: + return + + if 'binpkg-logs' not in self.settings.features and \ self.settings.get("PORTAGE_LOG_FILE"): try: os.unlink(self.settings["PORTAGE_LOG_FILE"]) except OSError: pass - return retval - diff --git a/pym/_emerge/EbuildBuild.py b/pym/_emerge/EbuildBuild.py index 98ab24522..c7a5f5cdb 100644 --- a/pym/_emerge/EbuildBuild.py +++ b/pym/_emerge/EbuildBuild.py @@ -314,7 +314,7 @@ class EbuildBuild(CompositeTask): self._unlock_builddir() self.wait() - def install(self): + def install(self, exit_handler): """ Install the package and then clean up and release locks. Only call this after the build has completed successfully @@ -343,10 +343,11 @@ class EbuildBuild(CompositeTask): (pkg_count.curval, pkg_count.maxval, pkg.cpv) logger.log(msg, short_msg=short_msg) - try: - rval = merge.execute() - finally: - self._unlock_builddir() + task = merge.create_task() + task.addExitListener(self._install_exit) + self._start_task(task, exit_handler) - return rval + def _install_exit(self, task): + self._unlock_builddir() + self._default_final_exit(task) diff --git a/pym/_emerge/EbuildMerge.py b/pym/_emerge/EbuildMerge.py index d73a262b3..6a5869270 100644 --- a/pym/_emerge/EbuildMerge.py +++ b/pym/_emerge/EbuildMerge.py @@ -4,6 +4,8 @@ from _emerge.SlotObject import SlotObject import portage from portage import os +from portage.dbapi._MergeProcess import MergeProcess +from portage.dbapi.vartree import dblink class EbuildMerge(SlotObject): @@ -11,28 +13,35 @@ class EbuildMerge(SlotObject): "pkg", "pkg_count", "pkg_path", "pretend", "scheduler", "settings", "tree", "world_atom") - def execute(self): + def create_task(self): root_config = self.pkg.root_config settings = self.settings - retval = portage.merge(settings["CATEGORY"], - settings["PF"], settings["D"], - os.path.join(settings["PORTAGE_BUILDDIR"], - "build-info"), root_config.root, settings, - myebuild=settings["EBUILD"], - mytree=self.tree, mydbapi=root_config.trees[self.tree].dbapi, - vartree=root_config.trees["vartree"], - prev_mtimes=self.ldpath_mtimes, - scheduler=self.scheduler, - blockers=self.find_blockers) - - if retval == os.EX_OK: - self.world_atom(self.pkg) - self._log_success() - - return retval - - def _log_success(self): + mycat = settings["CATEGORY"] + mypkg = settings["PF"] + pkgloc = settings["D"] + infloc = os.path.join(settings["PORTAGE_BUILDDIR"], "build-info") + myroot = root_config.root + myebuild = settings["EBUILD"] + mydbapi = root_config.trees[self.tree].dbapi + vartree = root_config.trees["vartree"] + background = (settings.get('PORTAGE_BACKGROUND') == '1') + logfile = settings.get('PORTAGE_LOG_FILE') + + merge_task = MergeProcess( + dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings, + treetype=self.tree, vartree=vartree, scheduler=self.scheduler, + background=background, blockers=self.find_blockers, pkgloc=pkgloc, + infloc=infloc, myebuild=myebuild, mydbapi=mydbapi, + prev_mtimes=self.ldpath_mtimes, logfile=logfile) + merge_task.addExitListener(self._log_exit) + return merge_task + + def _log_exit(self, task): + if task.returncode != os.EX_OK: + return + pkg = self.pkg + self.world_atom(pkg) pkg_count = self.pkg_count pkg_path = self.pkg_path logger = self.logger diff --git a/pym/_emerge/MergeListItem.py b/pym/_emerge/MergeListItem.py index 1dcc1780a..768865e6b 100644 --- a/pym/_emerge/MergeListItem.py +++ b/pym/_emerge/MergeListItem.py @@ -111,7 +111,7 @@ class MergeListItem(CompositeTask): self._install_task.wait() return self.returncode - def merge(self): + def merge(self, exit_handler): pkg = self.pkg build_opts = self.build_opts @@ -135,15 +135,14 @@ class MergeListItem(CompositeTask): world_atom=world_atom) uninstall.start() - retval = uninstall.wait() - if retval != os.EX_OK: - return retval - return os.EX_OK - - if build_opts.fetchonly or \ + self.returncode = uninstall.wait() + else: + self.returncode = os.EX_OK + exit_handler(self) + elif build_opts.fetchonly or \ build_opts.buildpkgonly: - return self.returncode - - retval = self._install_task.install() - return retval + exit_handler(self) + else: + self._current_task = self._install_task + self._install_task.install(exit_handler) diff --git a/pym/_emerge/PackageMerge.py b/pym/_emerge/PackageMerge.py index 4aecf8adb..45d2e7dc6 100644 --- a/pym/_emerge/PackageMerge.py +++ b/pym/_emerge/PackageMerge.py @@ -4,11 +4,6 @@ from _emerge.AsynchronousTask import AsynchronousTask from portage.output import colorize class PackageMerge(AsynchronousTask): - """ - TODO: Implement asynchronous merge so that the scheduler can - run while a merge is executing. - """ - __slots__ = ("merge",) def _start(self): @@ -40,6 +35,9 @@ class PackageMerge(AsynchronousTask): not self.merge.build_opts.buildpkgonly: self.merge.statusMessage(msg) - self.returncode = self.merge.merge() - self.wait() + self.merge.merge(self.exit_handler) + + def exit_handler(self, task): + self.returncode = task.returncode + self._wait_hook() diff --git a/pym/portage/dbapi/_MergeProcess.py b/pym/portage/dbapi/_MergeProcess.py index f717d12df..6e63f84fd 100644 --- a/pym/portage/dbapi/_MergeProcess.py +++ b/pym/portage/dbapi/_MergeProcess.py @@ -4,30 +4,72 @@ import signal import traceback +import errno +import fcntl import portage -from portage import os +from portage import os, StringIO +import portage.elog.messages +from _emerge.PollConstants import PollConstants from _emerge.SpawnProcess import SpawnProcess class MergeProcess(SpawnProcess): """ - Merge package files in a subprocess, so the Scheduler can run in the - main thread while files are moved or copied asynchronously. + Merge packages in a subprocess, so the Scheduler can run in the main + thread while files are moved or copied asynchronously. """ - __slots__ = ('cfgfiledict', 'conf_mem_file', \ - 'destroot', 'dblink', 'srcroot',) + __slots__ = ('dblink', 'mycat', 'mypkg', 'settings', 'treetype', + 'vartree', 'scheduler', 'blockers', 'pkgloc', 'infloc', 'myebuild', + 'mydbapi', 'prev_mtimes', '_elog_reader_fd', '_elog_reg_id', + '_buf') - def _spawn(self, args, fd_pipes=None, **kwargs): + def _elog_output_handler(self, fd, event): + output = None + if event & PollConstants.POLLIN: + try: + output = os.read(fd, self._bufsize) + except OSError as e: + if e.errno not in (errno.EAGAIN, errno.EINTR): + raise + if output: + lines = output.split('\n') + if len(lines) == 1: + self._buf += lines[0] + else: + lines[0] = self._buf + lines[0] + self._buf = lines.pop() + out = StringIO() + for line in lines: + funcname, phase, key, msg = line.split(' ', 3) + reporter = getattr(portage.elog.messages, funcname) + reporter(msg, phase=phase, key=key, out=out) + + def _spawn(self, args, fd_pipes, **kwargs): """ Fork a subprocess, apply local settings, and call - dblink._merge_process(). + dblink.merge(). """ + files = self._files + elog_reader_fd, elog_writer_fd = os.pipe() + fcntl.fcntl(elog_reader_fd, fcntl.F_SETFL, + fcntl.fcntl(elog_reader_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + mylink = self.dblink(self.mycat, self.mypkg, settings=self.settings, + treetype=self.treetype, vartree=self.vartree, + blockers=self.blockers, scheduler=self.scheduler, + pipe=elog_writer_fd) + fd_pipes[elog_writer_fd] = elog_writer_fd + self._elog_reg_id = self.scheduler.register(elog_reader_fd, + self._registered_events, self._elog_output_handler) + pid = os.fork() if pid != 0: + self._elog_reader_fd = elog_reader_fd + self._buf = "" portage.process.spawned_pids.append(pid) return [pid] + os.close(elog_reader_fd) portage.process._setup_pipes(fd_pipes) # Use default signal handlers since the ones inherited @@ -35,18 +77,19 @@ class MergeProcess(SpawnProcess): signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL) - portage.output.havecolor = self.dblink.settings.get('NOCOLOR') \ + portage.output.havecolor = self.settings.get('NOCOLOR') \ not in ('yes', 'true') - # In this subprocess we want dblink._display_merge() to use + # In this subprocess we want mylink._display_merge() to use # stdout/stderr directly since they are pipes. This behavior - # is triggered when dblink._scheduler is None. - self.dblink._scheduler = None + # is triggered when mylink._scheduler is None. + mylink._scheduler = None rval = 1 try: - rval = self.dblink._merge_process(self.srcroot, self.destroot, - self.cfgfiledict, self.conf_mem_file) + rval = mylink.merge(self.pkgloc, self.infloc, + myebuild=self.myebuild, mydbapi=self.mydbapi, + prev_mtimes=self.prev_mtimes) except SystemExit: raise except: @@ -55,3 +98,16 @@ class MergeProcess(SpawnProcess): # Call os._exit() from finally block, in order to suppress any # finally blocks from earlier in the call stack. See bug #345289. os._exit(rval) + + def _unregister(self): + """ + Unregister from the scheduler and close open files. + """ + if self._elog_reg_id is not None: + self.scheduler.unregister(self._elog_reg_id) + self._elog_reg_id = None + if self._elog_reader_fd: + os.close(self._elog_reader_fd) + self._elog_reader_fd = None + + super(MergeProcess, self)._unregister() diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py index bf48b1508..66e2955a6 100644 --- a/pym/portage/dbapi/vartree.py +++ b/pym/portage/dbapi/vartree.py @@ -13,7 +13,8 @@ portage.proxy.lazyimport.lazyimport(globals(), 'portage.dbapi._MergeProcess:MergeProcess', 'portage.dep:dep_getkey,isjustname,match_from_list,' + \ 'use_reduce,_slot_re', - 'portage.elog:elog_process,_preload_elog_modules', + 'portage.elog:collect_ebuild_messages,collect_messages,' + \ + 'elog_process,_merge_logentries,_preload_elog_modules', 'portage.locks:lockdir,unlockdir', 'portage.output:bold,colorize', 'portage.package.ebuild.doebuild:doebuild_environment,' + \ @@ -1200,7 +1201,7 @@ class dblink(object): _file_merge_yield_interval = 20 def __init__(self, cat, pkg, myroot=None, settings=None, treetype=None, - vartree=None, blockers=None, scheduler=None): + vartree=None, blockers=None, scheduler=None, pipe=None): """ Creates a DBlink object for a given CPV. The given CPV may not be present in the database already. @@ -1259,6 +1260,7 @@ class dblink(object): self._md5_merge_map = {} self._hash_key = (self.myroot, self.mycpv) self._protect_obj = None + self._pipe = pipe def __hash__(self): return hash(self._hash_key) @@ -1502,7 +1504,7 @@ class dblink(object): continue others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1], settings=self.settings, vartree=self.vartree, - treetype="vartree")) + treetype="vartree", pipe=self._pipe)) retval = self._security_check([self] + others_in_slot) if retval: @@ -1666,9 +1668,7 @@ class dblink(object): self._eerror(ebuild_phase, msg_lines) - # process logs created during pre/postrm - elog_process(self.mycpv, self.settings, - phasefilter=('prerm', 'postrm')) + self._elog_process() if retval == os.EX_OK: # myebuildpath might be None, so ensure @@ -1764,7 +1764,7 @@ class dblink(object): continue others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1], settings=self.settings, - vartree=self.vartree, treetype="vartree")) + vartree=self.vartree, treetype="vartree", pipe=self._pipe)) dest_root = self._eroot dest_root_len = len(dest_root) - 1 @@ -2784,19 +2784,34 @@ class dblink(object): self._scheduler.dblinkElog(self, phase, _eerror, lines) - def _elog_subprocess(self, funcname, phase, lines): - """ - Subprocesses call this in order to create elog messages in - $T, for collection by the main process. - """ - cmd = "source %s/isolated-functions.sh ; " % \ - portage._shell_quote(self.settings["PORTAGE_BIN_PATH"]) - for line in lines: - cmd += "%s %s ; " % (funcname, portage._shell_quote(line)) - env = self.settings.environ() - env['EBUILD_PHASE'] = phase - subprocess.call([portage.const.BASH_BINARY, "-c", cmd], - env=env) + def _elog_process(self): + cpv = self.mycpv + if self._pipe is None: + elog_process(cpv, self.settings) + else: + logdir = os.path.join(self.settings["T"], "logging") + ebuild_logentries = collect_ebuild_messages(logdir) + py_logentries = collect_messages(key=cpv).get(cpv, {}) + logentries = _merge_logentries(py_logentries, ebuild_logentries) + funcnames = { + "INFO": "einfo", + "LOG": "elog", + "WARN": "ewarn", + "QA": "eqawarn", + "ERROR": "eerror" + } + buffer = [] + for phase, messages in logentries.items(): + for key, lines in messages: + funcname = funcnames[key] + if isinstance(lines, basestring): + lines = [lines] + for line in lines: + fields = (funcname, phase, cpv, line.rstrip('\n')) + buffer.append(' '.join(fields)) + buffer.append('\n') + if buffer: + os.write(self._pipe, ''.join(buffer)) def treewalk(self, srcroot, destroot, inforoot, myebuild, cleanup=0, mydbapi=None, prev_mtimes=None): @@ -2811,7 +2826,6 @@ class dblink(object): unmerges old version (if required) calls doebuild(mydo=pkg_postinst) calls env_update - calls elog_process @param srcroot: Typically this is ${D} @type srcroot: String (Path) @@ -2921,7 +2935,7 @@ class dblink(object): others_in_slot.append(dblink(self.cat, catsplit(cur_cpv)[1], settings=config(clone=self.settings), vartree=self.vartree, treetype="vartree", - scheduler=self._scheduler)) + scheduler=self._scheduler, pipe=self._pipe)) retval = self._security_check(others_in_slot) if retval: @@ -3069,8 +3083,6 @@ class dblink(object): # check for package collisions blockers = None if self._blockers is not None: - # This is only supposed to be called when - # the vdb is locked, like it is here. blockers = self._blockers() if blockers is None: blockers = [] @@ -3242,16 +3254,8 @@ class dblink(object): cfgfiledict["IGNORE"] = 1 break - merge_task = MergeProcess( - background=(self.settings.get('PORTAGE_BACKGROUND') == '1'), - cfgfiledict=cfgfiledict, conf_mem_file=conf_mem_file, dblink=self, - destroot=destroot, - logfile=self.settings.get('PORTAGE_LOG_FILE'), - scheduler=(scheduler or PollScheduler().sched_iface), - srcroot=srcroot) - - merge_task.start() - rval = merge_task.wait() + rval = self._merge_contents(srcroot, destroot, cfgfiledict, + conf_mem_file) if rval != os.EX_OK: return rval @@ -3438,7 +3442,7 @@ class dblink(object): return backup_p - def _merge_process(self, srcroot, destroot, cfgfiledict, conf_mem_file): + def _merge_contents(self, srcroot, destroot, cfgfiledict, conf_mem_file): cfgfiledict_orig = cfgfiledict.copy() @@ -3667,7 +3671,7 @@ class dblink(object): msg.append(_("This file will be renamed to a different name:")) msg.append(" '%s'" % backup_dest) msg.append("") - self._elog_subprocess("eerror", "preinst", msg) + self._eerror("preinst", msg) if movefile(mydest, backup_dest, mysettings=self.settings, encoding=_encodings['merge']) is None: @@ -3745,7 +3749,7 @@ class dblink(object): msg.append(_("This file will be merged with a different name:")) msg.append(" '%s'" % newdest) msg.append("") - self._elog_subprocess("eerror", "preinst", msg) + self._eerror("preinst", msg) mydest = newdest elif stat.S_ISREG(mydmode) or (stat.S_ISLNK(mydmode) and os.path.exists(mydest) and stat.S_ISREG(os.stat(mydest)[stat.ST_MODE])): @@ -3929,7 +3933,7 @@ class dblink(object): self._scheduler.dblinkEbuildPhase( self, mydbapi, myebuild, phase) - elog_process(self.mycpv, self.settings) + self._elog_process() if 'noclean' not in self.settings.features and \ (retval == os.EX_OK or \ @@ -4029,10 +4033,16 @@ def merge(mycat, mypkg, pkgloc, infloc, writemsg(_("Permission denied: access('%s', W_OK)\n") % settings['EROOT'], noiselevel=-1) return errno.EACCES - mylink = dblink(mycat, mypkg, settings=settings, treetype=mytree, - vartree=vartree, blockers=blockers, scheduler=scheduler) - return mylink.merge(pkgloc, infloc, myebuild=myebuild, - mydbapi=mydbapi, prev_mtimes=prev_mtimes) + background = (settings.get('PORTAGE_BACKGROUND') == '1') + merge_task = MergeProcess( + dblink=dblink, mycat=mycat, mypkg=mypkg, settings=settings, + treetype=mytree, vartree=vartree, scheduler=scheduler, + background=background, blockers=blockers, pkgloc=pkgloc, + infloc=infloc, myebuild=myebuild, mydbapi=mydbapi, + prev_mtimes=prev_mtimes, logfile=settings.get('PORTAGE_LOG_FILE')) + merge_task.start() + retcode = merge_task.wait() + return retcode def unmerge(cat, pkg, myroot=None, settings=None, mytrimworld=None, vartree=None, |