diff options
-rw-r--r-- | pym/_emerge/__init__.py | 168 |
1 files changed, 74 insertions, 94 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 7f122bb4d..dce4ef8d8 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1886,7 +1886,8 @@ class SpawnProcess(SubProcess): f.flush() f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered def _dummy_handler(self, fd, event): @@ -1908,7 +1909,8 @@ class SpawnProcess(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered class EbuildFetcher(SpawnProcess): @@ -2045,11 +2047,11 @@ class EbuildBuild(CompositeTask): prefetcher.cancel() elif prefetcher.poll() is None: - waiting_msg = ("Fetching '%s' " + \ + waiting_msg = "Fetching files " + \ "in the background. " + \ "To view fetch progress, run `tail -f " + \ "/var/log/emerge-fetch.log` in another " + \ - "terminal.") % prefetcher.pkg_path + "terminal." msg_prefix = colorize("GOOD", " * ") from textwrap import wrap waiting_msg = "".join("%s%s\n" % (msg_prefix, line) \ @@ -2371,7 +2373,8 @@ class EbuildMetadataPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() if self.returncode == os.EX_OK: metadata = izip(portage.auxdbkeys, @@ -2519,7 +2522,8 @@ class EbuildPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered def _dummy_handler(self, fd, event): @@ -2541,7 +2545,8 @@ class EbuildPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered def _set_returncode(self, wait_retval): @@ -7720,9 +7725,11 @@ class SequentialTaskQueue(SlotObject): def add(self, task): self._task_queue.append(task) + self.schedule() def addFront(self, task): self._task_queue.appendleft(task) + self.schedule() def schedule(self): @@ -7814,12 +7821,29 @@ class PollLoop(object): return True + def _poll_loop(self): + + event_handlers = self._poll_event_handlers + poll = self._poll.poll + state_change = 0 + + while event_handlers: + for f, event in poll(): + handler, reg_id = event_handlers[f] + if not handler(f, event): + state_change += 1 + + if not state_change: + raise AssertionError("tight loop") + def _register(self, f, eventmask, handler): """ @rtype: Integer @return: A unique registration id, for use in schedule() or unregister() calls. """ + if f in self._poll_event_handlers: + raise AssertionError("fd %d is already registered" % f) self._event_handler_id += 1 reg_id = self._event_handler_id self._poll_event_handler_ids[reg_id] = f @@ -7832,7 +7856,6 @@ class PollLoop(object): self._poll.unregister(f) del self._poll_event_handlers[f] del self._poll_event_handler_ids[reg_id] - self._schedule_tasks() def _schedule(self, wait_id): """ @@ -7845,48 +7868,10 @@ class PollLoop(object): handler_ids = self._poll_event_handler_ids poll = self._poll.poll - self._schedule_tasks() - while wait_id in handler_ids: for f, event in poll(): handler, reg_id = event_handlers[f] - if not handler(f, event): - self._unregister(reg_id) - - def _schedule_tasks(self): - return False - - def _schedule_main(self, wait=False): - - event_handlers = self._poll_event_handlers - poll = self._poll.poll - max_jobs = self._max_jobs - - state_change = 0 - - if self._schedule_tasks(): - state_change += 1 - - while event_handlers: - jobs = self._jobs - - for f, event in poll(): - handler, reg_id = event_handlers[f] - if not handler(f, event): - state_change += 1 - self._unregister(reg_id) - - if jobs == self._jobs: - continue - - if self._schedule_tasks(): - state_change += 1 - - if not wait and self._jobs < max_jobs: - break - - if not state_change: - raise AssertionError("tight loop") + handler(f, event) class Scheduler(PollLoop): @@ -7904,7 +7889,7 @@ class Scheduler(PollLoop): _fetch_log = "/var/log/emerge-fetch.log" class _iface_class(SlotObject): - __slots__ = ("fetch", "register", "schedule") + __slots__ = ("fetch", "register", "schedule", "unregister") class _fetch_iface_class(SlotObject): __slots__ = ("log_file", "schedule") @@ -7966,7 +7951,7 @@ class Scheduler(PollLoop): schedule=self._schedule_fetch) self._sched_iface = self._iface_class( fetch=fetch_iface, register=self._register, - schedule=self._schedule) + schedule=self._schedule, unregister=self._unregister) self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: @@ -8535,24 +8520,30 @@ class Scheduler(PollLoop): if self._is_restart_scheduled(): self._set_max_jobs(1) - pkg_queue = self._pkg_queue - failed_pkgs = self._failed_pkgs + while not self._failed_pkgs and \ + self._schedule_tasks(): + self._poll_loop() + + while self._jobs: + self._poll_loop() + + def _schedule_tasks(self): + """ + @rtype: bool + @returns: True if tasks remain to schedule, False otherwise. + """ + task_queues = self._task_queues - max_jobs = self._max_jobs - max_load = self._max_load - background = max_jobs > 1 + background = self._max_jobs > 1 - while pkg_queue and not failed_pkgs: + while self._can_add_job(): - if not self._can_add_job(): - self._schedule_main() - continue + if not self._pkg_queue: + return False pkg = self._choose_pkg() - if pkg is None: - self._schedule_main() - continue + return True if not pkg.installed: self._pkg_count.curval += 1 @@ -8570,16 +8561,7 @@ class Scheduler(PollLoop): else: task.addExitListener(self._build_exit) task_queues.jobs.add(task) - - while self._jobs: - self._schedule_main(wait=True) - - def _schedule_tasks(self): - state_change = 0 - for x in self._task_queues.values(): - if x.schedule(): - state_change += 1 - return bool(state_change) + return True def _task(self, pkg, background): @@ -8747,7 +8729,7 @@ class Scheduler(PollLoop): class MetadataRegen(PollLoop): class _sched_iface_class(SlotObject): - __slots__ = ("register", "schedule") + __slots__ = ("register", "schedule", "unregister") def __init__(self, portdb, max_jobs=None, max_load=None): PollLoop.__init__(self) @@ -8756,14 +8738,15 @@ class MetadataRegen(PollLoop): if max_jobs is None: max_jobs = 1 - self._job_queue = SequentialTaskQueue(max_jobs=max_jobs) self._max_jobs = max_jobs self._max_load = max_load self._sched_iface = self._sched_iface_class( register=self._register, - schedule=self._schedule) + schedule=self._schedule, + unregister=self._unregister) self._valid_pkgs = set() + self._process_iter = self._iter_metadata_processes() def _iter_metadata_processes(self): portdb = self._portdb @@ -8800,7 +8783,11 @@ class MetadataRegen(PollLoop): dead_nodes = None break - self._main_loop() + while self._schedule_tasks(): + self._poll_loop() + + while self._jobs: + self._poll_loop() if dead_nodes: for y in self._valid_pkgs: @@ -8816,31 +8803,23 @@ class MetadataRegen(PollLoop): except (KeyError, CacheError): pass - def _main_loop(self): - - process_iter = self._iter_metadata_processes() - - while True: - - if not self._can_add_job(): - self._schedule_main() - continue - + def _schedule_tasks(self): + """ + @rtype: bool + @returns: True if there may be remaining tasks to schedule, + False otherwise. + """ + while self._can_add_job(): try: - metadata_process = process_iter.next() + metadata_process = self._process_iter.next() except StopIteration: - break + return False self._jobs += 1 metadata_process.scheduler = self._sched_iface metadata_process.addExitListener(self._metadata_exit) - self._job_queue.add(metadata_process) - - while self._jobs: - self._schedule_main(wait=True) - - def _schedule_tasks(self): - return self._job_queue.schedule() + metadata_process.start() + return True def _metadata_exit(self, metadata_process): self._jobs -= 1 @@ -8848,6 +8827,7 @@ class MetadataRegen(PollLoop): self._valid_pkgs.discard(metadata_process.cpv) portage.writemsg("Error processing %s, continuing...\n" % \ (metadata_process.cpv,)) + self._schedule_tasks() class UninstallFailure(portage.exception.PortageException): """ |