summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pym/_emerge/__init__.py168
1 files changed, 74 insertions, 94 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 7f122bb4d..dce4ef8d8 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1886,7 +1886,8 @@ class SpawnProcess(SubProcess):
f.flush()
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
def _dummy_handler(self, fd, event):
@@ -1908,7 +1909,8 @@ class SpawnProcess(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
class EbuildFetcher(SpawnProcess):
@@ -2045,11 +2047,11 @@ class EbuildBuild(CompositeTask):
prefetcher.cancel()
elif prefetcher.poll() is None:
- waiting_msg = ("Fetching '%s' " + \
+ waiting_msg = "Fetching files " + \
"in the background. " + \
"To view fetch progress, run `tail -f " + \
"/var/log/emerge-fetch.log` in another " + \
- "terminal.") % prefetcher.pkg_path
+ "terminal."
msg_prefix = colorize("GOOD", " * ")
from textwrap import wrap
waiting_msg = "".join("%s%s\n" % (msg_prefix, line) \
@@ -2371,7 +2373,8 @@ class EbuildMetadataPhase(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
if self.returncode == os.EX_OK:
metadata = izip(portage.auxdbkeys,
@@ -2519,7 +2522,8 @@ class EbuildPhase(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
def _dummy_handler(self, fd, event):
@@ -2541,7 +2545,8 @@ class EbuildPhase(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
def _set_returncode(self, wait_retval):
@@ -7720,9 +7725,11 @@ class SequentialTaskQueue(SlotObject):
def add(self, task):
self._task_queue.append(task)
+ self.schedule()
def addFront(self, task):
self._task_queue.appendleft(task)
+ self.schedule()
def schedule(self):
@@ -7814,12 +7821,29 @@ class PollLoop(object):
return True
+ def _poll_loop(self):
+
+ event_handlers = self._poll_event_handlers
+ poll = self._poll.poll
+ state_change = 0
+
+ while event_handlers:
+ for f, event in poll():
+ handler, reg_id = event_handlers[f]
+ if not handler(f, event):
+ state_change += 1
+
+ if not state_change:
+ raise AssertionError("tight loop")
+
def _register(self, f, eventmask, handler):
"""
@rtype: Integer
@return: A unique registration id, for use in schedule() or
unregister() calls.
"""
+ if f in self._poll_event_handlers:
+ raise AssertionError("fd %d is already registered" % f)
self._event_handler_id += 1
reg_id = self._event_handler_id
self._poll_event_handler_ids[reg_id] = f
@@ -7832,7 +7856,6 @@ class PollLoop(object):
self._poll.unregister(f)
del self._poll_event_handlers[f]
del self._poll_event_handler_ids[reg_id]
- self._schedule_tasks()
def _schedule(self, wait_id):
"""
@@ -7845,48 +7868,10 @@ class PollLoop(object):
handler_ids = self._poll_event_handler_ids
poll = self._poll.poll
- self._schedule_tasks()
-
while wait_id in handler_ids:
for f, event in poll():
handler, reg_id = event_handlers[f]
- if not handler(f, event):
- self._unregister(reg_id)
-
- def _schedule_tasks(self):
- return False
-
- def _schedule_main(self, wait=False):
-
- event_handlers = self._poll_event_handlers
- poll = self._poll.poll
- max_jobs = self._max_jobs
-
- state_change = 0
-
- if self._schedule_tasks():
- state_change += 1
-
- while event_handlers:
- jobs = self._jobs
-
- for f, event in poll():
- handler, reg_id = event_handlers[f]
- if not handler(f, event):
- state_change += 1
- self._unregister(reg_id)
-
- if jobs == self._jobs:
- continue
-
- if self._schedule_tasks():
- state_change += 1
-
- if not wait and self._jobs < max_jobs:
- break
-
- if not state_change:
- raise AssertionError("tight loop")
+ handler(f, event)
class Scheduler(PollLoop):
@@ -7904,7 +7889,7 @@ class Scheduler(PollLoop):
_fetch_log = "/var/log/emerge-fetch.log"
class _iface_class(SlotObject):
- __slots__ = ("fetch", "register", "schedule")
+ __slots__ = ("fetch", "register", "schedule", "unregister")
class _fetch_iface_class(SlotObject):
__slots__ = ("log_file", "schedule")
@@ -7966,7 +7951,7 @@ class Scheduler(PollLoop):
schedule=self._schedule_fetch)
self._sched_iface = self._iface_class(
fetch=fetch_iface, register=self._register,
- schedule=self._schedule)
+ schedule=self._schedule, unregister=self._unregister)
self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
@@ -8535,24 +8520,30 @@ class Scheduler(PollLoop):
if self._is_restart_scheduled():
self._set_max_jobs(1)
- pkg_queue = self._pkg_queue
- failed_pkgs = self._failed_pkgs
+ while not self._failed_pkgs and \
+ self._schedule_tasks():
+ self._poll_loop()
+
+ while self._jobs:
+ self._poll_loop()
+
+ def _schedule_tasks(self):
+ """
+ @rtype: bool
+ @returns: True if tasks remain to schedule, False otherwise.
+ """
+
task_queues = self._task_queues
- max_jobs = self._max_jobs
- max_load = self._max_load
- background = max_jobs > 1
+ background = self._max_jobs > 1
- while pkg_queue and not failed_pkgs:
+ while self._can_add_job():
- if not self._can_add_job():
- self._schedule_main()
- continue
+ if not self._pkg_queue:
+ return False
pkg = self._choose_pkg()
-
if pkg is None:
- self._schedule_main()
- continue
+ return True
if not pkg.installed:
self._pkg_count.curval += 1
@@ -8570,16 +8561,7 @@ class Scheduler(PollLoop):
else:
task.addExitListener(self._build_exit)
task_queues.jobs.add(task)
-
- while self._jobs:
- self._schedule_main(wait=True)
-
- def _schedule_tasks(self):
- state_change = 0
- for x in self._task_queues.values():
- if x.schedule():
- state_change += 1
- return bool(state_change)
+ return True
def _task(self, pkg, background):
@@ -8747,7 +8729,7 @@ class Scheduler(PollLoop):
class MetadataRegen(PollLoop):
class _sched_iface_class(SlotObject):
- __slots__ = ("register", "schedule")
+ __slots__ = ("register", "schedule", "unregister")
def __init__(self, portdb, max_jobs=None, max_load=None):
PollLoop.__init__(self)
@@ -8756,14 +8738,15 @@ class MetadataRegen(PollLoop):
if max_jobs is None:
max_jobs = 1
- self._job_queue = SequentialTaskQueue(max_jobs=max_jobs)
self._max_jobs = max_jobs
self._max_load = max_load
self._sched_iface = self._sched_iface_class(
register=self._register,
- schedule=self._schedule)
+ schedule=self._schedule,
+ unregister=self._unregister)
self._valid_pkgs = set()
+ self._process_iter = self._iter_metadata_processes()
def _iter_metadata_processes(self):
portdb = self._portdb
@@ -8800,7 +8783,11 @@ class MetadataRegen(PollLoop):
dead_nodes = None
break
- self._main_loop()
+ while self._schedule_tasks():
+ self._poll_loop()
+
+ while self._jobs:
+ self._poll_loop()
if dead_nodes:
for y in self._valid_pkgs:
@@ -8816,31 +8803,23 @@ class MetadataRegen(PollLoop):
except (KeyError, CacheError):
pass
- def _main_loop(self):
-
- process_iter = self._iter_metadata_processes()
-
- while True:
-
- if not self._can_add_job():
- self._schedule_main()
- continue
-
+ def _schedule_tasks(self):
+ """
+ @rtype: bool
+ @returns: True if there may be remaining tasks to schedule,
+ False otherwise.
+ """
+ while self._can_add_job():
try:
- metadata_process = process_iter.next()
+ metadata_process = self._process_iter.next()
except StopIteration:
- break
+ return False
self._jobs += 1
metadata_process.scheduler = self._sched_iface
metadata_process.addExitListener(self._metadata_exit)
- self._job_queue.add(metadata_process)
-
- while self._jobs:
- self._schedule_main(wait=True)
-
- def _schedule_tasks(self):
- return self._job_queue.schedule()
+ metadata_process.start()
+ return True
def _metadata_exit(self, metadata_process):
self._jobs -= 1
@@ -8848,6 +8827,7 @@ class MetadataRegen(PollLoop):
self._valid_pkgs.discard(metadata_process.cpv)
portage.writemsg("Error processing %s, continuing...\n" % \
(metadata_process.cpv,))
+ self._schedule_tasks()
class UninstallFailure(portage.exception.PortageException):
"""