From da8c62b6941d052ce5c197bad19a1b289f94a331 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Fri, 11 Jul 2008 02:38:35 +0000 Subject: Refactor and simplify the main task scheduling and poll loops: * Make output handlers unregister themselves and call wait() to notify exit listeners immediately. This makes the exit listeners more useful for scheduling tasks. This makes the poll loop nice an clean because it just calls the handlers and then the handlers can do the scheduling when necessary. * Make SequentialTaskQueue.add() and addFront() trigger scheduling internally, so that it's more of a chain reaction than something that has to be done explicitly. svn path=/main/trunk/; revision=11013 --- pym/_emerge/__init__.py | 168 +++++++++++++++++++++--------------------------- 1 file changed, 74 insertions(+), 94 deletions(-) (limited to 'pym') diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 7f122bb4d..dce4ef8d8 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1886,7 +1886,8 @@ class SpawnProcess(SubProcess): f.flush() f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered def _dummy_handler(self, fd, event): @@ -1908,7 +1909,8 @@ class SpawnProcess(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered class EbuildFetcher(SpawnProcess): @@ -2045,11 +2047,11 @@ class EbuildBuild(CompositeTask): prefetcher.cancel() elif prefetcher.poll() is None: - waiting_msg = ("Fetching '%s' " + \ + waiting_msg = "Fetching files " + \ "in the background. " + \ "To view fetch progress, run `tail -f " + \ "/var/log/emerge-fetch.log` in another " + \ - "terminal.") % prefetcher.pkg_path + "terminal." msg_prefix = colorize("GOOD", " * ") from textwrap import wrap waiting_msg = "".join("%s%s\n" % (msg_prefix, line) \ @@ -2371,7 +2373,8 @@ class EbuildMetadataPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() if self.returncode == os.EX_OK: metadata = izip(portage.auxdbkeys, @@ -2519,7 +2522,8 @@ class EbuildPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered def _dummy_handler(self, fd, event): @@ -2541,7 +2545,8 @@ class EbuildPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self._wait() + self.scheduler.unregister(self._reg_id) + self.wait() return self.registered def _set_returncode(self, wait_retval): @@ -7720,9 +7725,11 @@ class SequentialTaskQueue(SlotObject): def add(self, task): self._task_queue.append(task) + self.schedule() def addFront(self, task): self._task_queue.appendleft(task) + self.schedule() def schedule(self): @@ -7814,12 +7821,29 @@ class PollLoop(object): return True + def _poll_loop(self): + + event_handlers = self._poll_event_handlers + poll = self._poll.poll + state_change = 0 + + while event_handlers: + for f, event in poll(): + handler, reg_id = event_handlers[f] + if not handler(f, event): + state_change += 1 + + if not state_change: + raise AssertionError("tight loop") + def _register(self, f, eventmask, handler): """ @rtype: Integer @return: A unique registration id, for use in schedule() or unregister() calls. """ + if f in self._poll_event_handlers: + raise AssertionError("fd %d is already registered" % f) self._event_handler_id += 1 reg_id = self._event_handler_id self._poll_event_handler_ids[reg_id] = f @@ -7832,7 +7856,6 @@ class PollLoop(object): self._poll.unregister(f) del self._poll_event_handlers[f] del self._poll_event_handler_ids[reg_id] - self._schedule_tasks() def _schedule(self, wait_id): """ @@ -7845,48 +7868,10 @@ class PollLoop(object): handler_ids = self._poll_event_handler_ids poll = self._poll.poll - self._schedule_tasks() - while wait_id in handler_ids: for f, event in poll(): handler, reg_id = event_handlers[f] - if not handler(f, event): - self._unregister(reg_id) - - def _schedule_tasks(self): - return False - - def _schedule_main(self, wait=False): - - event_handlers = self._poll_event_handlers - poll = self._poll.poll - max_jobs = self._max_jobs - - state_change = 0 - - if self._schedule_tasks(): - state_change += 1 - - while event_handlers: - jobs = self._jobs - - for f, event in poll(): - handler, reg_id = event_handlers[f] - if not handler(f, event): - state_change += 1 - self._unregister(reg_id) - - if jobs == self._jobs: - continue - - if self._schedule_tasks(): - state_change += 1 - - if not wait and self._jobs < max_jobs: - break - - if not state_change: - raise AssertionError("tight loop") + handler(f, event) class Scheduler(PollLoop): @@ -7904,7 +7889,7 @@ class Scheduler(PollLoop): _fetch_log = "/var/log/emerge-fetch.log" class _iface_class(SlotObject): - __slots__ = ("fetch", "register", "schedule") + __slots__ = ("fetch", "register", "schedule", "unregister") class _fetch_iface_class(SlotObject): __slots__ = ("log_file", "schedule") @@ -7966,7 +7951,7 @@ class Scheduler(PollLoop): schedule=self._schedule_fetch) self._sched_iface = self._iface_class( fetch=fetch_iface, register=self._register, - schedule=self._schedule) + schedule=self._schedule, unregister=self._unregister) self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: @@ -8535,24 +8520,30 @@ class Scheduler(PollLoop): if self._is_restart_scheduled(): self._set_max_jobs(1) - pkg_queue = self._pkg_queue - failed_pkgs = self._failed_pkgs + while not self._failed_pkgs and \ + self._schedule_tasks(): + self._poll_loop() + + while self._jobs: + self._poll_loop() + + def _schedule_tasks(self): + """ + @rtype: bool + @returns: True if tasks remain to schedule, False otherwise. + """ + task_queues = self._task_queues - max_jobs = self._max_jobs - max_load = self._max_load - background = max_jobs > 1 + background = self._max_jobs > 1 - while pkg_queue and not failed_pkgs: + while self._can_add_job(): - if not self._can_add_job(): - self._schedule_main() - continue + if not self._pkg_queue: + return False pkg = self._choose_pkg() - if pkg is None: - self._schedule_main() - continue + return True if not pkg.installed: self._pkg_count.curval += 1 @@ -8570,16 +8561,7 @@ class Scheduler(PollLoop): else: task.addExitListener(self._build_exit) task_queues.jobs.add(task) - - while self._jobs: - self._schedule_main(wait=True) - - def _schedule_tasks(self): - state_change = 0 - for x in self._task_queues.values(): - if x.schedule(): - state_change += 1 - return bool(state_change) + return True def _task(self, pkg, background): @@ -8747,7 +8729,7 @@ class Scheduler(PollLoop): class MetadataRegen(PollLoop): class _sched_iface_class(SlotObject): - __slots__ = ("register", "schedule") + __slots__ = ("register", "schedule", "unregister") def __init__(self, portdb, max_jobs=None, max_load=None): PollLoop.__init__(self) @@ -8756,14 +8738,15 @@ class MetadataRegen(PollLoop): if max_jobs is None: max_jobs = 1 - self._job_queue = SequentialTaskQueue(max_jobs=max_jobs) self._max_jobs = max_jobs self._max_load = max_load self._sched_iface = self._sched_iface_class( register=self._register, - schedule=self._schedule) + schedule=self._schedule, + unregister=self._unregister) self._valid_pkgs = set() + self._process_iter = self._iter_metadata_processes() def _iter_metadata_processes(self): portdb = self._portdb @@ -8800,7 +8783,11 @@ class MetadataRegen(PollLoop): dead_nodes = None break - self._main_loop() + while self._schedule_tasks(): + self._poll_loop() + + while self._jobs: + self._poll_loop() if dead_nodes: for y in self._valid_pkgs: @@ -8816,31 +8803,23 @@ class MetadataRegen(PollLoop): except (KeyError, CacheError): pass - def _main_loop(self): - - process_iter = self._iter_metadata_processes() - - while True: - - if not self._can_add_job(): - self._schedule_main() - continue - + def _schedule_tasks(self): + """ + @rtype: bool + @returns: True if there may be remaining tasks to schedule, + False otherwise. + """ + while self._can_add_job(): try: - metadata_process = process_iter.next() + metadata_process = self._process_iter.next() except StopIteration: - break + return False self._jobs += 1 metadata_process.scheduler = self._sched_iface metadata_process.addExitListener(self._metadata_exit) - self._job_queue.add(metadata_process) - - while self._jobs: - self._schedule_main(wait=True) - - def _schedule_tasks(self): - return self._job_queue.schedule() + metadata_process.start() + return True def _metadata_exit(self, metadata_process): self._jobs -= 1 @@ -8848,6 +8827,7 @@ class MetadataRegen(PollLoop): self._valid_pkgs.discard(metadata_process.cpv) portage.writemsg("Error processing %s, continuing...\n" % \ (metadata_process.cpv,)) + self._schedule_tasks() class UninstallFailure(portage.exception.PortageException): """ -- cgit v1.2.3-1-g7c22