diff options
-rw-r--r-- | pym/_emerge/__init__.py | 461 | ||||
-rw-r--r-- | pym/portage/__init__.py | 7 | ||||
-rw-r--r-- | pym/portage/dbapi/porttree.py | 168 |
3 files changed, 434 insertions, 202 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 0cd94208d..7f122bb4d 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -38,7 +38,7 @@ except ImportError: from os import path as osp sys.path.insert(0, osp.join(osp.dirname(osp.dirname(osp.realpath(__file__))), "pym")) import portage -portage._disable_legacy_globals() + from portage import digraph, portdbapi from portage.const import NEWS_LIB_PATH, CACHE_PATH, PRIVATE_PATH, USER_CONFIG_PATH, GLOBAL_CONFIG_PATH @@ -2292,6 +2292,95 @@ class EbuildExecuter(CompositeTask): self._start_task(ebuild_phases, self._default_final_exit) +class EbuildMetadataPhase(SubProcess): + + """ + Asynchronous interface for the ebuild "depend" phase which is + used to extract metadata from the ebuild. + """ + + __slots__ = ("cpv", "ebuild_path", "fd_pipes", "metadata_callback", + "ebuild_mtime", "portdb", "repo_path", "settings") + \ + ("files", "_raw_metadata") + + _file_names = ("ebuild",) + _files_dict = slot_dict_class(_file_names, prefix="") + _bufsize = SpawnProcess._bufsize + _metadata_fd = 9 + + def start(self): + settings = self.settings + ebuild_path = self.ebuild_path + debug = settings.get("PORTAGE_DEBUG") == "1" + master_fd = None + slave_fd = None + fd_pipes = None + if self.fd_pipes is not None: + fd_pipes = self.fd_pipes.copy() + else: + fd_pipes = {} + + fd_pipes.setdefault(0, sys.stdin.fileno()) + fd_pipes.setdefault(1, sys.stdout.fileno()) + fd_pipes.setdefault(2, sys.stderr.fileno()) + + # flush any pending output + for fd in fd_pipes.itervalues(): + if fd == sys.stdout.fileno(): + sys.stdout.flush() + if fd == sys.stderr.fileno(): + sys.stderr.flush() + + fd_pipes_orig = fd_pipes.copy() + self.files = self._files_dict() + files = self.files + + master_fd, slave_fd = os.pipe() + fcntl.fcntl(master_fd, fcntl.F_SETFL, + fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + + fd_pipes[self._metadata_fd] = slave_fd + + retval = portage.doebuild(ebuild_path, "depend", + settings["ROOT"], settings, debug, + mydbapi=self.portdb, tree="porttree", + fd_pipes=fd_pipes, returnpid=True) + + os.close(slave_fd) + + if isinstance(retval, int): + # doebuild failed before spawning + os.close(master_fd) + self.returncode = retval + self.wait() + return + + self.pid = retval[0] + portage.process.spawned_pids.remove(self.pid) + + self._raw_metadata = [] + files.ebuild = os.fdopen(master_fd, 'r') + self._reg_id = self.scheduler.register(files.ebuild.fileno(), + PollConstants.POLLIN, self._output_handler) + self.registered = True + + def _output_handler(self, fd, event): + files = self.files + self._raw_metadata.append(files.ebuild.read()) + if not self._raw_metadata[-1]: + for f in files.values(): + f.close() + self.registered = False + self._wait() + + if self.returncode == os.EX_OK: + metadata = izip(portage.auxdbkeys, + "".join(self._raw_metadata).splitlines()) + self.metadata_callback(self.cpv, self.ebuild_path, + self.repo_path, metadata, self.ebuild_mtime) + + return self.registered + class EbuildPhase(SubProcess): __slots__ = ("fd_pipes", "phase", "pkg", @@ -2387,6 +2476,15 @@ class EbuildPhase(SubProcess): mydbapi=mydbapi, tree=tree, fd_pipes=fd_pipes, returnpid=True) + os.close(slave_fd) + + if isinstance(retval, int): + # doebuild failed before spawning + os.close(master_fd) + self.returncode = retval + self.wait() + return + self.pid = retval[0] portage.process.spawned_pids.remove(self.pid) @@ -2398,7 +2496,6 @@ class EbuildPhase(SubProcess): else: output_handler = self._dummy_handler - os.close(slave_fd) files.ebuild = os.fdopen(master_fd, 'r') self._reg_id = self.scheduler.register(files.ebuild.fileno(), PollConstants.POLLIN, output_handler) @@ -7680,7 +7777,118 @@ class SequentialTaskQueue(SlotObject): def __len__(self): return len(self._task_queue) + len(self.running_tasks) -class Scheduler(object): +class PollLoop(object): + + def __init__(self): + self._max_jobs = 1 + self._max_load = None + self._jobs = 0 + self._poll_event_handlers = {} + self._poll_event_handler_ids = {} + # Increment id for each new handler. + self._event_handler_id = 0 + try: + self._poll = select.poll() + except AttributeError: + self._poll = PollSelectAdapter() + + def _can_add_job(self): + jobs = self._jobs + max_jobs = self._max_jobs + max_load = self._max_load + + if self._jobs >= self._max_jobs: + return False + + if max_load is not None and max_jobs > 1 and self._jobs > 1: + try: + avg1, avg5, avg15 = os.getloadavg() + except OSError, e: + writemsg("!!! getloadavg() failed: %s\n" % (e,), + noiselevel=-1) + del e + return False + + if avg1 >= max_load: + return False + + return True + + def _register(self, f, eventmask, handler): + """ + @rtype: Integer + @return: A unique registration id, for use in schedule() or + unregister() calls. + """ + self._event_handler_id += 1 + reg_id = self._event_handler_id + self._poll_event_handler_ids[reg_id] = f + self._poll_event_handlers[f] = (handler, reg_id) + self._poll.register(f, eventmask) + return reg_id + + def _unregister(self, reg_id): + f = self._poll_event_handler_ids[reg_id] + self._poll.unregister(f) + del self._poll_event_handlers[f] + del self._poll_event_handler_ids[reg_id] + self._schedule_tasks() + + def _schedule(self, wait_id): + """ + Schedule until wait_id is not longer registered + for poll() events. + @type wait_id: int + @param wait_id: a task id to wait for + """ + event_handlers = self._poll_event_handlers + handler_ids = self._poll_event_handler_ids + poll = self._poll.poll + + self._schedule_tasks() + + while wait_id in handler_ids: + for f, event in poll(): + handler, reg_id = event_handlers[f] + if not handler(f, event): + self._unregister(reg_id) + + def _schedule_tasks(self): + return False + + def _schedule_main(self, wait=False): + + event_handlers = self._poll_event_handlers + poll = self._poll.poll + max_jobs = self._max_jobs + + state_change = 0 + + if self._schedule_tasks(): + state_change += 1 + + while event_handlers: + jobs = self._jobs + + for f, event in poll(): + handler, reg_id = event_handlers[f] + if not handler(f, event): + state_change += 1 + self._unregister(reg_id) + + if jobs == self._jobs: + continue + + if self._schedule_tasks(): + state_change += 1 + + if not wait and self._jobs < max_jobs: + break + + if not state_change: + raise AssertionError("tight loop") + +class Scheduler(PollLoop): _opts_ignore_blockers = \ frozenset(["--buildpkgonly", @@ -7722,6 +7930,7 @@ class Scheduler(object): def __init__(self, settings, trees, mtimedb, myopts, spinner, mergelist, favorites, digraph): + PollLoop.__init__(self) self.settings = settings self.target_root = settings["ROOT"] self.trees = trees @@ -7758,15 +7967,6 @@ class Scheduler(object): self._sched_iface = self._iface_class( fetch=fetch_iface, register=self._register, schedule=self._schedule) - self._poll_event_handlers = {} - self._poll_event_handler_ids = {} - # Increment id for each new handler. - self._event_handler_id = 0 - - try: - self._poll = select.poll() - except AttributeError: - self._poll = PollSelectAdapter() self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: @@ -7792,7 +7992,6 @@ class Scheduler(object): self._max_load = myopts.get("--load-average") self._set_digraph(digraph) - self._jobs = 0 features = self.settings.features if "parallel-fetch" in features and \ @@ -8345,24 +8544,10 @@ class Scheduler(object): while pkg_queue and not failed_pkgs: - if self._jobs >= max_jobs: + if not self._can_add_job(): self._schedule_main() continue - if max_load is not None and max_jobs > 1 and self._jobs > 1: - try: - avg1, avg5, avg15 = os.getloadavg() - except OSError, e: - writemsg("!!! getloadavg() failed: %s\n" % (e,), - noiselevel=-1) - del e - self._schedule_main() - continue - - if avg1 >= max_load: - self._schedule_main() - continue - pkg = self._choose_pkg() if pkg is None: @@ -8389,38 +8574,6 @@ class Scheduler(object): while self._jobs: self._schedule_main(wait=True) - def _schedule_main(self, wait=False): - - event_handlers = self._poll_event_handlers - poll = self._poll.poll - max_jobs = self._max_jobs - - state_change = 0 - - if self._schedule_tasks(): - state_change += 1 - - while event_handlers: - jobs = self._jobs - - for f, event in poll(): - handler, reg_id = event_handlers[f] - if not handler(f, event): - state_change += 1 - self._unregister(reg_id) - - if jobs == self._jobs: - continue - - if self._schedule_tasks(): - state_change += 1 - - if not wait and self._jobs < max_jobs: - break - - if not state_change: - raise AssertionError("tight loop") - def _schedule_tasks(self): state_change = 0 for x in self._task_queues.values(): @@ -8524,45 +8677,6 @@ class Scheduler(object): return True return False - def _register(self, f, eventmask, handler): - """ - @rtype: Integer - @return: A unique registration id, for use in schedule() or - unregister() calls. - """ - self._event_handler_id += 1 - reg_id = self._event_handler_id - self._poll_event_handler_ids[reg_id] = f - self._poll_event_handlers[f] = (handler, reg_id) - self._poll.register(f, eventmask) - return reg_id - - def _unregister(self, reg_id): - f = self._poll_event_handler_ids[reg_id] - self._poll.unregister(f) - del self._poll_event_handlers[f] - del self._poll_event_handler_ids[reg_id] - self._schedule_tasks() - - def _schedule(self, wait_id): - """ - Schedule until wait_id is not longer registered - for poll() events. - @type wait_id: int - @param wait_id: a task id to wait for - """ - event_handlers = self._poll_event_handlers - handler_ids = self._poll_event_handler_ids - poll = self._poll.poll - - self._schedule_tasks() - - while wait_id in handler_ids: - for f, event in poll(): - handler, reg_id = event_handlers[f] - if not handler(f, event): - self._unregister(reg_id) - def _world_atom(self, pkg): """ Add the package to the world file, but only if @@ -8630,6 +8744,111 @@ class Scheduler(object): return pkg +class MetadataRegen(PollLoop): + + class _sched_iface_class(SlotObject): + __slots__ = ("register", "schedule") + + def __init__(self, portdb, max_jobs=None, max_load=None): + PollLoop.__init__(self) + self._portdb = portdb + + if max_jobs is None: + max_jobs = 1 + + self._job_queue = SequentialTaskQueue(max_jobs=max_jobs) + self._max_jobs = max_jobs + self._max_load = max_load + self._sched_iface = self._sched_iface_class( + register=self._register, + schedule=self._schedule) + + self._valid_pkgs = set() + + def _iter_metadata_processes(self): + portdb = self._portdb + valid_pkgs = self._valid_pkgs + every_cp = portdb.cp_all() + every_cp.sort(reverse=True) + + while every_cp: + cp = every_cp.pop() + portage.writemsg_stdout("Processing %s\n" % cp) + cpv_list = portdb.cp_list(cp) + for cpv in cpv_list: + valid_pkgs.add(cpv) + ebuild_path, repo_path = portdb.findname2(cpv) + metadata_process = portdb._metadata_process( + cpv, ebuild_path, repo_path) + if metadata_process is None: + continue + yield metadata_process + + def run(self): + + portdb = self._portdb + from portage.cache.cache_errors import CacheError + dead_nodes = {} + + for mytree in portdb.porttrees: + try: + dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys()) + except CacheError, e: + portage.writemsg("Error listing cache entries for " + \ + "'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1) + del e + dead_nodes = None + break + + self._main_loop() + + if dead_nodes: + for y in self._valid_pkgs: + for mytree in portdb.porttrees: + if portdb.findname2(y, mytree=mytree)[0]: + dead_nodes[mytree].discard(y) + + for mytree, nodes in dead_nodes.iteritems(): + auxdb = portdb.auxdb[mytree] + for y in nodes: + try: + del auxdb[y] + except (KeyError, CacheError): + pass + + def _main_loop(self): + + process_iter = self._iter_metadata_processes() + + while True: + + if not self._can_add_job(): + self._schedule_main() + continue + + try: + metadata_process = process_iter.next() + except StopIteration: + break + + self._jobs += 1 + metadata_process.scheduler = self._sched_iface + metadata_process.addExitListener(self._metadata_exit) + self._job_queue.add(metadata_process) + + while self._jobs: + self._schedule_main(wait=True) + + def _schedule_tasks(self): + return self._job_queue.schedule() + + def _metadata_exit(self, metadata_process): + self._jobs -= 1 + if metadata_process.returncode != os.EX_OK: + self._valid_pkgs.discard(metadata_process.cpv) + portage.writemsg("Error processing %s, continuing...\n" % \ + (metadata_process.cpv,)) + class UninstallFailure(portage.exception.PortageException): """ An instance of this class is raised by unmerge() when @@ -9934,7 +10153,7 @@ def action_metadata(settings, portdb, myopts): sys.stdout.flush() os.umask(old_umask) -def action_regen(settings, portdb): +def action_regen(settings, portdb, max_jobs, max_load): xterm_titles = "notitles" not in settings.features emergelog(xterm_titles, " === regen") #regenerate cache entries @@ -9946,40 +10165,10 @@ def action_regen(settings, portdb): except: pass sys.stdout.flush() - mynodes = portdb.cp_all() - from portage.cache.cache_errors import CacheError - dead_nodes = {} - for mytree in portdb.porttrees: - try: - dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys()) - except CacheError, e: - portage.writemsg("Error listing cache entries for " + \ - "'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1) - del e - dead_nodes = None - break - for x in mynodes: - mymatches = portdb.cp_list(x) - portage.writemsg_stdout("Processing %s\n" % x) - for y in mymatches: - try: - foo = portdb.aux_get(y,["DEPEND"]) - except (KeyError, portage.exception.PortageException), e: - portage.writemsg( - "Error processing %(cpv)s, continuing... (%(e)s)\n" % \ - {"cpv":y,"e":str(e)}, noiselevel=-1) - if dead_nodes: - for mytree in portdb.porttrees: - if portdb.findname2(y, mytree=mytree)[0]: - dead_nodes[mytree].discard(y) - if dead_nodes: - for mytree, nodes in dead_nodes.iteritems(): - auxdb = portdb.auxdb[mytree] - for y in nodes: - try: - del auxdb[y] - except (KeyError, CacheError): - pass + + regen = MetadataRegen(portdb, max_jobs=max_jobs, max_load=max_load) + regen.run() + portage.writemsg_stdout("done!\n") def action_config(settings, trees, myopts, myfiles): @@ -11408,6 +11597,7 @@ def adjust_config(myopts, settings): def emerge_main(): global portage # NFC why this is necessary now - genone + portage._disable_legacy_globals() # Disable color until we're sure that it should be enabled (after # EMERGE_DEFAULT_OPTS has been parsed). portage.output.havecolor = 0 @@ -11789,7 +11979,8 @@ def emerge_main(): action_metadata(settings, portdb, myopts) elif myaction=="regen": validate_ebuild_environment(trees) - action_regen(settings, portdb) + action_regen(settings, portdb, myopts.get("--jobs"), + myopts.get("--load-average")) # HELP action elif "config"==myaction: validate_ebuild_environment(trees) diff --git a/pym/portage/__init__.py b/pym/portage/__init__.py index bd800b4e2..453096f67 100644 --- a/pym/portage/__init__.py +++ b/pym/portage/__init__.py @@ -5011,7 +5011,12 @@ def doebuild(myebuild, mydo, myroot, mysettings, debug=0, listonly=0, if mydo == "depend": writemsg("!!! DEBUG: dbkey: %s\n" % str(dbkey), 2) droppriv = "userpriv" in mysettings.features - if isinstance(dbkey, dict): + if returnpid: + mypids = spawn(_shell_quote(ebuild_sh_binary) + " depend", + mysettings, fd_pipes=fd_pipes, returnpid=True, + droppriv=droppriv) + return mypids + elif isinstance(dbkey, dict): mysettings["dbkey"] = "" pr, pw = os.pipe() fd_pipes = { diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py index dd8015cc1..797c886f2 100644 --- a/pym/portage/dbapi/porttree.py +++ b/pym/portage/dbapi/porttree.py @@ -229,6 +229,104 @@ class portdbapi(dbapi): return[file, x] return None, 0 + def _metadata_process(self, cpv, ebuild_path, repo_path): + """ + Create an EbuildMetadataPhase instance to generate metadata for the + give ebuild. + @rtype: EbuildMetadataPhase + @returns: A new EbuildMetadataPhase instance, or None if the + metadata cache is already valid. + """ + metadata, st, emtime = self._pull_valid_cache(cpv, ebuild_path, repo_path) + if metadata is not None: + return None + + import _emerge + process = _emerge.EbuildMetadataPhase(cpv=cpv, ebuild_path=ebuild_path, + ebuild_mtime=emtime, metadata_callback=self._metadata_callback, + portdb=self, repo_path=repo_path, settings=self.doebuild_settings) + return process + + def _metadata_callback(self, cpv, ebuild_path, repo_path, metadata, mtime): + + i = metadata + if hasattr(metadata, "iteritems"): + i = metadata.iteritems() + metadata = dict(i) + + if "EAPI" not in metadata or not metadata["EAPI"].strip(): + metadata["EAPI"] = "0" + + if not eapi_is_supported(metadata["EAPI"]): + # if newer version, wipe everything and negate eapi + eapi = metadata["EAPI"] + metadata = {} + map(lambda x: metadata.setdefault(x, ""), auxdbkeys) + metadata["EAPI"] = "-" + eapi + + if metadata.get("INHERITED", False): + metadata["_eclasses_"] = \ + self.eclassdb.get_eclass_data(metadata["INHERITED"].split()) + else: + metadata["_eclasses_"] = {} + + metadata.pop("INHERITED", None) + metadata["_mtime_"] = mtime + self.auxdb[repo_path][cpv] = metadata + + def _pull_valid_cache(self, cpv, ebuild_path, repo_path): + + try: + st = os.stat(ebuild_path) + emtime = st[stat.ST_MTIME] + except OSError: + writemsg("!!! aux_get(): ebuild for " + \ + "'%s' does not exist at:\n" % (cpv,), noiselevel=-1) + writemsg("!!! %s\n" % ebuild_path, noiselevel=-1) + raise KeyError(cpv) + + # Pull pre-generated metadata from the metadata/cache/ + # directory if it exists and is valid, otherwise fall + # back to the normal writable cache. + auxdbs = [] + pregen_auxdb = self._pregen_auxdb.get(repo_path) + if pregen_auxdb is not None: + auxdbs.append(pregen_auxdb) + auxdbs.append(self.auxdb[repo_path]) + + doregen = True + for auxdb in auxdbs: + try: + metadata = auxdb[cpv] + eapi = metadata.get("EAPI","").strip() + if not eapi: + eapi = "0" + if eapi.startswith("-") and eapi_is_supported(eapi[1:]): + pass + elif emtime != int(metadata.get("_mtime_", 0)): + pass + elif len(metadata.get("_eclasses_", [])) > 0: + if self.eclassdb.is_eclass_data_valid( + metadata["_eclasses_"]): + doregen = False + else: + doregen = False + except KeyError: + pass + except CacheError: + if auxdb is not pregen_auxdb: + try: + del auxdb[cpv] + except KeyError: + pass + if not doregen: + break + + if doregen: + metadata = None + + return (metadata, st, emtime) + def aux_get(self, mycpv, mylist, mytree=None): "stub code for returning auxilliary db information, such as SLOT, DEPEND, etc." 'input: "sys-apps/foo-1.0",["SLOT","DEPEND","HOMEPAGE"]' @@ -294,52 +392,8 @@ class portdbapi(dbapi): writemsg("!!! Manifest is missing or inaccessable: %(manifest)s\n" % {"manifest":myManifestPath}, noiselevel=-1) - - try: - st = os.stat(myebuild) - emtime = st[stat.ST_MTIME] - except OSError: - writemsg("!!! aux_get(): ebuild for '%(cpv)s' does not exist at:\n" % {"cpv":mycpv}, - noiselevel=-1) - writemsg("!!! %s\n" % myebuild, - noiselevel=-1) - raise KeyError(mycpv) - - # Pull pre-generated metadata from the metadata/cache/ - # directory if it exists and is valid, otherwise fall - # back to the normal writable cache. - auxdbs = [] - pregen_auxdb = self._pregen_auxdb.get(mylocation) - if pregen_auxdb is not None: - auxdbs.append(pregen_auxdb) - auxdbs.append(self.auxdb[mylocation]) - - doregen = True - for auxdb in auxdbs: - try: - mydata = auxdb[mycpv] - eapi = mydata.get("EAPI","").strip() - if not eapi: - eapi = "0" - if eapi.startswith("-") and eapi_is_supported(eapi[1:]): - pass - elif emtime != long(mydata.get("_mtime_", 0)): - pass - elif len(mydata.get("_eclasses_", [])) > 0: - if self.eclassdb.is_eclass_data_valid(mydata["_eclasses_"]): - doregen = False - else: - doregen = False - except KeyError: - pass - except CacheError: - if auxdb is not pregen_auxdb: - try: - del auxdb[mycpv] - except KeyError: - pass - if not doregen: - break + mydata, st, emtime = self._pull_valid_cache(mycpv, myebuild, mylocation) + doregen = mydata is None writemsg("auxdb is valid: "+str(not doregen)+" "+str(pkg)+"\n", 2) @@ -360,26 +414,8 @@ class portdbapi(dbapi): self._broken_ebuilds.add(myebuild) raise KeyError(mycpv) - if "EAPI" not in mydata or not mydata["EAPI"].strip(): - mydata["EAPI"] = "0" - - if not eapi_is_supported(mydata["EAPI"]): - # if newer version, wipe everything and negate eapi - eapi = mydata["EAPI"] - mydata = {} - map(lambda x: mydata.setdefault(x, ""), auxdbkeys) - mydata["EAPI"] = "-"+eapi - - if mydata.get("INHERITED", False): - mydata["_eclasses_"] = self.eclassdb.get_eclass_data(mydata["INHERITED"].split()) - else: - mydata["_eclasses_"] = {} - - del mydata["INHERITED"] - - mydata["_mtime_"] = emtime - - self.auxdb[mylocation][mycpv] = mydata + self._metadata_callback( + mycpv, myebuild, mylocation, mydata, emtime) if not mydata.setdefault("EAPI", "0"): mydata["EAPI"] = "0" |