diff options
Diffstat (limited to 'pym/_emerge')
-rw-r--r-- | pym/_emerge/__init__.py | 461 |
1 files changed, 326 insertions, 135 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 0cd94208d..7f122bb4d 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -38,7 +38,7 @@ except ImportError: from os import path as osp sys.path.insert(0, osp.join(osp.dirname(osp.dirname(osp.realpath(__file__))), "pym")) import portage -portage._disable_legacy_globals() + from portage import digraph, portdbapi from portage.const import NEWS_LIB_PATH, CACHE_PATH, PRIVATE_PATH, USER_CONFIG_PATH, GLOBAL_CONFIG_PATH @@ -2292,6 +2292,95 @@ class EbuildExecuter(CompositeTask): self._start_task(ebuild_phases, self._default_final_exit) +class EbuildMetadataPhase(SubProcess): + + """ + Asynchronous interface for the ebuild "depend" phase which is + used to extract metadata from the ebuild. + """ + + __slots__ = ("cpv", "ebuild_path", "fd_pipes", "metadata_callback", + "ebuild_mtime", "portdb", "repo_path", "settings") + \ + ("files", "_raw_metadata") + + _file_names = ("ebuild",) + _files_dict = slot_dict_class(_file_names, prefix="") + _bufsize = SpawnProcess._bufsize + _metadata_fd = 9 + + def start(self): + settings = self.settings + ebuild_path = self.ebuild_path + debug = settings.get("PORTAGE_DEBUG") == "1" + master_fd = None + slave_fd = None + fd_pipes = None + if self.fd_pipes is not None: + fd_pipes = self.fd_pipes.copy() + else: + fd_pipes = {} + + fd_pipes.setdefault(0, sys.stdin.fileno()) + fd_pipes.setdefault(1, sys.stdout.fileno()) + fd_pipes.setdefault(2, sys.stderr.fileno()) + + # flush any pending output + for fd in fd_pipes.itervalues(): + if fd == sys.stdout.fileno(): + sys.stdout.flush() + if fd == sys.stderr.fileno(): + sys.stderr.flush() + + fd_pipes_orig = fd_pipes.copy() + self.files = self._files_dict() + files = self.files + + master_fd, slave_fd = os.pipe() + fcntl.fcntl(master_fd, fcntl.F_SETFL, + fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + + fd_pipes[self._metadata_fd] = slave_fd + + retval = portage.doebuild(ebuild_path, "depend", + settings["ROOT"], settings, debug, + mydbapi=self.portdb, tree="porttree", + fd_pipes=fd_pipes, returnpid=True) + + os.close(slave_fd) + + if isinstance(retval, int): + # doebuild failed before spawning + os.close(master_fd) + self.returncode = retval + self.wait() + return + + self.pid = retval[0] + portage.process.spawned_pids.remove(self.pid) + + self._raw_metadata = [] + files.ebuild = os.fdopen(master_fd, 'r') + self._reg_id = self.scheduler.register(files.ebuild.fileno(), + PollConstants.POLLIN, self._output_handler) + self.registered = True + + def _output_handler(self, fd, event): + files = self.files + self._raw_metadata.append(files.ebuild.read()) + if not self._raw_metadata[-1]: + for f in files.values(): + f.close() + self.registered = False + self._wait() + + if self.returncode == os.EX_OK: + metadata = izip(portage.auxdbkeys, + "".join(self._raw_metadata).splitlines()) + self.metadata_callback(self.cpv, self.ebuild_path, + self.repo_path, metadata, self.ebuild_mtime) + + return self.registered + class EbuildPhase(SubProcess): __slots__ = ("fd_pipes", "phase", "pkg", @@ -2387,6 +2476,15 @@ class EbuildPhase(SubProcess): mydbapi=mydbapi, tree=tree, fd_pipes=fd_pipes, returnpid=True) + os.close(slave_fd) + + if isinstance(retval, int): + # doebuild failed before spawning + os.close(master_fd) + self.returncode = retval + self.wait() + return + self.pid = retval[0] portage.process.spawned_pids.remove(self.pid) @@ -2398,7 +2496,6 @@ class EbuildPhase(SubProcess): else: output_handler = self._dummy_handler - os.close(slave_fd) files.ebuild = os.fdopen(master_fd, 'r') self._reg_id = self.scheduler.register(files.ebuild.fileno(), PollConstants.POLLIN, output_handler) @@ -7680,7 +7777,118 @@ class SequentialTaskQueue(SlotObject): def __len__(self): return len(self._task_queue) + len(self.running_tasks) -class Scheduler(object): +class PollLoop(object): + + def __init__(self): + self._max_jobs = 1 + self._max_load = None + self._jobs = 0 + self._poll_event_handlers = {} + self._poll_event_handler_ids = {} + # Increment id for each new handler. + self._event_handler_id = 0 + try: + self._poll = select.poll() + except AttributeError: + self._poll = PollSelectAdapter() + + def _can_add_job(self): + jobs = self._jobs + max_jobs = self._max_jobs + max_load = self._max_load + + if self._jobs >= self._max_jobs: + return False + + if max_load is not None and max_jobs > 1 and self._jobs > 1: + try: + avg1, avg5, avg15 = os.getloadavg() + except OSError, e: + writemsg("!!! getloadavg() failed: %s\n" % (e,), + noiselevel=-1) + del e + return False + + if avg1 >= max_load: + return False + + return True + + def _register(self, f, eventmask, handler): + """ + @rtype: Integer + @return: A unique registration id, for use in schedule() or + unregister() calls. + """ + self._event_handler_id += 1 + reg_id = self._event_handler_id + self._poll_event_handler_ids[reg_id] = f + self._poll_event_handlers[f] = (handler, reg_id) + self._poll.register(f, eventmask) + return reg_id + + def _unregister(self, reg_id): + f = self._poll_event_handler_ids[reg_id] + self._poll.unregister(f) + del self._poll_event_handlers[f] + del self._poll_event_handler_ids[reg_id] + self._schedule_tasks() + + def _schedule(self, wait_id): + """ + Schedule until wait_id is not longer registered + for poll() events. + @type wait_id: int + @param wait_id: a task id to wait for + """ + event_handlers = self._poll_event_handlers + handler_ids = self._poll_event_handler_ids + poll = self._poll.poll + + self._schedule_tasks() + + while wait_id in handler_ids: + for f, event in poll(): + handler, reg_id = event_handlers[f] + if not handler(f, event): + self._unregister(reg_id) + + def _schedule_tasks(self): + return False + + def _schedule_main(self, wait=False): + + event_handlers = self._poll_event_handlers + poll = self._poll.poll + max_jobs = self._max_jobs + + state_change = 0 + + if self._schedule_tasks(): + state_change += 1 + + while event_handlers: + jobs = self._jobs + + for f, event in poll(): + handler, reg_id = event_handlers[f] + if not handler(f, event): + state_change += 1 + self._unregister(reg_id) + + if jobs == self._jobs: + continue + + if self._schedule_tasks(): + state_change += 1 + + if not wait and self._jobs < max_jobs: + break + + if not state_change: + raise AssertionError("tight loop") + +class Scheduler(PollLoop): _opts_ignore_blockers = \ frozenset(["--buildpkgonly", @@ -7722,6 +7930,7 @@ class Scheduler(object): def __init__(self, settings, trees, mtimedb, myopts, spinner, mergelist, favorites, digraph): + PollLoop.__init__(self) self.settings = settings self.target_root = settings["ROOT"] self.trees = trees @@ -7758,15 +7967,6 @@ class Scheduler(object): self._sched_iface = self._iface_class( fetch=fetch_iface, register=self._register, schedule=self._schedule) - self._poll_event_handlers = {} - self._poll_event_handler_ids = {} - # Increment id for each new handler. - self._event_handler_id = 0 - - try: - self._poll = select.poll() - except AttributeError: - self._poll = PollSelectAdapter() self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: @@ -7792,7 +7992,6 @@ class Scheduler(object): self._max_load = myopts.get("--load-average") self._set_digraph(digraph) - self._jobs = 0 features = self.settings.features if "parallel-fetch" in features and \ @@ -8345,24 +8544,10 @@ class Scheduler(object): while pkg_queue and not failed_pkgs: - if self._jobs >= max_jobs: + if not self._can_add_job(): self._schedule_main() continue - if max_load is not None and max_jobs > 1 and self._jobs > 1: - try: - avg1, avg5, avg15 = os.getloadavg() - except OSError, e: - writemsg("!!! getloadavg() failed: %s\n" % (e,), - noiselevel=-1) - del e - self._schedule_main() - continue - - if avg1 >= max_load: - self._schedule_main() - continue - pkg = self._choose_pkg() if pkg is None: @@ -8389,38 +8574,6 @@ class Scheduler(object): while self._jobs: self._schedule_main(wait=True) - def _schedule_main(self, wait=False): - - event_handlers = self._poll_event_handlers - poll = self._poll.poll - max_jobs = self._max_jobs - - state_change = 0 - - if self._schedule_tasks(): - state_change += 1 - - while event_handlers: - jobs = self._jobs - - for f, event in poll(): - handler, reg_id = event_handlers[f] - if not handler(f, event): - state_change += 1 - self._unregister(reg_id) - - if jobs == self._jobs: - continue - - if self._schedule_tasks(): - state_change += 1 - - if not wait and self._jobs < max_jobs: - break - - if not state_change: - raise AssertionError("tight loop") - def _schedule_tasks(self): state_change = 0 for x in self._task_queues.values(): @@ -8524,45 +8677,6 @@ class Scheduler(object): return True return False - def _register(self, f, eventmask, handler): - """ - @rtype: Integer - @return: A unique registration id, for use in schedule() or - unregister() calls. - """ - self._event_handler_id += 1 - reg_id = self._event_handler_id - self._poll_event_handler_ids[reg_id] = f - self._poll_event_handlers[f] = (handler, reg_id) - self._poll.register(f, eventmask) - return reg_id - - def _unregister(self, reg_id): - f = self._poll_event_handler_ids[reg_id] - self._poll.unregister(f) - del self._poll_event_handlers[f] - del self._poll_event_handler_ids[reg_id] - self._schedule_tasks() - - def _schedule(self, wait_id): - """ - Schedule until wait_id is not longer registered - for poll() events. - @type wait_id: int - @param wait_id: a task id to wait for - """ - event_handlers = self._poll_event_handlers - handler_ids = self._poll_event_handler_ids - poll = self._poll.poll - - self._schedule_tasks() - - while wait_id in handler_ids: - for f, event in poll(): - handler, reg_id = event_handlers[f] - if not handler(f, event): - self._unregister(reg_id) - def _world_atom(self, pkg): """ Add the package to the world file, but only if @@ -8630,6 +8744,111 @@ class Scheduler(object): return pkg +class MetadataRegen(PollLoop): + + class _sched_iface_class(SlotObject): + __slots__ = ("register", "schedule") + + def __init__(self, portdb, max_jobs=None, max_load=None): + PollLoop.__init__(self) + self._portdb = portdb + + if max_jobs is None: + max_jobs = 1 + + self._job_queue = SequentialTaskQueue(max_jobs=max_jobs) + self._max_jobs = max_jobs + self._max_load = max_load + self._sched_iface = self._sched_iface_class( + register=self._register, + schedule=self._schedule) + + self._valid_pkgs = set() + + def _iter_metadata_processes(self): + portdb = self._portdb + valid_pkgs = self._valid_pkgs + every_cp = portdb.cp_all() + every_cp.sort(reverse=True) + + while every_cp: + cp = every_cp.pop() + portage.writemsg_stdout("Processing %s\n" % cp) + cpv_list = portdb.cp_list(cp) + for cpv in cpv_list: + valid_pkgs.add(cpv) + ebuild_path, repo_path = portdb.findname2(cpv) + metadata_process = portdb._metadata_process( + cpv, ebuild_path, repo_path) + if metadata_process is None: + continue + yield metadata_process + + def run(self): + + portdb = self._portdb + from portage.cache.cache_errors import CacheError + dead_nodes = {} + + for mytree in portdb.porttrees: + try: + dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys()) + except CacheError, e: + portage.writemsg("Error listing cache entries for " + \ + "'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1) + del e + dead_nodes = None + break + + self._main_loop() + + if dead_nodes: + for y in self._valid_pkgs: + for mytree in portdb.porttrees: + if portdb.findname2(y, mytree=mytree)[0]: + dead_nodes[mytree].discard(y) + + for mytree, nodes in dead_nodes.iteritems(): + auxdb = portdb.auxdb[mytree] + for y in nodes: + try: + del auxdb[y] + except (KeyError, CacheError): + pass + + def _main_loop(self): + + process_iter = self._iter_metadata_processes() + + while True: + + if not self._can_add_job(): + self._schedule_main() + continue + + try: + metadata_process = process_iter.next() + except StopIteration: + break + + self._jobs += 1 + metadata_process.scheduler = self._sched_iface + metadata_process.addExitListener(self._metadata_exit) + self._job_queue.add(metadata_process) + + while self._jobs: + self._schedule_main(wait=True) + + def _schedule_tasks(self): + return self._job_queue.schedule() + + def _metadata_exit(self, metadata_process): + self._jobs -= 1 + if metadata_process.returncode != os.EX_OK: + self._valid_pkgs.discard(metadata_process.cpv) + portage.writemsg("Error processing %s, continuing...\n" % \ + (metadata_process.cpv,)) + class UninstallFailure(portage.exception.PortageException): """ An instance of this class is raised by unmerge() when @@ -9934,7 +10153,7 @@ def action_metadata(settings, portdb, myopts): sys.stdout.flush() os.umask(old_umask) -def action_regen(settings, portdb): +def action_regen(settings, portdb, max_jobs, max_load): xterm_titles = "notitles" not in settings.features emergelog(xterm_titles, " === regen") #regenerate cache entries @@ -9946,40 +10165,10 @@ def action_regen(settings, portdb): except: pass sys.stdout.flush() - mynodes = portdb.cp_all() - from portage.cache.cache_errors import CacheError - dead_nodes = {} - for mytree in portdb.porttrees: - try: - dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys()) - except CacheError, e: - portage.writemsg("Error listing cache entries for " + \ - "'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1) - del e - dead_nodes = None - break - for x in mynodes: - mymatches = portdb.cp_list(x) - portage.writemsg_stdout("Processing %s\n" % x) - for y in mymatches: - try: - foo = portdb.aux_get(y,["DEPEND"]) - except (KeyError, portage.exception.PortageException), e: - portage.writemsg( - "Error processing %(cpv)s, continuing... (%(e)s)\n" % \ - {"cpv":y,"e":str(e)}, noiselevel=-1) - if dead_nodes: - for mytree in portdb.porttrees: - if portdb.findname2(y, mytree=mytree)[0]: - dead_nodes[mytree].discard(y) - if dead_nodes: - for mytree, nodes in dead_nodes.iteritems(): - auxdb = portdb.auxdb[mytree] - for y in nodes: - try: - del auxdb[y] - except (KeyError, CacheError): - pass + + regen = MetadataRegen(portdb, max_jobs=max_jobs, max_load=max_load) + regen.run() + portage.writemsg_stdout("done!\n") def action_config(settings, trees, myopts, myfiles): @@ -11408,6 +11597,7 @@ def adjust_config(myopts, settings): def emerge_main(): global portage # NFC why this is necessary now - genone + portage._disable_legacy_globals() # Disable color until we're sure that it should be enabled (after # EMERGE_DEFAULT_OPTS has been parsed). portage.output.havecolor = 0 @@ -11789,7 +11979,8 @@ def emerge_main(): action_metadata(settings, portdb, myopts) elif myaction=="regen": validate_ebuild_environment(trees) - action_regen(settings, portdb) + action_regen(settings, portdb, myopts.get("--jobs"), + myopts.get("--load-average")) # HELP action elif "config"==myaction: validate_ebuild_environment(trees) |