summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pym/_emerge/__init__.py85
1 files changed, 50 insertions, 35 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index bff2e3a04..25cbc3bb5 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -6966,6 +6966,49 @@ class PollSelectFallback(object):
poll_events.append((fd, select.POLLIN))
return poll_events
+class SequentialTaskQueue(SlotObject):
+
+ __slots__ = ("max_jobs", "running_tasks", "_task_queue")
+
+ def __init__(self, **kwargs):
+ SlotObject.__init__(self, **kwargs)
+ from collections import deque
+ self._task_queue = deque()
+ self.running_tasks = set()
+ if self.max_jobs is None:
+ self.max_jobs = 1
+
+ def add(self, task):
+ self._task_queue.append(task)
+
+ def schedule(self):
+ task_queue = self._task_queue
+ running_tasks = self.running_tasks
+ max_jobs = self.max_jobs
+ state_changed = False
+
+ for task in list(running_tasks):
+ if not task.registered and task.poll() is not None:
+ running_tasks.remove(task)
+ state_changed = True
+
+ while task_queue and (len(running_tasks) < max_jobs):
+ task = task_queue.popleft()
+ cancelled = getattr(task, "cancelled", None)
+ if not cancelled:
+ task.start()
+ running_tasks.add(task)
+ state_changed = True
+
+ return state_changed
+
+ def clear(self):
+ self._task_queue.clear()
+ running_tasks = self.running_tasks
+ while running_tasks:
+ task = running_tasks.pop()
+ task.cancel()
+
class Scheduler(object):
_opts_ignore_blockers = \
@@ -7038,10 +7081,10 @@ class Scheduler(object):
except AttributeError:
self._poll = PollSelectFallback()
- from collections import deque
- self._task_queue = deque()
- self._running_tasks = set()
- self._max_jobs = 1
+ self._prefetch_queue = SequentialTaskQueue()
+ self._add_task = self._prefetch_queue.add
+ self._schedule_tasks = self._prefetch_queue.schedule
+
self._prefetchers = weakref.WeakValueDictionary()
self._failed_fetches = []
self._parallel_fetch = False
@@ -7071,9 +7114,6 @@ class Scheduler(object):
except EnvironmentError:
pass
- def _add_task(self, task):
- self._task_queue.append(task)
-
class _pkg_failure(portage.exception.PortageException):
"""
An instance of this class is raised by unmerge() when
@@ -7282,7 +7322,7 @@ class Scheduler(object):
mtimedb = self._mtimedb
while True:
- self._merge()
+ rval = self._merge()
self._show_failed_fetches()
del self._failed_fetches[:]
@@ -7350,11 +7390,7 @@ class Scheduler(object):
return e.status
finally:
# clean up child process if necessary
- self._task_queue.clear()
- running_tasks = self._running_tasks
- while running_tasks:
- task = running_tasks.pop()
- task.cancel()
+ self._prefetch_queue.clear()
return os.EX_OK
def _save_resume_list(self):
@@ -7411,7 +7447,7 @@ class Scheduler(object):
def _schedule(self):
event_handlers = self._poll_event_handlers
- running_tasks = self._running_tasks
+ running_tasks = self._prefetch_queue.running_tasks
poll = self._poll.poll
self._schedule_tasks()
@@ -7426,27 +7462,6 @@ class Scheduler(object):
# handler, so it's time to yield.
break
- def _schedule_tasks(self):
- task_queue = self._task_queue
- running_tasks = self._running_tasks
- max_jobs = self._max_jobs
- state_changed = False
-
- for task in list(running_tasks):
- if not task.registered and task.poll() is not None:
- running_tasks.remove(task)
- state_changed = True
-
- while task_queue and (len(running_tasks) < max_jobs):
- task = task_queue.popleft()
- cancelled = getattr(task, "cancelled", None)
- if not cancelled:
- task.start()
- running_tasks.add(task)
- state_changed = True
-
- return state_changed
-
def _world_atom(self, pkg):
"""
Add the package to the world file, but only if