From 0ae9252cfc891db0a6882b3732c92f0c3df3dfe0 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Fri, 4 Jul 2008 00:11:03 +0000 Subject: * Split out a SequentialTaskQueue class to encapsulate the parallel-fetch prefetcher queue. * Fix broken return value status handling in Scheduler.merge(). svn path=/main/trunk/; revision=10918 --- pym/_emerge/__init__.py | 85 +++++++++++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 35 deletions(-) (limited to 'pym') diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index bff2e3a04..25cbc3bb5 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -6966,6 +6966,49 @@ class PollSelectFallback(object): poll_events.append((fd, select.POLLIN)) return poll_events +class SequentialTaskQueue(SlotObject): + + __slots__ = ("max_jobs", "running_tasks", "_task_queue") + + def __init__(self, **kwargs): + SlotObject.__init__(self, **kwargs) + from collections import deque + self._task_queue = deque() + self.running_tasks = set() + if self.max_jobs is None: + self.max_jobs = 1 + + def add(self, task): + self._task_queue.append(task) + + def schedule(self): + task_queue = self._task_queue + running_tasks = self.running_tasks + max_jobs = self.max_jobs + state_changed = False + + for task in list(running_tasks): + if not task.registered and task.poll() is not None: + running_tasks.remove(task) + state_changed = True + + while task_queue and (len(running_tasks) < max_jobs): + task = task_queue.popleft() + cancelled = getattr(task, "cancelled", None) + if not cancelled: + task.start() + running_tasks.add(task) + state_changed = True + + return state_changed + + def clear(self): + self._task_queue.clear() + running_tasks = self.running_tasks + while running_tasks: + task = running_tasks.pop() + task.cancel() + class Scheduler(object): _opts_ignore_blockers = \ @@ -7038,10 +7081,10 @@ class Scheduler(object): except AttributeError: self._poll = PollSelectFallback() - from collections import deque - self._task_queue = deque() - self._running_tasks = set() - self._max_jobs = 1 + self._prefetch_queue = SequentialTaskQueue() + self._add_task = self._prefetch_queue.add + self._schedule_tasks = self._prefetch_queue.schedule + self._prefetchers = weakref.WeakValueDictionary() self._failed_fetches = [] self._parallel_fetch = False @@ -7071,9 +7114,6 @@ class Scheduler(object): except EnvironmentError: pass - def _add_task(self, task): - self._task_queue.append(task) - class _pkg_failure(portage.exception.PortageException): """ An instance of this class is raised by unmerge() when @@ -7282,7 +7322,7 @@ class Scheduler(object): mtimedb = self._mtimedb while True: - self._merge() + rval = self._merge() self._show_failed_fetches() del self._failed_fetches[:] @@ -7350,11 +7390,7 @@ class Scheduler(object): return e.status finally: # clean up child process if necessary - self._task_queue.clear() - running_tasks = self._running_tasks - while running_tasks: - task = running_tasks.pop() - task.cancel() + self._prefetch_queue.clear() return os.EX_OK def _save_resume_list(self): @@ -7411,7 +7447,7 @@ class Scheduler(object): def _schedule(self): event_handlers = self._poll_event_handlers - running_tasks = self._running_tasks + running_tasks = self._prefetch_queue.running_tasks poll = self._poll.poll self._schedule_tasks() @@ -7426,27 +7462,6 @@ class Scheduler(object): # handler, so it's time to yield. break - def _schedule_tasks(self): - task_queue = self._task_queue - running_tasks = self._running_tasks - max_jobs = self._max_jobs - state_changed = False - - for task in list(running_tasks): - if not task.registered and task.poll() is not None: - running_tasks.remove(task) - state_changed = True - - while task_queue and (len(running_tasks) < max_jobs): - task = task_queue.popleft() - cancelled = getattr(task, "cancelled", None) - if not cancelled: - task.start() - running_tasks.add(task) - state_changed = True - - return state_changed - def _world_atom(self, pkg): """ Add the package to the world file, but only if -- cgit v1.2.3-1-g7c22